Практические примеры кода для быстрого старта
# models/base.py
from sqlalchemy import Column, Integer, String, Float, DateTime, Enum
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
import enum
Base = declarative_base()
class Warehouse(Base):
__tablename__ = "warehouses"
id = Column(Integer, primary_key=True)
code = Column(String(20), unique=True, nullable=False)
name = Column(String(255), nullable=False)
address = Column(String(500))
created_at = Column(DateTime, default=datetime.utcnow)
class Location(Base):
__tablename__ = "locations"
id = Column(Integer, primary_key=True)
warehouse_id = Column(Integer, ForeignKey("warehouses.id"), nullable=False)
code = Column(String(50), nullable=False) # A-01-02-03
aisle = Column(String(10))
shelf = Column(String(10))
bin = Column(String(10))
location_type = Column(String(20)) # storage, receiving, shipping, scrap
capacity_units = Column(Float)
current_usage = Column(Float, default=0)
__table_args__ = (
UniqueConstraint('warehouse_id', 'code'),
)
class Product(Base):
__tablename__ = "products"
id = Column(Integer, primary_key=True)
sku = Column(String(50), unique=True, nullable=False)
name = Column(String(255), nullable=False)
description = Column(String(1000))
barcode = Column(String(50), unique=True)
secondary_barcodes = Column(String(500)) # comma-separated
weight = Column(Float) # kg
dimensions = Column(String(100)) # LxWxH in cm
tracking_type = Column(String(20), default='none') # none, serial, lot, both
requires_serial = Column(Boolean, default=False)
requires_lot = Column(Boolean, default=False)
reorder_point = Column(Integer, default=0)
unit_cost = Column(Float)
class Stock(Base):
__tablename__ = "stock"
id = Column(Integer, primary_key=True)
product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
warehouse_id = Column(Integer, ForeignKey("warehouses.id"), nullable=False)
location_id = Column(Integer, ForeignKey("locations.id"))
quantity_on_hand = Column(Integer, default=0)
quantity_allocated = Column(Integer, default=0)
quantity_on_order = Column(Integer, default=0)
# Tracking info
lot_number = Column(String(50))
serial_numbers = Column(String(2000)) # JSON list
expiration_date = Column(Date)
received_date = Column(DateTime)
last_movement_date = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class MovementType(enum.Enum):
INBOUND = "inbound"
OUTBOUND = "outbound"
ADJUSTMENT = "adjustment"
TRANSFER = "transfer"
PRODUCTION = "production"
class StockMovement(Base):
__tablename__ = "stock_movements"
id = Column(Integer, primary_key=True)
movement_type = Column(Enum(MovementType), nullable=False)
product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
quantity = Column(Integer, nullable=False)
from_location_id = Column(Integer, ForeignKey("locations.id"))
to_location_id = Column(Integer, ForeignKey("locations.id"))
# Tracking
lot_number = Column(String(50))
serial_numbers = Column(String(2000)) # JSON
# Reference document
reference_type = Column(String(20)) # PO, SO, Assembly, Manual
reference_id = Column(String(50)) # PO#123, SO#456
# User & timestamp
created_by_id = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime, default=datetime.utcnow)
# Status
status = Column(String(20), default='pending') # pending, completed, cancelled
notes = Column(String(500))
__table_args__ = (
Index('idx_product_date', 'product_id', 'created_at'),
Index('idx_reference', 'reference_type', 'reference_id'),
)
class PurchaseOrder(Base):
__tablename__ = "purchase_orders"
id = Column(Integer, primary_key=True)
po_number = Column(String(50), unique=True, nullable=False)
supplier_id = Column(Integer, ForeignKey("suppliers.id"), nullable=False)
warehouse_id = Column(Integer, ForeignKey("warehouses.id"), nullable=False)
order_date = Column(DateTime, default=datetime.utcnow)
expected_delivery = Column(DateTime)
status = Column(String(20), default='draft') # draft, issued, received, closed
total_amount = Column(Float)
notes = Column(String(500))
class POLine(Base):
__tablename__ = "po_lines"
id = Column(Integer, primary_key=True)
po_id = Column(Integer, ForeignKey("purchase_orders.id"), nullable=False)
product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
quantity_ordered = Column(Integer, nullable=False)
quantity_received = Column(Integer, default=0)
unit_price = Column(Float)
line_status = Column(String(20), default='pending') # pending, partially_received, received
class SalesOrder(Base):
__tablename__ = "sales_orders"
id = Column(Integer, primary_key=True)
so_number = Column(String(50), unique=True, nullable=False)
customer_id = Column(Integer, ForeignKey("customers.id"), nullable=False)
warehouse_id = Column(Integer, ForeignKey("warehouses.id"), nullable=False)
order_date = Column(DateTime, default=datetime.utcnow)
promised_delivery = Column(DateTime)
status = Column(String(20), default='draft') # draft, allocated, picked, packed, shipped
total_amount = Column(Float)
class SOLine(Base):
__tablename__ = "so_lines"
id = Column(Integer, primary_key=True)
so_id = Column(Integer, ForeignKey("sales_orders.id"), nullable=False)
product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
quantity_ordered = Column(Integer, nullable=False)
quantity_allocated = Column(Integer, default=0)
quantity_picked = Column(Integer, default=0)
quantity_shipped = Column(Integer, default=0)
unit_price = Column(Float)
class InventoryCount(Base):
__tablename__ = "inventory_counts"
id = Column(Integer, primary_key=True)
count_number = Column(String(50), unique=True, nullable=False)
warehouse_id = Column(Integer, ForeignKey("warehouses.id"), nullable=False)
location_id = Column(Integer, ForeignKey("locations.id")) # NULL = entire warehouse
count_date = Column(DateTime, default=datetime.utcnow)
count_by_id = Column(Integer, ForeignKey("users.id"))
status = Column(String(20), default='in_progress') # in_progress, completed, posted
posted_at = Column(DateTime)
posted_by_id = Column(Integer, ForeignKey("users.id"))
class CountLine(Base):
__tablename__ = "count_lines"
id = Column(Integer, primary_key=True)
count_id = Column(Integer, ForeignKey("inventory_counts.id"), nullable=False)
product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
system_quantity = Column(Integer)
counted_quantity = Column(Integer)
variance = Column(Integer)
variance_percentage = Column(Float)
lot_number = Column(String(50))
serial_numbers = Column(String(2000))
notes = Column(String(500))
# schemas/stock.py
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
class LocationCreate(BaseModel):
warehouse_id: int
code: str
aisle: Optional[str] = None
shelf: Optional[str] = None
bin: Optional[str] = None
location_type: str = "storage"
capacity_units: Optional[float] = None
class LocationResponse(LocationCreate):
id: int
current_usage: float
created_at: datetime
class ProductCreate(BaseModel):
sku: str
name: str
barcode: Optional[str] = None
tracking_type: str = "none"
unit_cost: float
class StockResponse(BaseModel):
product_id: int
location_id: Optional[int] = None
quantity_on_hand: int
quantity_allocated: int
lot_number: Optional[str] = None
expiration_date: Optional[datetime] = None
class StockMovementCreate(BaseModel):
movement_type: str
product_id: int
quantity: int
from_location_id: Optional[int] = None
to_location_id: Optional[int] = None
reference_type: Optional[str] = None
reference_id: Optional[str] = None
lot_number: Optional[str] = None
serial_numbers: Optional[List[str]] = None
notes: Optional[str] = None
class StockMovementResponse(StockMovementCreate):
id: int
created_at: datetime
status: str
# api/stock.py
from fastapi import APIRouter, HTTPException, Depends, Query
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from models import Product, Stock, StockMovement, Location, Warehouse
from schemas import StockMovementCreate, StockMovementResponse
router = APIRouter(prefix="/api/inventory", tags=["inventory"])
# Dependency
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Stock lookup
@router.get("/stock/{product_id}")
async def get_stock(
product_id: int,
warehouse_id: int = Query(...),
db: Session = Depends(get_db)
):
"""Get current stock level for product"""
stock = db.query(Stock).filter(
Stock.product_id == product_id,
Stock.warehouse_id == warehouse_id
).all()
if not stock:
raise HTTPException(status_code=404, detail="Stock not found")
return {
"product_id": product_id,
"warehouse_id": warehouse_id,
"locations": [
{
"location_code": s.location.code if s.location else "bulk",
"quantity_on_hand": s.quantity_on_hand,
"quantity_allocated": s.quantity_allocated,
"lot_number": s.lot_number,
"expiration_date": s.expiration_date
}
for s in stock
]
}
# Receive goods (приёмка товара)
@router.post("/receiving/create")
async def create_receipt(
po_number: str,
warehouse_id: int,
db: Session = Depends(get_db)
):
"""Create receipt for PO"""
from models import PurchaseOrder, POLine
po = db.query(PurchaseOrder).filter_by(po_number=po_number).first()
if not po:
raise HTTPException(status_code=404, detail="PO not found")
# Get input location
input_location = db.query(Location).filter(
Location.warehouse_id == warehouse_id,
Location.location_type == "receiving"
).first()
if not input_location:
raise HTTPException(status_code=400, detail="No receiving location configured")
return {
"receipt_id": f"RCP-{datetime.now().strftime('%Y%m%d%H%M%S')}",
"po_number": po_number,
"input_location": input_location.code,
"lines": [
{
"product_sku": line.product.sku,
"product_name": line.product.name,
"expected_qty": line.quantity_ordered,
"needs_tracking": line.product.tracking_type in ['serial', 'lot', 'both']
}
for line in po.po_lines
]
}
@router.post("/receiving/receive-item")
async def receive_item(
receipt_id: str,
product_id: int,
quantity: int,
warehouse_id: int,
lot_number: Optional[str] = None,
serial_numbers: Optional[List[str]] = None,
db: Session = Depends(get_db)
):
"""Record received item"""
product = db.query(Product).get(product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
# Get input location
input_location = db.query(Location).filter(
Location.warehouse_id == warehouse_id,
Location.location_type == "receiving"
).first()
# Create stock movement
movement = StockMovement(
movement_type=MovementType.INBOUND,
product_id=product_id,
quantity=quantity,
to_location_id=input_location.id,
reference_type="PO",
reference_id=receipt_id,
lot_number=lot_number,
serial_numbers=','.join(serial_numbers) if serial_numbers else None,
status="completed"
)
# Update stock
stock = db.query(Stock).filter(
Stock.product_id == product_id,
Stock.warehouse_id == warehouse_id,
Stock.location_id == input_location.id
).first()
if not stock:
stock = Stock(
product_id=product_id,
warehouse_id=warehouse_id,
location_id=input_location.id,
quantity_on_hand=quantity,
lot_number=lot_number,
received_date=datetime.utcnow()
)
else:
stock.quantity_on_hand += quantity
db.add(movement)
db.add(stock)
db.commit()
return {
"status": "success",
"movement_id": movement.id,
"quantity_received": quantity,
"location": input_location.code
}
# Picking (отбор товара)
@router.get("/picking/list/{so_number}")
async def get_pick_list(
so_number: str,
warehouse_id: int,
db: Session = Depends(get_db)
):
"""Get optimized pick list for sales order"""
from models import SalesOrder, SOLine
so = db.query(SalesOrder).filter_by(so_number=so_number).first()
if not so:
raise HTTPException(status_code=404, detail="Sales order not found")
# Build pick list sorted by location
pick_lines = []
for line in so.so_lines:
# Find stock with FIFO logic
stocks = db.query(Stock).filter(
Stock.product_id == line.product_id,
Stock.warehouse_id == warehouse_id,
Stock.quantity_on_hand > 0
).order_by(Stock.received_date).all()
qty_needed = line.quantity_ordered - line.quantity_allocated
for stock in stocks:
if qty_needed <= 0:
break
qty_to_pick = min(qty_needed, stock.quantity_on_hand)
pick_lines.append({
"product_sku": line.product.sku,
"product_name": line.product.name,
"quantity": qty_to_pick,
"from_location": stock.location.code if stock.location else "bulk",
"lot_number": stock.lot_number,
"serial_numbers": stock.serial_numbers.split(',') if stock.serial_numbers else [],
"barcode": line.product.barcode
})
qty_needed -= qty_to_pick
# Sort by location for optimized route
pick_lines.sort(key=lambda x: x['from_location'])
return {
"pick_list_id": f"PL-{so_number}",
"sales_order": so_number,
"warehouse": warehouse_id,
"lines": pick_lines,
"created_at": datetime.utcnow()
}
@router.post("/picking/confirm-pick")
async def confirm_pick(
pick_list_id: str,
product_id: int,
quantity_picked: int,
from_location_id: int,
db: Session = Depends(get_db)
):
"""Confirm picked item"""
# Update stock
stock = db.query(Stock).filter_by(
product_id=product_id,
location_id=from_location_id
).first()
if not stock or stock.quantity_on_hand < quantity_picked:
raise HTTPException(status_code=400, detail="Insufficient stock")
# Create movement
movement = StockMovement(
movement_type=MovementType.OUTBOUND,
product_id=product_id,
quantity=quantity_picked,
from_location_id=from_location_id,
reference_type="SO",
reference_id=pick_list_id,
status="completed"
)
stock.quantity_on_hand -= quantity_picked
stock.quantity_allocated = 0
db.add(movement)
db.add(stock)
db.commit()
return {
"status": "success",
"movement_id": movement.id,
"quantity_picked": quantity_picked
}
# Inventory count (инвентаризация)
@router.post("/inventory/start-count")
async def start_count(
warehouse_id: int,
location_id: Optional[int] = None,
db: Session = Depends(get_db)
):
"""Start physical count"""
from models import InventoryCount
count = InventoryCount(
count_number=f"CNT-{datetime.now().strftime('%Y%m%d%H%M%S')}",
warehouse_id=warehouse_id,
location_id=location_id,
status="in_progress"
)
db.add(count)
db.commit()
return {
"count_id": count.id,
"count_number": count.count_number,
"warehouse_id": warehouse_id,
"location_id": location_id
}
@router.post("/inventory/record-count")
async def record_count(
count_id: int,
product_id: int,
counted_quantity: int,
lot_number: Optional[str] = None,
db: Session = Depends(get_db)
):
"""Record counted item"""
from models import InventoryCount, CountLine
count = db.query(InventoryCount).get(count_id)
if not count:
raise HTTPException(status_code=404, detail="Count not found")
# Get system quantity
stock = db.query(Stock).filter(
Stock.product_id == product_id,
Stock.warehouse_id == count.warehouse_id,
(Stock.location_id == count.location_id) if count.location_id else True
).first()
system_qty = stock.quantity_on_hand if stock else 0
variance = counted_quantity - system_qty
count_line = CountLine(
count_id=count_id,
product_id=product_id,
system_quantity=system_qty,
counted_quantity=counted_quantity,
variance=variance,
variance_percentage=(variance / system_qty * 100) if system_qty > 0 else 0,
lot_number=lot_number
)
db.add(count_line)
db.commit()
return {
"status": "recorded",
"product_id": product_id,
"system_qty": system_qty,
"counted_qty": counted_quantity,
"variance": variance
}
@router.post("/inventory/post-count/{count_id}")
async def post_count(count_id: int, db: Session = Depends(get_db)):
"""Post count and create adjustments"""
from models import InventoryCount, CountLine
count = db.query(InventoryCount).get(count_id)
if not count:
raise HTTPException(status_code=404, detail="Count not found")
count_lines = db.query(CountLine).filter_by(count_id=count_id).all()
adjustments = []
for line in count_lines:
if line.variance != 0:
# Create adjustment movement
if line.variance > 0:
from_loc = None
to_loc = count.location_id
ref = "InventoryAdjustmentGain"
else:
from_loc = count.location_id
to_loc = None
ref = "InventoryAdjustmentLoss"
movement = StockMovement(
movement_type=MovementType.ADJUSTMENT,
product_id=line.product_id,
quantity=abs(line.variance),
from_location_id=from_loc,
to_location_id=to_loc,
reference_type="CountVariance",
reference_id=count.count_number,
status="completed",
notes=f"Count variance: {line.variance}"
)
# Update stock
stock = db.query(Stock).filter_by(
product_id=line.product_id,
location_id=count.location_id
).first()
if stock:
stock.quantity_on_hand = line.counted_quantity
db.add(movement)
adjustments.append(movement.id)
count.status = "posted"
count.posted_at = datetime.utcnow()
db.commit()
return {
"status": "posted",
"count_number": count.count_number,
"adjustments_created": len(adjustments),
"adjustment_ids": adjustments
}
# Stock movements report
@router.get("/movements/report")
async def get_movements_report(
warehouse_id: int,
product_id: Optional[int] = None,
movement_type: Optional[str] = None,
days: int = Query(7),
db: Session = Depends(get_db)
):
"""Get stock movements report"""
date_from = datetime.utcnow() - timedelta(days=days)
query = db.query(StockMovement).filter(
StockMovement.created_at >= date_from
)
if product_id:
query = query.filter(StockMovement.product_id == product_id)
if movement_type:
query = query.filter(StockMovement.movement_type == movement_type)
movements = query.order_by(StockMovement.created_at.desc()).all()
return {
"period": f"Last {days} days",
"total_movements": len(movements),
"movements": [
{
"id": m.id,
"type": m.movement_type.value,
"product_sku": m.product.sku,
"quantity": m.quantity,
"from_location": m.from_location.code if m.from_location else None,
"to_location": m.to_location.code if m.to_location else None,
"reference": f"{m.reference_type}#{m.reference_id}",
"created_at": m.created_at
}
for m in movements
]
}
# services/barcode_scanner.py
from typing import Optional, Dict, List
import barcode
from barcode.ean import EAN13
import io
class BarcodeScanner:
"""Обработка отсканированных баркодов"""
@staticmethod
def parse_barcode(barcode_str: str, db: Session) -> Dict:
"""
Identify what was scanned:
- Product barcode → return product info
- Location barcode → return location info
- Order barcode (QR) → return order info
"""
# Try product first
product = db.query(Product).filter(
(Product.barcode == barcode_str) |
(Product.secondary_barcodes.contains(barcode_str))
).first()
if product:
return {
"type": "product",
"product_id": product.id,
"sku": product.sku,
"name": product.name,
"tracking_required": product.tracking_type in ['serial', 'lot', 'both']
}
# Try location
location = db.query(Location).filter_by(code=barcode_str).first()
if location:
return {
"type": "location",
"location_id": location.id,
"code": location.code,
"warehouse_id": location.warehouse_id
}
# Try order (QR code)
if barcode_str.startswith("SO"):
order = db.query(SalesOrder).filter_by(so_number=barcode_str).first()
if order:
return {
"type": "order",
"order_id": order.id,
"order_number": order.so_number,
"status": order.status
}
if barcode_str.startswith("PO"):
po = db.query(PurchaseOrder).filter_by(po_number=barcode_str).first()
if po:
return {
"type": "po",
"po_id": po.id,
"po_number": po.po_number
}
return {"type": "unknown", "barcode": barcode_str}
@staticmethod
def generate_product_barcode(product_id: int) -> bytes:
"""Generate EAN-13 barcode for product"""
ean = EAN13(product_id)
return ean.save(None)
@staticmethod
def generate_location_barcode(location_code: str) -> bytes:
"""Generate Code-128 barcode for location"""
from barcode import Code128
code = Code128(location_code, charset='utf-8')
return code.save(None)
# Mobile receiving workflow
class ReceivingWorkflow:
"""
Workflow для приёмки с баркодом:
1. Сканировать PO barcode
2. For each line in PO:
- Scan product barcode
- Input quantity
- If required: input serials/lots
- Confirm location
"""
def __init__(self, db: Session):
self.db = db
self.current_po = None
self.current_receipt = None
self.received_items = []
def start_receipt(self, po_barcode: str):
"""Начать приёмку"""
po = self.db.query(PurchaseOrder).filter_by(po_number=po_barcode).first()
if not po:
raise ValueError(f"PO {po_barcode} not found")
self.current_po = po
self.current_receipt = {
"po_number": po.po_number,
"items": []
}
return {
"status": "started",
"po_number": po.po_number,
"expected_items": len(po.po_lines)
}
def scan_product(self, product_barcode: str):
"""Сканировать товар"""
product = self.db.query(Product).filter_by(barcode=product_barcode).first()
if not product:
raise ValueError(f"Product {product_barcode} not found")
# Check if this product is in current PO
po_line = self.db.query(POLine).filter(
POLine.po_id == self.current_po.id,
POLine.product_id == product.id
).first()
if not po_line:
raise ValueError(f"Product {product.sku} not in PO {self.current_po.po_number}")
return {
"product_id": product.id,
"sku": product.sku,
"name": product.name,
"expected_quantity": po_line.quantity_ordered,
"already_received": po_line.quantity_received,
"remaining": po_line.quantity_ordered - po_line.quantity_received,
"needs_serial": product.requires_serial,
"needs_lot": product.requires_lot
}
def confirm_item(
self,
product_id: int,
quantity: int,
location_id: int,
serials: Optional[List[str]] = None,
lot_number: Optional[str] = None
):
"""Подтвердить товар"""
if not self.current_po:
raise ValueError("No receipt in progress")
# Validation
if not serials and self.db.query(Product).get(product_id).requires_serial:
raise ValueError("Serial numbers required for this product")
if not lot_number and self.db.query(Product).get(product_id).requires_lot:
raise ValueError("Lot number required for this product")
# Create movement and update stock
movement = StockMovement(
movement_type=MovementType.INBOUND,
product_id=product_id,
quantity=quantity,
to_location_id=location_id,
reference_type="PO",
reference_id=self.current_po.po_number,
lot_number=lot_number,
serial_numbers=','.join(serials) if serials else None,
status="completed"
)
self.db.add(movement)
self.db.commit()
self.received_items.append({
"product_id": product_id,
"quantity": quantity,
"location_id": location_id
})
return {
"status": "confirmed",
"items_received": len(self.received_items)
}
def complete_receipt(self):
"""Завершить приёмку"""
return {
"status": "completed",
"po_number": self.current_po.po_number,
"items_received": len(self.received_items),
"receipt_id": self.current_receipt
}
# services/batch_operations.py
class WavePicking:
"""Picking wave для группировки заказов"""
@staticmethod
def create_wave(
warehouse_id: int,
so_numbers: List[str],
db: Session
):
"""Create optimized picking wave"""
orders = db.query(SalesOrder).filter(
SalesOrder.so_number.in_(so_numbers),
SalesOrder.warehouse_id == warehouse_id
).all()
if not orders:
raise ValueError("No orders found")
# Collect all lines
all_lines = []
for order in orders:
for line in order.so_lines:
all_lines.append({
"so_number": order.so_number,
"product_id": line.product_id,
"quantity": line.quantity_ordered,
"sku": line.product.sku
})
# Sort by location (optimize picking route)
all_lines.sort(key=lambda x: f"product_{x['product_id']}")
return {
"wave_id": f"WAVE-{len(so_numbers)}-{datetime.now().strftime('%Y%m%d%H%M')}",
"orders": len(orders),
"lines": all_lines
}
class InventoryAdjustment:
"""Batch adjustment operations"""
@staticmethod
def create_adjustment(
product_ids: List[int],
warehouse_id: int,
reason: str,
quantities: Dict[int, int], # product_id -> adjustment qty
db: Session
):
"""Create batch inventory adjustment"""
adjustments = []
for product_id in product_ids:
adj_qty = quantities.get(product_id, 0)
if adj_qty == 0:
continue
movement = StockMovement(
movement_type=MovementType.ADJUSTMENT,
product_id=product_id,
quantity=abs(adj_qty),
reference_type="Adjustment",
reference_id=f"ADJ-{datetime.now().isoformat()}",
status="completed",
notes=reason
)
db.add(movement)
adjustments.append(product_id)
db.commit()
return {
"adjustments_created": len(adjustments),
"reason": reason
}
# services/notifications.py
from fastapi import WebSocket
import asyncio
import json
class StockNotificationManager:
"""Real-time notifications for stock changes"""
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def broadcast_stock_change(self, message: dict):
"""Broadcast stock change to all connected clients"""
for connection in self.active_connections:
try:
await connection.send_json(message)
except Exception as e:
print(f"Error sending message: {e}")
async def notify_low_stock(self, product_id: int, quantity: int, reorder_point: int):
"""Notify when stock drops below reorder point"""
if quantity <= reorder_point:
await self.broadcast_stock_change({
"event_type": "low_stock_alert",
"product_id": product_id,
"current_quantity": quantity,
"reorder_point": reorder_point,
"timestamp": datetime.utcnow().isoformat()
})
# WebSocket endpoint
manager = StockNotificationManager()
@router.websocket("/ws/inventory/{warehouse_id}")
async def websocket_inventory(websocket: WebSocket, warehouse_id: int):
await manager.connect(websocket)
try:
while True:
# Keep connection alive
await asyncio.sleep(1)
except Exception:
manager.disconnect(websocket)
Database: PostgreSQL / SQLite
ORM: SQLAlchemy
API: FastAPI
Validation: Pydantic
Async: asyncio
Barcodes: python-barcode, opencv (для scanning)
Mobile: React Native / Flutter (интеграция с API)
Эти примеры можно использовать как основу для быстрого прототипирования WMS системы.