# Backend Implementation Strategy ## Overview This document outlines the FastAPI backend for the energy trading system UI. The backend serves data, executes strategies, runs backtests, and provides real-time updates via WebSockets. **Data Source**: `~/energy-test-data/data/processed/` --- ## Architecture ``` ┌──────────────────────────────────────────────────────────────┐ │ FastAPI Application │ ├──────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────┬─────────────┬─────────────┬──────────────┐ │ │ │ API │ Services │ Tasks │ WebSocket │ │ │ │ Routes │ Layer │ (Celery) │ Manager │ │ │ └─────────────┴─────────────┴─────────────┴──────────────┘ │ │ ┌──────────┐ │ │ │ Data │ │ │ │ Cache │ │ │ └──────────┘ │ └──────────────────────────────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────────────┐ │ Core Trading Engine (Imported) │ │ - Fundamental Strategy │ │ - Technical Analysis │ │ - ML Models (Price Prediction, RL Battery) │ │ - Backtesting Engine │ └──────────────────────────────────────────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────────────────────┐ │ Data Source │ │ ~/energy-test-data/data/processed/*.parquet │ └──────────────────────────────────────────────────────────────┘ ``` --- ## Project Structure ``` backend/ ├── app/ │ ├── __init__.py │ ├── main.py # FastAPI app entry │ ├── config.py # Configuration management │ │ │ ├── api/ │ │ ├── __init__.py │ │ ├── routes/ │ │ │ ├── __init__.py │ │ │ ├── dashboard.py # Dashboard data endpoints │ │ │ ├── backtest.py # Backtest execution │ │ │ ├── models.py # ML model endpoints │ │ │ ├── trading.py # Trading control │ │ │ └── settings.py # Configuration management │ │ └── websocket.py # WebSocket connection manager │ │ │ ├── services/ │ │ ├── __init__.py │ │ ├── data_service.py # Data loading and caching │ │ ├── strategy_service.py # Strategy execution │ │ ├── ml_service.py # ML model management │ │ ├── trading_service.py # Trading operations │ │ └── alert_service.py # Alert management │ │ │ ├── tasks/ │ │ ├── __init__.py │ │ ├── backtest_tasks.py # Async backtest execution │ │ ├── training_tasks.py # ML model training │ │ └── monitoring_tasks.py # Real-time data updates │ │ │ ├── ml/ # ML models and training │ │ ├── __init__.py │ │ ├── features/ │ │ │ ├── __init__.py │ │ │ ├── lag_features.py │ │ │ ├── rolling_features.py │ │ │ ├── time_features.py │ │ │ ├── regional_features.py │ │ │ └── battery_features.py │ │ │ │ │ ├── price_prediction/ │ │ │ ├── __init__.py │ │ │ ├── model.py │ │ │ ├── trainer.py │ │ │ └── predictor.py │ │ │ │ │ ├── rl_battery/ │ │ │ ├── __init__.py │ │ │ ├── environment.py │ │ │ ├── agent.py │ │ │ ├── trainer.py │ │ │ └── policy.py │ │ │ │ │ ├── model_management/ │ │ │ ├── __init__.py │ │ │ ├── registry.py │ │ │ ├── persistence.py │ │ │ ├── versioning.py │ │ │ └── comparison.py │ │ │ │ │ ├── evaluation/ │ │ │ ├── __init__.py │ │ │ ├── metrics.py │ │ │ ├── backtest_evaluator.py │ │ │ └── reports.py │ │ │ │ │ ├── training/ │ │ │ ├── __init__.py │ │ │ └── cli.py │ │ │ │ │ └── utils/ │ │ ├── __init__.py │ │ ├── data_split.py │ │ ├── config.py │ │ └── evaluation.py │ │ │ ├── models/ │ │ ├── __init__.py │ │ ├── schemas.py # Pydantic models │ │ └── enums.py # Enumerations │ │ │ ├── core/ │ │ ├── __init__.py │ │ └── constants.py # Constants and defaults │ │ │ └── utils/ │ ├── __init__.py │ ├── logger.py │ └── helpers.py │ ├── tests/ │ ├── __init__.py │ ├── conftest.py │ ├── test_api/ │ ├── test_services/ │ └── test_websocket.py │ ├── models/ # Trained ML models storage │ ├── price_prediction/ │ │ ├── model_1min.pkl │ │ ├── model_5min.pkl │ │ ├── model_15min.pkl │ │ └── model_60min.pkl │ └── rl_battery/ │ └── battery_policy.pkl │ ├── results/ # Backtest results storage │ └── backtests/ │ ├── .env.example ├── requirements.txt ├── pyproject.toml └── Dockerfile ``` --- ## Configuration ### app/config.py (Settings) ```python from pydantic_settings import BaseSettings from pathlib import Path from typing import List class Settings(BaseSettings): # Application APP_NAME: str = "Energy Trading API" APP_VERSION: str = "1.0.0" DEBUG: bool = True # Server HOST: str = "0.0.0.0" PORT: int = 8000 # Data DATA_PATH: str = "~/energy-test-data/data/processed" DATA_PATH_RESOLVED: Path = Path(DATA_PATH).expanduser() # CORS CORS_ORIGINS: List[str] = [ "http://localhost:3000", "http://localhost:5173", ] # WebSocket WS_HEARTBEAT_INTERVAL: int = 30 # Celery CELERY_BROKER_URL: str = "redis://localhost:6379/0" CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0" # Models MODELS_PATH: str = "models" RESULTS_PATH: str = "results" # Battery BATTERY_MIN_RESERVE: float = 0.10 BATTERY_MAX_CHARGE: float = 0.90 # Arbitrage ARBITRAGE_MIN_SPREAD: float = 5.0 # EUR/MWh # Mining MINING_MARGIN_THRESHOLD: float = 5.0 # EUR/MWh # ML ML_PREDICTION_HORIZONS: List[int] = [1, 5, 15, 60] ML_FEATURE_LAGS: List[int] = [1, 5, 10, 15, 30, 60] class Config: env_file = ".env" case_sensitive = True settings = Settings() ``` --- ## Data Models (app/models/schemas.py) ### Enums ```python class RegionEnum(str, Enum): FR = "FR" BE = "BE" DE = "DE" NL = "NL" UK = "UK" class FuelTypeEnum(str, Enum): GAS = "gas" NUCLEAR = "nuclear" COAL = "coal" SOLAR = "solar" WIND = "wind" HYDRO = "hydro" class StrategyEnum(str, Enum): FUNDAMENTAL = "fundamental" TECHNICAL = "technical" ML = "ml" MINING = "mining" class TradeTypeEnum(str, Enum): BUY = "buy" SELL = "sell" CHARGE = "charge" DISCHARGE = "discharge" class BacktestStatusEnum(str, Enum): PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" CANCELLED = "cancelled" class ModelType(str, Enum): PRICE_PREDICTION = "price_prediction" RL_BATTERY = "rl_battery" class AlertTypeEnum(str, Enum): PRICE_SPIKE = "price_spike" ARBITRAGE_OPPORTUNITY = "arbitrage_opportunity" BATTERY_LOW = "battery_low" BATTERY_FULL = "battery_full" STRATEGY_ERROR = "strategy_error" ``` ### Key Schemas ```python class PriceData(BaseModel): timestamp: datetime region: RegionEnum day_ahead_price: float real_time_price: float volume_mw: float class BatteryState(BaseModel): timestamp: datetime battery_id: str capacity_mwh: float charge_level_mwh: float charge_rate_mw: float discharge_rate_mw: float efficiency: float charge_level_pct: float = Field(default_factory=lambda: 0.0) class BacktestConfig(BaseModel): start_date: str end_date: str strategies: List[StrategyEnum] = Field(default_factory=list) use_ml: bool = True battery_min_reserve: Optional[float] = None battery_max_charge: Optional[float] = None arbitrage_min_spread: Optional[float] = None class BacktestMetrics(BaseModel): total_revenue: float arbitrage_profit: float battery_revenue: float mining_profit: float battery_utilization: float price_capture_rate: float win_rate: float sharpe_ratio: float max_drawdown: float total_trades: int class TrainingRequest(BaseModel): model_type: ModelType horizon: Optional[int] = None start_date: str end_date: str hyperparameters: Dict[str, Any] = Field(default_factory=dict) class PredictionResponse(BaseModel): model_id: str timestamp: datetime prediction: float confidence: Optional[float] = None features_used: List[str] = Field(default_factory=list) ``` --- ## API Routes ### Dashboard API (`/api/v1/dashboard/*`) ```python # GET /api/v1/dashboard/summary Response: DashboardSummary # GET /api/v1/dashboard/prices Response: { regions: { [region]: { timestamp, day_ahead_price, real_time_price, volume_mw } } } # GET /api/v1/dashboard/prices/history?region={region}&start={start}&end={end}&limit={limit} Response: { region, data: PriceData[] } # GET /api/v1/dashboard/battery Response: { batteries: BatteryState[] } # GET /api/v1/dashboard/arbitrage?min_spread={min_spread} Response: { opportunities: ArbitrageOpportunity[], count: int } ``` ### Backtest API (`/api/v1/backtest/*`) ```python # POST /api/v1/backtest/start Request: { config: BacktestConfig, name?: string } Response: { backtest_id: string, status: BacktestStatus } # GET /api/v1/backtest/{backtest_id} Response: { status: BacktestStatus, results?: BacktestResult } # GET /api/v1/backtest/{backtest_id}/results Response: BacktestResult # GET /api/v1/backtest/{backtest_id}/trades?limit={limit} Response: { backtest_id, trades: Trade[], total: int } # GET /api/v1/backtest/history Response: { backtests: BacktestStatus[], total: int } # DELETE /api/v1/backtest/{backtest_id} Response: { message: string } ``` ### Models API (`/api/v1/models/*`) ```python # GET /api/v1/models Response: { models: ModelInfo[], total: int } # POST /api/v1/models/train Request: TrainingRequest Response: { training_id: string, status: TrainingStatus } # GET /api/v1/models/{model_id}/status Response: TrainingStatus # GET /api/v1/models/{model_id}/metrics Response: { model_id, metrics: dict } # POST /api/v1/models/predict Request: { model_id, timestamp, features?: dict } Response: PredictionResponse ``` ### Trading API (`/api/v1/trading/*`) ```python # GET /api/v1/trading/strategies Response: { strategies: StrategyStatus[] } # POST /api/v1/trading/strategies Request: { strategy: StrategyEnum, action: "start" | "stop" } Response: { status: StrategyStatus } # GET /api/v1/trading/positions Response: { positions: TradingPosition[] } ``` ### Settings API (`/api/v1/settings/*`) ```python # GET /api/v1/settings Response: AppSettings # POST /api/v1/settings Request: Partial Response: { message, updated_fields: string[] } ``` --- ## Services Interface ### DataService (app/services/data_service.py) ```python class DataService: """Data loading and caching service.""" async def initialize(self): """Load all datasets into memory.""" def get_latest_prices(self) -> Dict[str, Dict]: """Get latest prices for all regions.""" def get_price_history(self, region, start=None, end=None, limit=1000) -> List[Dict]: """Get price history for a region.""" def get_battery_states(self) -> List[Dict]: """Get current battery states.""" def get_arbitrage_opportunities(self, min_spread=None) -> List[Dict]: """Get current arbitrage opportunities.""" def get_dashboard_summary(self) -> Dict: """Get overall dashboard summary.""" ``` ### MLService (app/services/ml_service.py) ```python class MLService: """Service for ML model management and inference.""" def list_models(self) -> List[ModelInfo]: """List all available trained models.""" def get_model_metrics(self, model_id: str) -> Dict[str, float]: """Get performance metrics for a model.""" def load_price_prediction_model(self, model_id: str): """Load price prediction model on-demand.""" def load_rl_battery_policy(self, model_id: str): """Load RL battery policy on-demand.""" def predict( self, model_id: str, timestamp: datetime, features: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Run prediction with on-demand model loading.""" def predict_with_confidence( self, model_id: str, timestamp: datetime, features: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """Run prediction with confidence interval.""" def get_feature_importance(self, model_id: str) -> Dict[str, float]: """Get feature importance for a model.""" def get_model_info(self, model_id: str) -> Optional[ModelInfo]: """Get detailed info about a specific model.""" ``` ### StrategyService (app/services/strategy_service.py) ```python class StrategyService: """Strategy execution service.""" async def execute_strategy( self, strategy: StrategyEnum, config: Dict = None ) -> Dict: """Execute a trading strategy.""" async def get_strategy_status(self, strategy: StrategyEnum) -> StrategyStatus: """Get current status of a strategy.""" async def toggle_strategy( self, strategy: StrategyEnum, action: str ) -> StrategyStatus: """Start or stop a strategy.""" ``` --- ## Tasks Interface ### Backtest Tasks (app/tasks/backtest_tasks.py) ```python async def run_backtest_task(backtest_id: str, config: Dict, name: str = None): """ Execute backtest in background. Process: 1. Load data 2. Execute strategies 3. Calculate metrics 4. Save results 5. Emit WebSocket progress events """ ``` ### Training Tasks (app/tasks/training_tasks.py) ```python async def train_model_task(training_id: str, request: TrainingRequest): """ Execute ML model training via Celery task. Dispatches to Celery for async processing. Emits WebSocket events for progress updates. """ @shared_task(name="tasks.train_price_prediction") def train_price_prediction(training_id: str, request_dict: dict): """Celery task for price prediction model training.""" @shared_task(name="tasks.train_rl_battery") def train_rl_battery(training_id: str, request_dict: dict): """Celery task for RL battery policy training.""" ``` --- ## WebSocket Interface ### ConnectionManager (app/api/websocket.py) ```python class ConnectionManager: """WebSocket connection manager.""" async def connect(self, websocket: WebSocket): """Accept and track new connection.""" def disconnect(self, websocket: WebSocket): """Remove connection.""" async def broadcast(self, event_type: str, data: Any): """Broadcast event to all connected clients.""" # Specific event broadcasters async def broadcast_price_update(self, region: str, price_data: Dict): """Broadcast price update.""" async def broadcast_battery_update(self, battery_id: str, battery_state: Dict): """Broadcast battery state update.""" async def broadcast_trade(self, trade: Dict): """Broadcast new trade execution.""" async def broadcast_alert(self, alert: Dict): """Broadcast new alert.""" async def broadcast_backtest_progress(self, backtest_id: str, progress: float, status: str): """Broadcast backtest progress.""" async def broadcast_model_training_progress( self, model_id: str, progress: float, epoch: Optional[int] = None, metrics: Optional[Dict] = None ): """Broadcast model training progress.""" ``` ### WebSocket Events ```python # Event types "price_update" # Real-time price changes "battery_update" # Battery state changes "arbitrage_opportunity" # New arbitrage opportunity "trade_executed" # Trade execution "alert_triggered" # Alert triggered "backtest_progress" # Backtest progress "model_training_progress" # Training progress ``` --- ## Main Application ### app/main.py ```python from fastapi import FastAPI, WebSocket from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager app = FastAPI( title=settings.APP_NAME, version=settings.APP_VERSION, docs_url="/docs", redoc_url="/redoc", ) # CORS middleware app.add_middleware(CORSMiddleware, ...) # Include routers app.include_router(dashboard.router, prefix="/api/v1/dashboard", tags=["dashboard"]) app.include_router(backtest.router, prefix="/api/v1/backtest", tags=["backtest"]) app.include_router(models.router, prefix="/api/v1/models", tags=["models"]) app.include_router(trading.router, prefix="/api/v1/trading", tags=["trading"]) app.include_router(settings_routes.router, prefix="/api/v1/settings", tags=["settings"]) # Health check @app.get("/health") async def health_check(): return { "status": "healthy" } # WebSocket endpoint @app.websocket("/ws/real-time") async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) ``` --- ## Dependencies ### requirements.txt ``` # FastAPI & Server fastapi>=0.104.0 uvicorn[standard]>=0.24.0 pydantic>=2.4.0 pydantic-settings>=2.0.0 # Data Processing pandas>=2.1.0 numpy>=1.24.0 pyarrow>=14.0.0 # Machine Learning xgboost>=2.0.0 scikit-learn>=1.3.0 # Reinforcement Learning gymnasium>=0.29.0 stable-baselines3>=2.0.0 # Background Tasks celery>=5.3.0 redis>=5.0.0 # WebSockets websockets>=12.0.0 # Database sqlalchemy>=2.0.0 alembic>=1.12.0 # Utilities python-multipart>=0.0.6 python-jose[cryptography]>=3.3.0 python-dotenv>=1.0.0 # Testing pytest>=7.4.0 pytest-asyncio>=0.21.0 httpx>=0.25.0 # Logging loguru>=0.7.0 ``` --- ## Environment Variables ### .env.example ```bash # Application APP_NAME=Energy Trading API APP_VERSION=1.0.0 DEBUG=true # Server HOST=0.0.0.0 PORT=8000 # Data DATA_PATH=~/energy-test-data/data/processed # CORS CORS_ORIGINS=http://localhost:3000,http://localhost:5173 # Celery CELERY_BROKER_URL=redis://localhost:6379/0 CELERY_RESULT_BACKEND=redis://localhost:6379/0 # Paths MODELS_PATH=models RESULTS_PATH=results # Battery BATTERY_MIN_RESERVE=0.10 BATTERY_MAX_CHARGE=0.90 # Arbitrage ARBITRAGE_MIN_SPREAD=5.0 # Mining MINING_MARGIN_THRESHOLD=5.0 ```