architect/research/WMS_IMPLEMENTATION_EXAMPLES.md

WMS Implementation Examples

Практические примеры кода для быстрого старта


1. DATA MODELS (SQLAlchemy + Pydantic)

Base models

# models/base.py
import enum
from datetime import datetime

from sqlalchemy import (
    Boolean,
    Column,
    Date,
    DateTime,
    Enum,
    Float,
    ForeignKey,
    Index,
    Integer,
    String,
    UniqueConstraint,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

# Declarative base class; every ORM model in this module subclasses it.
Base = declarative_base()

class Warehouse(Base):
    """ORM model for a warehouse, stored in the ``warehouses`` table."""

    __tablename__ = "warehouses"

    id = Column(Integer, primary_key=True)
    # Short warehouse code; required and unique across all warehouses.
    code = Column(String(20), unique=True, nullable=False)
    name = Column(String(255), nullable=False)
    address = Column(String(500))
    # The callable itself is passed as the default, not a value computed at
    # import time.
    created_at = Column(DateTime, default=datetime.utcnow)

class Location(Base):
    """ORM model for a storage location inside a warehouse (``locations``).

    A location code only has to be unique within its warehouse; this is
    enforced by the composite unique constraint below.
    """

    __tablename__ = "locations"

    id = Column(Integer, primary_key=True)
    warehouse_id = Column(Integer, ForeignKey("warehouses.id"), nullable=False)
    code = Column(String(50), nullable=False)  # A-01-02-03
    aisle = Column(String(10))
    shelf = Column(String(10))
    bin = Column(String(10))
    location_type = Column(String(20))  # storage, receiving, shipping, scrap
    capacity_units = Column(Float)
    current_usage = Column(Float, default=0)
    # The LocationResponse schema exposes a required ``created_at`` field, so
    # the model has to carry the value (same definition as Warehouse.created_at).
    created_at = Column(DateTime, default=datetime.utcnow)

    __table_args__ = (
        UniqueConstraint('warehouse_id', 'code'),
    )

class Product(Base):
    """ORM model for a catalogue item, stored in the ``products`` table."""

    __tablename__ = "products"

    id = Column(Integer, primary_key=True)
    sku = Column(String(50), unique=True, nullable=False)
    name = Column(String(255), nullable=False)
    description = Column(String(1000))
    # Primary barcode; unique but optional.
    barcode = Column(String(50), unique=True)
    secondary_barcodes = Column(String(500))  # comma-separated
    weight = Column(Float)  # kg
    dimensions = Column(String(100))  # LxWxH in cm
    tracking_type = Column(String(20), default='none')  # none, serial, lot, both
    # NOTE(review): these two flags overlap with tracking_type above and nothing
    # in this model keeps them in sync — confirm which one is authoritative.
    requires_serial = Column(Boolean, default=False)
    requires_lot = Column(Boolean, default=False)
    reorder_point = Column(Integer, default=0)
    unit_cost = Column(Float)

class Stock(Base):
    """Quantity of one product held in a warehouse, optionally at a location.

    ``location_id`` is nullable, so a row may exist without a specific
    location. Lot and serial details are stored on the row itself.
    """

    __tablename__ = "stock"

    id = Column(Integer, primary_key=True)
    product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
    warehouse_id = Column(Integer, ForeignKey("warehouses.id"), nullable=False)
    location_id = Column(Integer, ForeignKey("locations.id"))

    quantity_on_hand = Column(Integer, default=0)
    quantity_allocated = Column(Integer, default=0)
    quantity_on_order = Column(Integer, default=0)

    # Tracking info
    lot_number = Column(String(50))
    serial_numbers = Column(String(2000))  # JSON list
    expiration_date = Column(Date)
    received_date = Column(DateTime)

    last_movement_date = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Code that holds a Stock row reads ``stock.location`` (for example to show
    # the location code); without this relationship that attribute does not
    # exist. It resolves to None when ``location_id`` is NULL.
    location = relationship("Location")

class MovementType(enum.Enum):
    """Kinds of stock movement recorded in ``StockMovement.movement_type``."""

    INBOUND = "inbound"
    OUTBOUND = "outbound"
    ADJUSTMENT = "adjustment"
    TRANSFER = "transfer"
    PRODUCTION = "production"

class StockMovement(Base):
    """Ledger entry for one stock movement (table ``stock_movements``).

    Each row records what moved (product, quantity, lot/serials), between
    which locations, which document caused it, and who created it and when.
    Either location may be NULL.
    """

    __tablename__ = "stock_movements"

    id = Column(Integer, primary_key=True)
    movement_type = Column(Enum(MovementType), nullable=False)

    product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
    quantity = Column(Integer, nullable=False)

    from_location_id = Column(Integer, ForeignKey("locations.id"))
    to_location_id = Column(Integer, ForeignKey("locations.id"))

    # Tracking
    lot_number = Column(String(50))
    serial_numbers = Column(String(2000))  # JSON

    # Reference document
    reference_type = Column(String(20))  # PO, SO, Assembly, Manual
    reference_id = Column(String(50))  # PO#123, SO#456

    # User & timestamp
    created_by_id = Column(Integer, ForeignKey("users.id"))
    created_at = Column(DateTime, default=datetime.utcnow)

    # Status
    status = Column(String(20), default='pending')  # pending, completed, cancelled
    notes = Column(String(500))

    # Reporting code reads movement.product / .from_location / .to_location, so
    # the relationships must exist. There are two foreign keys to ``locations``,
    # so each location relationship has to name the column it follows.
    product = relationship("Product")
    from_location = relationship("Location", foreign_keys=[from_location_id])
    to_location = relationship("Location", foreign_keys=[to_location_id])

    __table_args__ = (
        Index('idx_product_date', 'product_id', 'created_at'),
        Index('idx_reference', 'reference_type', 'reference_id'),
    )

class PurchaseOrder(Base):
    """Header of a purchase order placed with a supplier (``purchase_orders``)."""

    __tablename__ = "purchase_orders"

    id = Column(Integer, primary_key=True)
    po_number = Column(String(50), unique=True, nullable=False)
    supplier_id = Column(Integer, ForeignKey("suppliers.id"), nullable=False)
    warehouse_id = Column(Integer, ForeignKey("warehouses.id"), nullable=False)

    order_date = Column(DateTime, default=datetime.utcnow)
    expected_delivery = Column(DateTime)
    status = Column(String(20), default='draft')  # draft, issued, received, closed

    total_amount = Column(Float)
    notes = Column(String(500))

    # Receiving code iterates ``po.po_lines``; the collection is resolved
    # through the po_lines.po_id foreign key.
    po_lines = relationship("POLine")

class POLine(Base):
    """One product line of a purchase order (table ``po_lines``)."""

    __tablename__ = "po_lines"

    id = Column(Integer, primary_key=True)
    po_id = Column(Integer, ForeignKey("purchase_orders.id"), nullable=False)
    product_id = Column(Integer, ForeignKey("products.id"), nullable=False)

    quantity_ordered = Column(Integer, nullable=False)
    quantity_received = Column(Integer, default=0)
    unit_price = Column(Float)

    line_status = Column(String(20), default='pending')  # pending, partially_received, received

    # Receiving code reads ``line.product.sku`` / ``.name`` / ``.tracking_type``,
    # which requires the related Product to be reachable from the line.
    product = relationship("Product")

class SalesOrder(Base):
    """Header of a customer sales order (table ``sales_orders``)."""

    __tablename__ = "sales_orders"

    id = Column(Integer, primary_key=True)
    so_number = Column(String(50), unique=True, nullable=False)
    customer_id = Column(Integer, ForeignKey("customers.id"), nullable=False)
    warehouse_id = Column(Integer, ForeignKey("warehouses.id"), nullable=False)

    order_date = Column(DateTime, default=datetime.utcnow)
    promised_delivery = Column(DateTime)
    status = Column(String(20), default='draft')  # draft, allocated, picked, packed, shipped

    total_amount = Column(Float)

    # Picking code iterates ``so.so_lines``; the collection is resolved through
    # the so_lines.so_id foreign key.
    so_lines = relationship("SOLine")

class SOLine(Base):
    """One product line of a sales order (table ``so_lines``).

    Tracks the ordered quantity and how much of it has been allocated, picked
    and shipped so far.
    """

    __tablename__ = "so_lines"

    id = Column(Integer, primary_key=True)
    so_id = Column(Integer, ForeignKey("sales_orders.id"), nullable=False)
    product_id = Column(Integer, ForeignKey("products.id"), nullable=False)

    quantity_ordered = Column(Integer, nullable=False)
    quantity_allocated = Column(Integer, default=0)
    quantity_picked = Column(Integer, default=0)
    quantity_shipped = Column(Integer, default=0)
    unit_price = Column(Float)

    # Picking code reads ``line.product.sku`` / ``.name`` / ``.barcode``, which
    # requires the related Product to be reachable from the line.
    product = relationship("Product")

class InventoryCount(Base):
    """Header of a physical inventory count (table ``inventory_counts``)."""

    __tablename__ = "inventory_counts"

    id = Column(Integer, primary_key=True)
    count_number = Column(String(50), unique=True, nullable=False)
    warehouse_id = Column(Integer, ForeignKey("warehouses.id"), nullable=False)
    location_id = Column(Integer, ForeignKey("locations.id"))  # NULL = entire warehouse

    count_date = Column(DateTime, default=datetime.utcnow)
    count_by_id = Column(Integer, ForeignKey("users.id"))
    status = Column(String(20), default='in_progress')  # in_progress, completed, posted

    # Both stay NULL until they are set explicitly; no default is defined.
    posted_at = Column(DateTime)
    posted_by_id = Column(Integer, ForeignKey("users.id"))

class CountLine(Base):
    """One counted product within an inventory count (table ``count_lines``)."""

    __tablename__ = "count_lines"

    id = Column(Integer, primary_key=True)
    count_id = Column(Integer, ForeignKey("inventory_counts.id"), nullable=False)
    product_id = Column(Integer, ForeignKey("products.id"), nullable=False)

    # All four figures are nullable plain columns; nothing in the model derives
    # variance from the other two, so writers must store it themselves.
    system_quantity = Column(Integer)
    counted_quantity = Column(Integer)
    variance = Column(Integer)
    variance_percentage = Column(Float)

    lot_number = Column(String(50))
    serial_numbers = Column(String(2000))

    notes = Column(String(500))

Pydantic schemas

# schemas/stock.py
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime

class LocationCreate(BaseModel):
    """Request payload for creating a location.

    Only ``warehouse_id`` and ``code`` are required; ``location_type``
    defaults to "storage".
    """

    warehouse_id: int
    code: str
    aisle: Optional[str] = None
    shelf: Optional[str] = None
    bin: Optional[str] = None
    location_type: str = "storage"
    capacity_units: Optional[float] = None

class LocationResponse(LocationCreate):
    """Response payload for a location: the create fields plus stored values."""

    id: int
    current_usage: float
    # NOTE(review): required field — confirm the object this schema is built
    # from actually provides ``created_at``.
    created_at: datetime

class ProductCreate(BaseModel):
    """Request payload for creating a product."""

    sku: str
    name: str
    barcode: Optional[str] = None
    tracking_type: str = "none"
    # Required here (no default is given).
    unit_cost: float

class StockResponse(BaseModel):
    """Response payload describing one stock row."""

    product_id: int
    location_id: Optional[int] = None
    quantity_on_hand: int
    quantity_allocated: int
    lot_number: Optional[str] = None
    # NOTE(review): typed as datetime — verify this matches the type of the
    # value it is populated from (a date-only source may need ``date`` here).
    expiration_date: Optional[datetime] = None

class StockMovementCreate(BaseModel):
    """Request payload for recording a stock movement.

    ``movement_type``, ``product_id`` and ``quantity`` are required; every
    other field is optional and defaults to None.
    """

    # Plain string: the schema itself does not restrict the allowed values.
    movement_type: str
    product_id: int
    # No bounds are declared, so zero and negative values pass validation.
    quantity: int
    from_location_id: Optional[int] = None
    to_location_id: Optional[int] = None
    reference_type: Optional[str] = None
    reference_id: Optional[str] = None
    lot_number: Optional[str] = None
    serial_numbers: Optional[List[str]] = None
    notes: Optional[str] = None

class StockMovementResponse(StockMovementCreate):
    """Response payload for a movement: the create fields plus stored values."""

    id: int
    created_at: datetime
    status: str

2. API ENDPOINTS (FastAPI)

# api/stock.py
from fastapi import APIRouter, HTTPException, Depends, Query
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from models import Product, Stock, StockMovement, Location, Warehouse
from schemas import StockMovementCreate, StockMovementResponse

# Router for the inventory endpoints; every route below is served under
# /api/inventory and tagged "inventory".
router = APIRouter(prefix="/api/inventory", tags=["inventory"])

# Dependency
def get_db():
    """Yield a database session and close it once the consumer is done."""
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs on normal completion and when the consumer raises.
        session.close()

# Stock lookup
@router.get("/stock/{product_id}")
async def get_stock(
    product_id: int,
    warehouse_id: int = Query(...),
    db: Session = Depends(get_db)
):
    """Return the stock rows of a product in one warehouse, one per location.

    Raises:
        HTTPException: 404 when the product has no stock rows in the warehouse.
    """
    rows = (
        db.query(Stock)
        .filter(
            Stock.product_id == product_id,
            Stock.warehouse_id == warehouse_id,
        )
        .all()
    )

    if not rows:
        raise HTTPException(status_code=404, detail="Stock not found")

    locations = []
    for row in rows:
        # Rows without a location are reported under the name "bulk".
        location_code = row.location.code if row.location else "bulk"
        locations.append({
            "location_code": location_code,
            "quantity_on_hand": row.quantity_on_hand,
            "quantity_allocated": row.quantity_allocated,
            "lot_number": row.lot_number,
            "expiration_date": row.expiration_date,
        })

    return {
        "product_id": product_id,
        "warehouse_id": warehouse_id,
        "locations": locations,
    }

# Receive goods (goods receipt)
@router.post("/receiving/create")
async def create_receipt(
    po_number: str,
    warehouse_id: int,
    db: Session = Depends(get_db)
):
    """Build a receipt description for a purchase order.

    Nothing is written to the database; the response lists the PO lines that
    are expected and the receiving location of the warehouse.

    Raises:
        HTTPException: 404 when the PO does not exist, 400 when the warehouse
            has no location of type "receiving".
    """
    from models import PurchaseOrder, POLine

    purchase_order = db.query(PurchaseOrder).filter_by(po_number=po_number).first()
    if purchase_order is None:
        raise HTTPException(status_code=404, detail="PO not found")

    # First location of the warehouse that is flagged as a receiving area.
    receiving_location = (
        db.query(Location)
        .filter(
            Location.warehouse_id == warehouse_id,
            Location.location_type == "receiving",
        )
        .first()
    )
    if receiving_location is None:
        raise HTTPException(status_code=400, detail="No receiving location configured")

    expected_lines = []
    for po_line in purchase_order.po_lines:
        item = po_line.product
        expected_lines.append({
            "product_sku": item.sku,
            "product_name": item.name,
            "expected_qty": po_line.quantity_ordered,
            "needs_tracking": item.tracking_type in ['serial', 'lot', 'both'],
        })

    return {
        "receipt_id": f"RCP-{datetime.now().strftime('%Y%m%d%H%M%S')}",
        "po_number": po_number,
        "input_location": receiving_location.code,
        "lines": expected_lines,
    }

@router.post("/receiving/receive-item")
async def receive_item(
    receipt_id: str,
    product_id: int,
    quantity: int,
    warehouse_id: int,
    lot_number: Optional[str] = None,
    serial_numbers: Optional[List[str]] = None,
    db: Session = Depends(get_db)
):
    """Record a received item: log an inbound movement and increase stock.

    The goods are booked onto the warehouse's receiving location. Stock is
    kept per lot, so a delivery with a different lot number gets its own row
    instead of being merged into an existing one.

    Returns:
        A dict with the new movement id, the received quantity and the code of
        the receiving location.

    Raises:
        HTTPException: 400 when ``quantity`` is not positive or no receiving
            location is configured; 404 when the product does not exist.
    """
    from models import MovementType

    if quantity <= 0:
        raise HTTPException(status_code=400, detail="Quantity must be positive")

    product = db.query(Product).get(product_id)
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    # Get input location
    input_location = db.query(Location).filter(
        Location.warehouse_id == warehouse_id,
        Location.location_type == "receiving"
    ).first()

    # Without this guard a warehouse that has no receiving location fails with
    # an AttributeError on ``input_location.id`` below.
    if not input_location:
        raise HTTPException(status_code=400, detail="No receiving location configured")

    # Create stock movement
    movement = StockMovement(
        movement_type=MovementType.INBOUND,
        product_id=product_id,
        quantity=quantity,
        to_location_id=input_location.id,
        reference_type="PO",
        reference_id=receipt_id,
        lot_number=lot_number,
        serial_numbers=','.join(serial_numbers) if serial_numbers else None,
        status="completed"
    )

    # Update stock. The lot is part of the lookup so that quantities of
    # different lots are never added together (a None lot compares as IS NULL).
    stock = db.query(Stock).filter(
        Stock.product_id == product_id,
        Stock.warehouse_id == warehouse_id,
        Stock.location_id == input_location.id,
        Stock.lot_number == lot_number
    ).first()

    now = datetime.utcnow()
    if not stock:
        stock = Stock(
            product_id=product_id,
            warehouse_id=warehouse_id,
            location_id=input_location.id,
            quantity_on_hand=quantity,
            lot_number=lot_number,
            received_date=now
        )
    else:
        stock.quantity_on_hand = (stock.quantity_on_hand or 0) + quantity
    stock.last_movement_date = now

    db.add(movement)
    db.add(stock)
    db.commit()

    return {
        "status": "success",
        "movement_id": movement.id,
        "quantity_received": quantity,
        "location": input_location.code
    }

# Picking (order picking)
@router.get("/picking/list/{so_number}")
async def get_pick_list(
    so_number: str,
    warehouse_id: int,
    db: Session = Depends(get_db)
):
    """Build a pick list for a sales order, oldest stock first.

    For every order line the still-unallocated quantity is spread over the
    product's stock rows in the warehouse in order of ``received_date``.
    The resulting lines are sorted by location code. Nothing is written to
    the database.

    Raises:
        HTTPException: 404 when the sales order does not exist.
    """
    from models import SalesOrder

    so = db.query(SalesOrder).filter_by(so_number=so_number).first()
    if not so:
        raise HTTPException(status_code=404, detail="Sales order not found")

    # Units still unassigned per stock row within THIS pick list. Without it,
    # two order lines for the same product would both be given the same units
    # and the list would promise more than is on the shelf.
    remaining_by_stock = {}

    pick_lines = []
    for line in so.so_lines:
        qty_needed = line.quantity_ordered - (line.quantity_allocated or 0)
        if qty_needed <= 0:
            continue

        # Find stock with FIFO logic; the id tie-breaker keeps the order stable
        # for rows received at the same moment.
        stocks = db.query(Stock).filter(
            Stock.product_id == line.product_id,
            Stock.warehouse_id == warehouse_id,
            Stock.quantity_on_hand > 0
        ).order_by(Stock.received_date, Stock.id).all()

        for stock in stocks:
            if qty_needed <= 0:
                break

            available = remaining_by_stock.get(stock.id, stock.quantity_on_hand)
            if available <= 0:
                continue

            qty_to_pick = min(qty_needed, available)
            remaining_by_stock[stock.id] = available - qty_to_pick

            pick_lines.append({
                "product_sku": line.product.sku,
                "product_name": line.product.name,
                "quantity": qty_to_pick,
                "from_location": stock.location.code if stock.location else "bulk",
                "lot_number": stock.lot_number,
                "serial_numbers": stock.serial_numbers.split(',') if stock.serial_numbers else [],
                "barcode": line.product.barcode
            })

            qty_needed -= qty_to_pick

    # Sort by location code so neighbouring picks end up next to each other.
    pick_lines.sort(key=lambda x: x['from_location'])

    return {
        "pick_list_id": f"PL-{so_number}",
        "sales_order": so_number,
        "warehouse": warehouse_id,
        "lines": pick_lines,
        "created_at": datetime.utcnow()
    }

@router.post("/picking/confirm-pick")
async def confirm_pick(
    pick_list_id: str,
    product_id: int,
    quantity_picked: int,
    from_location_id: int,
    db: Session = Depends(get_db)
):
    """Confirm a pick: log an outbound movement and reduce stock.

    Returns:
        A dict with the new movement id and the picked quantity.

    Raises:
        HTTPException: 400 when ``quantity_picked`` is not positive or the
            location does not hold enough of the product.
    """
    from models import MovementType

    # A zero or negative pick would pass the stock check below and then
    # *increase* the quantity on hand.
    if quantity_picked <= 0:
        raise HTTPException(status_code=400, detail="Quantity must be positive")

    # Update stock
    # NOTE(review): the lookup is by product and location only; when several
    # lots share a location the first matching row is used.
    stock = db.query(Stock).filter_by(
        product_id=product_id,
        location_id=from_location_id
    ).first()

    if not stock or (stock.quantity_on_hand or 0) < quantity_picked:
        raise HTTPException(status_code=400, detail="Insufficient stock")

    # Create movement
    movement = StockMovement(
        movement_type=MovementType.OUTBOUND,
        product_id=product_id,
        quantity=quantity_picked,
        from_location_id=from_location_id,
        reference_type="SO",
        reference_id=pick_list_id,
        status="completed"
    )

    stock.quantity_on_hand -= quantity_picked
    # Release only what this pick consumed. Resetting the allocation to zero
    # would also drop reservations that belong to other orders.
    stock.quantity_allocated = max((stock.quantity_allocated or 0) - quantity_picked, 0)
    stock.last_movement_date = datetime.utcnow()

    db.add(movement)
    db.add(stock)
    db.commit()

    return {
        "status": "success",
        "movement_id": movement.id,
        "quantity_picked": quantity_picked
    }

# Inventory count (stocktaking)
@router.post("/inventory/start-count")
async def start_count(
    warehouse_id: int,
    location_id: Optional[int] = None,
    db: Session = Depends(get_db)
):
    """Open a physical count for a warehouse, optionally for one location.

    The count number is derived from the current local time with a
    resolution of one second.
    """
    from models import InventoryCount

    number = f"CNT-{datetime.now().strftime('%Y%m%d%H%M%S')}"
    new_count = InventoryCount(
        count_number=number,
        warehouse_id=warehouse_id,
        location_id=location_id,
        status="in_progress",
    )

    db.add(new_count)
    db.commit()

    return dict(
        count_id=new_count.id,
        count_number=new_count.count_number,
        warehouse_id=warehouse_id,
        location_id=location_id,
    )

@router.post("/inventory/record-count")
async def record_count(
    count_id: int,
    product_id: int,
    counted_quantity: int,
    lot_number: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """Record the counted quantity of a product and its variance.

    The system quantity is the total on hand over all stock rows covered by
    the count: the count's location when it has one, otherwise the whole
    warehouse; narrowed to ``lot_number`` when one is given.

    Returns:
        A dict with the system quantity, the counted quantity and the variance
        (counted minus system).

    Raises:
        HTTPException: 400 when ``counted_quantity`` is negative or the count
            is already posted; 404 when the count does not exist.
    """
    from models import InventoryCount, CountLine

    if counted_quantity < 0:
        raise HTTPException(status_code=400, detail="Counted quantity cannot be negative")

    count = db.query(InventoryCount).get(count_id)
    if not count:
        raise HTTPException(status_code=404, detail="Count not found")

    if count.status == "posted":
        raise HTTPException(status_code=400, detail="Count already posted")

    # Get system quantity. Filters are added only when they apply, instead of
    # passing a bare Python ``True`` into the query.
    stock_query = db.query(Stock).filter(
        Stock.product_id == product_id,
        Stock.warehouse_id == count.warehouse_id
    )
    if count.location_id is not None:
        stock_query = stock_query.filter(Stock.location_id == count.location_id)
    if lot_number is not None:
        stock_query = stock_query.filter(Stock.lot_number == lot_number)

    # Several rows can match (multiple locations or lots); reading only the
    # first one would understate the system quantity.
    system_qty = sum(row.quantity_on_hand or 0 for row in stock_query.all())
    variance = counted_quantity - system_qty

    count_line = CountLine(
        count_id=count_id,
        product_id=product_id,
        system_quantity=system_qty,
        counted_quantity=counted_quantity,
        variance=variance,
        # A percentage is undefined when the system shows nothing; 0 is stored.
        variance_percentage=(variance / system_qty * 100) if system_qty > 0 else 0,
        lot_number=lot_number
    )

    db.add(count_line)
    db.commit()

    return {
        "status": "recorded",
        "product_id": product_id,
        "system_qty": system_qty,
        "counted_qty": counted_quantity,
        "variance": variance
    }

@router.post("/inventory/post-count/{count_id}")
async def post_count(count_id: int, db: Session = Depends(get_db)):
    """Post a count: create an adjustment movement for every variance.

    A positive variance is booked as a movement into the count's location, a
    negative one as a movement out of it. The matching stock row is set to
    the counted quantity and the count is marked "posted".

    Returns:
        A dict with the count number, the number of adjustments and their ids.

    Raises:
        HTTPException: 404 when the count does not exist; 400 when it has
            already been posted.
    """
    from models import InventoryCount, CountLine, MovementType

    count = db.query(InventoryCount).get(count_id)
    if not count:
        raise HTTPException(status_code=404, detail="Count not found")

    # Posting twice would write every adjustment movement a second time.
    if count.status == "posted":
        raise HTTPException(status_code=400, detail="Count already posted")

    count_lines = db.query(CountLine).filter_by(count_id=count_id).all()

    movements = []

    for line in count_lines:
        # ``variance`` is a nullable column: skip both "no variance" and
        # "not recorded" instead of comparing None with 0.
        if not line.variance:
            continue

        # Create adjustment movement
        if line.variance > 0:
            from_loc, to_loc = None, count.location_id
        else:
            from_loc, to_loc = count.location_id, None

        movement = StockMovement(
            movement_type=MovementType.ADJUSTMENT,
            product_id=line.product_id,
            quantity=abs(line.variance),
            from_location_id=from_loc,
            to_location_id=to_loc,
            reference_type="CountVariance",
            reference_id=count.count_number,
            status="completed",
            notes=f"Count variance: {line.variance}"
        )

        # Update stock. The warehouse is part of the lookup so a row of another
        # warehouse can never be overwritten.
        # NOTE(review): for a whole-warehouse count (location_id is NULL) only
        # a stock row without a location is matched — confirm intended scope.
        stock = db.query(Stock).filter_by(
            product_id=line.product_id,
            warehouse_id=count.warehouse_id,
            location_id=count.location_id
        ).first()

        if stock:
            stock.quantity_on_hand = line.counted_quantity

        db.add(movement)
        movements.append(movement)

    count.status = "posted"
    count.posted_at = datetime.utcnow()

    # Primary keys are assigned on flush; reading ``movement.id`` right after
    # ``db.add`` would only ever yield None.
    db.flush()
    adjustments = [movement.id for movement in movements]

    db.commit()

    return {
        "status": "posted",
        "count_number": count.count_number,
        "adjustments_created": len(adjustments),
        "adjustment_ids": adjustments
    }

# Stock movements report
@router.get("/movements/report")
async def get_movements_report(
    warehouse_id: int,
    product_id: Optional[int] = None,
    movement_type: Optional[str] = None,
    days: int = Query(7),
    db: Session = Depends(get_db)
):
    """Report the stock movements of a warehouse over the last ``days`` days.

    A movement belongs to the warehouse when its source or its destination
    location does. Results are newest first and can be narrowed by product
    and by movement type (the enum value, e.g. "inbound").

    Raises:
        HTTPException: 400 when ``movement_type`` is not a known type.
    """
    from models import MovementType

    date_from = datetime.utcnow() - timedelta(days=days)

    query = db.query(StockMovement).filter(
        StockMovement.created_at >= date_from
    )

    # Movements carry no warehouse column of their own, so the warehouse is
    # applied through the locations they touch. Movements with neither a
    # source nor a destination location cannot be attributed and are left out.
    warehouse_location_ids = db.query(Location.id).filter(
        Location.warehouse_id == warehouse_id
    )
    query = query.filter(
        StockMovement.from_location_id.in_(warehouse_location_ids)
        | StockMovement.to_location_id.in_(warehouse_location_ids)
    )

    if product_id is not None:
        query = query.filter(StockMovement.product_id == product_id)

    if movement_type:
        # The column is an Enum of MovementType; comparing it with the raw
        # request string never matches, so convert (and validate) first.
        try:
            movement_kind = MovementType(movement_type)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Unknown movement type: {movement_type}"
            ) from None
        query = query.filter(StockMovement.movement_type == movement_kind)

    movements = query.order_by(StockMovement.created_at.desc()).all()

    return {
        "period": f"Last {days} days",
        "total_movements": len(movements),
        "movements": [
            {
                "id": m.id,
                "type": m.movement_type.value,
                "product_sku": m.product.sku,
                "quantity": m.quantity,
                "from_location": m.from_location.code if m.from_location else None,
                "to_location": m.to_location.code if m.to_location else None,
                "reference": f"{m.reference_type}#{m.reference_id}",
                "created_at": m.created_at
            }
            for m in movements
        ]
    }

3. BARCODE SCANNING (LOGIC)

# services/barcode_scanner.py
from typing import Optional, Dict, List
import barcode
from barcode.ean import EAN13
import io

class BarcodeScanner:
    """Resolve scanned barcodes and render barcode images."""

    @staticmethod
    def parse_barcode(barcode_str: str, db: Session) -> Dict:
        """Identify what was scanned.

        Lookup order: product (primary, then secondary barcodes), location
        code, sales order ("SO" prefix), purchase order ("PO" prefix).

        Returns:
            A dict whose ``"type"`` is "product", "location", "order", "po" or
            "unknown", plus the identifying fields of the match.
        """
        # Scanners commonly append whitespace or a line ending to the payload.
        code = (barcode_str or "").strip()
        if not code:
            # An empty string would match every row in the substring search.
            return {"type": "unknown", "barcode": barcode_str}

        # Try product first
        product = db.query(Product).filter(Product.barcode == code).first()
        if product is None:
            # ``secondary_barcodes`` is a comma-separated list. The SQL
            # substring test only narrows the candidates; the exact comparison
            # below stops "123" from matching a product that lists "991234".
            candidates = db.query(Product).filter(
                Product.secondary_barcodes.contains(code, autoescape=True)
            ).all()
            for candidate in candidates:
                listed = (candidate.secondary_barcodes or "").split(",")
                if code in (entry.strip() for entry in listed):
                    product = candidate
                    break

        if product:
            return {
                "type": "product",
                "product_id": product.id,
                "sku": product.sku,
                "name": product.name,
                "tracking_required": product.tracking_type in ['serial', 'lot', 'both']
            }

        # Try location
        # NOTE(review): the lookup is by code alone; when the same code exists
        # in more than one warehouse the first match wins.
        location = db.query(Location).filter_by(code=code).first()
        if location:
            return {
                "type": "location",
                "location_id": location.id,
                "code": location.code,
                "warehouse_id": location.warehouse_id
            }

        # Try order (QR code)
        if code.startswith("SO"):
            order = db.query(SalesOrder).filter_by(so_number=code).first()
            if order:
                return {
                    "type": "order",
                    "order_id": order.id,
                    "order_number": order.so_number,
                    "status": order.status
                }

        if code.startswith("PO"):
            po = db.query(PurchaseOrder).filter_by(po_number=code).first()
            if po:
                return {
                    "type": "po",
                    "po_id": po.id,
                    "po_number": po.po_number
                }

        return {"type": "unknown", "barcode": barcode_str}

    @staticmethod
    def generate_product_barcode(product_id: int) -> bytes:
        """Generate an EAN-13 barcode image for a product id.

        The id is left-padded with zeros to the 12 payload digits of EAN-13;
        the library appends the check digit.

        Raises:
            ValueError: when the id is negative or longer than 12 digits.
        """
        digits = str(int(product_id))
        if product_id < 0 or len(digits) > 12:
            raise ValueError(f"Product id {product_id} does not fit into EAN-13")

        ean = EAN13(digits.zfill(12))
        # ``save`` writes a file and returns its name; rendering into a buffer
        # is what yields the image bytes.
        buffer = io.BytesIO()
        ean.write(buffer)
        return buffer.getvalue()

    @staticmethod
    def generate_location_barcode(location_code: str) -> bytes:
        """Generate a Code-128 barcode image for a location code."""
        from barcode import Code128

        # Code128 takes the code (and optionally a writer); it has no
        # ``charset`` argument.
        code = Code128(location_code)
        buffer = io.BytesIO()
        code.write(buffer)
        return buffer.getvalue()

# Mobile receiving workflow
class ReceivingWorkflow:
    """
    Barcode-driven receiving workflow:
    1. Scan the PO barcode
    2. For each line in the PO:
       - Scan product barcode
       - Input quantity
       - If required: input serials/lots
       - Confirm location

    The instance keeps the state of one receipt at a time: the active PO,
    a receipt summary and the list of items confirmed so far.
    """

    def __init__(self, db: Session):
        self.db = db
        self.current_po = None
        self.current_receipt = None
        self.received_items = []

    def start_receipt(self, po_barcode: str):
        """Start receiving against the purchase order with the given number.

        Raises:
            ValueError: when no purchase order has that number.
        """
        po = self.db.query(PurchaseOrder).filter_by(po_number=po_barcode).first()
        if not po:
            raise ValueError(f"PO {po_barcode} not found")

        self.current_po = po
        self.current_receipt = {
            "po_number": po.po_number,
            "items": []
        }
        # Items of a previous receipt must not be counted towards this one.
        self.received_items = []

        return {
            "status": "started",
            "po_number": po.po_number,
            "expected_items": len(po.po_lines)
        }

    def scan_product(self, product_barcode: str):
        """Look up a scanned product and report its status on the active PO.

        Raises:
            ValueError: when no receipt is in progress, the barcode is unknown
                or the product is not on the active PO.
        """
        if not self.current_po:
            raise ValueError("No receipt in progress")

        product = self.db.query(Product).filter_by(barcode=product_barcode).first()
        if not product:
            raise ValueError(f"Product {product_barcode} not found")

        # Check if this product is in current PO
        po_line = self.db.query(POLine).filter(
            POLine.po_id == self.current_po.id,
            POLine.product_id == product.id
        ).first()

        if not po_line:
            raise ValueError(f"Product {product.sku} not in PO {self.current_po.po_number}")

        return {
            "product_id": product.id,
            "sku": product.sku,
            "name": product.name,
            "expected_quantity": po_line.quantity_ordered,
            "already_received": po_line.quantity_received,
            "remaining": po_line.quantity_ordered - po_line.quantity_received,
            "needs_serial": product.requires_serial,
            "needs_lot": product.requires_lot
        }

    def confirm_item(
        self,
        product_id: int,
        quantity: int,
        location_id: int,
        serials: Optional[List[str]] = None,
        lot_number: Optional[str] = None
    ):
        """Confirm a received item: log the movement, raise stock, update the PO line.

        Raises:
            ValueError: when no receipt is in progress, the quantity is not
                positive, the product is unknown or not on the active PO, or
                required serial/lot data is missing.
        """
        from datetime import datetime
        from models import Stock

        if not self.current_po:
            raise ValueError("No receipt in progress")

        if quantity <= 0:
            raise ValueError("Quantity must be positive")

        # Fetch once; an unknown id used to fail with an AttributeError on None.
        product = self.db.query(Product).get(product_id)
        if not product:
            raise ValueError(f"Product {product_id} not found")

        # Validation
        if not serials and product.requires_serial:
            raise ValueError("Serial numbers required for this product")

        if not lot_number and product.requires_lot:
            raise ValueError("Lot number required for this product")

        # Same rule as scan_product: only products on the active PO are accepted.
        po_line = self.db.query(POLine).filter(
            POLine.po_id == self.current_po.id,
            POLine.product_id == product_id
        ).first()
        if not po_line:
            raise ValueError(f"Product {product.sku} not in PO {self.current_po.po_number}")

        # Create movement and update stock
        movement = StockMovement(
            movement_type=MovementType.INBOUND,
            product_id=product_id,
            quantity=quantity,
            to_location_id=location_id,
            reference_type="PO",
            reference_id=self.current_po.po_number,
            lot_number=lot_number,
            serial_numbers=','.join(serials) if serials else None,
            status="completed"
        )

        # Stock is kept per product, location and lot inside the PO's warehouse.
        now = datetime.utcnow()
        stock = self.db.query(Stock).filter(
            Stock.product_id == product_id,
            Stock.warehouse_id == self.current_po.warehouse_id,
            Stock.location_id == location_id,
            Stock.lot_number == lot_number
        ).first()
        if stock is None:
            stock = Stock(
                product_id=product_id,
                warehouse_id=self.current_po.warehouse_id,
                location_id=location_id,
                quantity_on_hand=quantity,
                lot_number=lot_number,
                received_date=now
            )
            self.db.add(stock)
        else:
            stock.quantity_on_hand = (stock.quantity_on_hand or 0) + quantity
        stock.last_movement_date = now

        # Keep the PO line in step so scan_product reports the right remainder.
        po_line.quantity_received = (po_line.quantity_received or 0) + quantity
        if po_line.quantity_received >= po_line.quantity_ordered:
            po_line.line_status = "received"
        else:
            po_line.line_status = "partially_received"

        self.db.add(movement)
        self.db.commit()

        item = {
            "product_id": product_id,
            "quantity": quantity,
            "location_id": location_id
        }
        self.received_items.append(item)
        self.current_receipt["items"].append(item)

        return {
            "status": "confirmed",
            "items_received": len(self.received_items)
        }

    def complete_receipt(self):
        """Finish the receipt in progress and return its summary.

        Raises:
            ValueError: when no receipt is in progress.
        """
        if not self.current_po:
            raise ValueError("No receipt in progress")

        summary = {
            "status": "completed",
            "po_number": self.current_po.po_number,
            "items_received": len(self.received_items),
            "receipt_id": self.current_receipt
        }

        # Close the receipt so no further items can be booked against it;
        # ``received_items`` stays readable until the next start_receipt.
        self.current_po = None
        self.current_receipt = None

        return summary

4. BATCH OPERATIONS

# services/batch_operations.py

class WavePicking:
    """Picking wave that groups several sales orders."""

    @staticmethod
    def create_wave(
        warehouse_id: int,
        so_numbers: List[str],
        db: Session
    ):
        """Create a picking wave from the given sales orders of a warehouse.

        All lines of the matching orders are collected and grouped by product,
        so that one product is picked once for every order that needs it.

        Returns:
            A dict with the wave id, the number of orders found and the lines.

        Raises:
            ValueError: when none of the order numbers exist in the warehouse.
        """

        orders = db.query(SalesOrder).filter(
            SalesOrder.so_number.in_(so_numbers),
            SalesOrder.warehouse_id == warehouse_id
        ).all()

        if not orders:
            raise ValueError("No orders found")

        # Collect all lines
        all_lines = [
            {
                "so_number": order.so_number,
                "product_id": line.product_id,
                "quantity": line.quantity_ordered,
                "sku": line.product.sku
            }
            for order in orders
            for line in order.so_lines
        ]

        # Group lines of the same product together. The key is the numeric id:
        # sorting on the text "product_<id>" puts product 10 before product 2.
        # NOTE(review): this groups by product, not by shelf location — a real
        # route optimisation would need the stock locations.
        all_lines.sort(key=lambda x: x["product_id"])

        return {
            "wave_id": f"WAVE-{len(so_numbers)}-{datetime.now().strftime('%Y%m%d%H%M')}",
            "orders": len(orders),
            "lines": all_lines
        }

class InventoryAdjustment:
    """Batch adjustment operations"""

    @staticmethod
    def create_adjustment(
        product_ids: List[int],
        warehouse_id: int,
        reason: str,
        quantities: Dict[int, int],  # product_id -> adjustment qty
        db: Session
    ):
        """Create one adjustment movement per product with a non-zero quantity.

        All movements of a call share a single reference id so the batch can
        be found as a whole. The movement quantity is stored as an absolute
        value; the sign of the adjustment is kept in the notes.

        NOTE(review): only the movements are written — stock rows are not
        changed here and ``warehouse_id`` is not used; a location would be
        needed to decide which stock row to adjust.

        Returns:
            A dict with the number of movements created, the reason and the
            shared reference id.
        """
        # One reference for the whole batch; building it per row gave every
        # movement a different timestamp-based id.
        batch_reference = f"ADJ-{datetime.now().isoformat()}"

        adjustments = []

        for product_id in product_ids:
            adj_qty = quantities.get(product_id, 0)
            if adj_qty == 0:
                continue

            movement = StockMovement(
                movement_type=MovementType.ADJUSTMENT,
                product_id=product_id,
                quantity=abs(adj_qty),
                reference_type="Adjustment",
                reference_id=batch_reference,
                status="completed",
                # abs() above drops the direction; record it so a gain can be
                # told apart from a loss.
                notes=f"{reason} (adjustment: {adj_qty:+})"
            )

            db.add(movement)
            adjustments.append(product_id)

        db.commit()

        return {
            "adjustments_created": len(adjustments),
            "reason": reason,
            "reference_id": batch_reference
        }

5. REAL-TIME NOTIFICATIONS

# services/notifications.py
from fastapi import WebSocket
import asyncio
import json

class StockNotificationManager:
    """Real-time notifications for stock changes.

    Keeps the list of open WebSocket connections and sends every message to
    all of them.
    """

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the connection and start sending it notifications."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop sending to the connection; unknown connections are ignored."""
        # The connection may already have been dropped by a failed broadcast.
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast_stock_change(self, message: dict):
        """Broadcast stock change to all connected clients.

        Connections that fail to receive the message are removed.
        """
        failed = []
        # Iterate over a snapshot: while a send is awaited, other tasks may
        # connect or disconnect and change the list.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(message)
            except Exception as e:
                print(f"Error sending message: {e}")
                failed.append(connection)

        # Drop broken connections so they are not retried on every broadcast.
        for connection in failed:
            self.disconnect(connection)

    async def notify_low_stock(self, product_id: int, quantity: int, reorder_point: int):
        """Notify when stock is at or below the reorder point."""
        if quantity <= reorder_point:
            await self.broadcast_stock_change({
                "event_type": "low_stock_alert",
                "product_id": product_id,
                "current_quantity": quantity,
                "reorder_point": reorder_point,
                "timestamp": datetime.utcnow().isoformat()
            })

# WebSocket endpoint
manager = StockNotificationManager()

@router.websocket("/ws/inventory/{warehouse_id}")
async def websocket_inventory(websocket: WebSocket, warehouse_id: int):
    """Keep a client subscribed to stock notifications until it disconnects.

    NOTE(review): ``warehouse_id`` is part of the route but notifications are
    not filtered by it — every client receives every broadcast.
    """
    await manager.connect(websocket)
    try:
        while True:
            # Reading from the socket is how a disconnect becomes visible; a
            # loop that only sleeps never learns that the client has gone.
            # Incoming payloads themselves are ignored.
            message = await websocket.receive()
            if message["type"] == "websocket.disconnect":
                break
    finally:
        # Also runs on errors and task cancellation, so the connection never
        # stays registered after the handler ends.
        manager.disconnect(websocket)

Используемые технологии

Database: PostgreSQL / SQLite
ORM: SQLAlchemy
API: FastAPI
Validation: Pydantic
Async: asyncio
Barcodes: python-barcode (генерация штрихкодов), opencv (для сканирования; в примерах выше не используется)
Mobile: React Native / Flutter (интеграция с API)

Эти примеры можно использовать как основу для быстрого прототипирования WMS-системы.