energy-trade/BACKEND_IMPLEMENTATION.md
kbt-devops a22a13f6f4 Add initial implementation strategy documentation
Add comprehensive documentation for energy trading system:
- Backend: FastAPI architecture, API routes, services, WebSocket
- Frontend: React structure, components, state management
- ML: Feature engineering, XGBoost price prediction, RL battery optimization
2026-02-11 02:16:25 +07:00

Backend Implementation Strategy

Overview

This document outlines the FastAPI backend for the energy trading system UI. The backend serves data, executes strategies, runs backtests, and provides real-time updates via WebSockets.

Data Source: ~/energy-test-data/data/processed/


Architecture

┌──────────────────────────────────────────────────────────────┐
│                    FastAPI Application                         │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌─────────────┬─────────────┬─────────────┬──────────────┐  │
│  │    API      │   Services  │    Tasks    │  WebSocket   │  │
│  │   Routes    │   Layer     │   (Celery)  │   Manager    │  │
│  └─────────────┴─────────────┴─────────────┴──────────────┘  │
│                          ┌──────────┐                         │
│                          │   Data   │                         │
│                          │  Cache   │                         │
│                          └──────────┘                         │
└──────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│              Core Trading Engine (Imported)                   │
│  - Fundamental Strategy                                       │
│  - Technical Analysis                                         │
│  - ML Models (Price Prediction, RL Battery)                   │
│  - Backtesting Engine                                         │
└──────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│                 Data Source                                   │
│  ~/energy-test-data/data/processed/*.parquet                 │
└──────────────────────────────────────────────────────────────┘

Project Structure

backend/
├── app/
│   ├── __init__.py
│   ├── main.py                          # FastAPI app entry
│   ├── config.py                        # Configuration management
│   │
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes/
│   │   │   ├── __init__.py
│   │   │   ├── dashboard.py            # Dashboard data endpoints
│   │   │   ├── backtest.py             # Backtest execution
│   │   │   ├── models.py               # ML model endpoints
│   │   │   ├── trading.py              # Trading control
│   │   │   └── settings.py             # Configuration management
│   │   └── websocket.py                # WebSocket connection manager
│   │
│   ├── services/
│   │   ├── __init__.py
│   │   ├── data_service.py             # Data loading and caching
│   │   ├── strategy_service.py         # Strategy execution
│   │   ├── ml_service.py               # ML model management
│   │   ├── trading_service.py          # Trading operations
│   │   └── alert_service.py            # Alert management
│   │
│   ├── tasks/
│   │   ├── __init__.py
│   │   ├── backtest_tasks.py           # Async backtest execution
│   │   ├── training_tasks.py           # ML model training
│   │   └── monitoring_tasks.py         # Real-time data updates
│   │
│   ├── ml/                             # ML models and training
│   │   ├── __init__.py
│   │   ├── features/
│   │   │   ├── __init__.py
│   │   │   ├── lag_features.py
│   │   │   ├── rolling_features.py
│   │   │   ├── time_features.py
│   │   │   ├── regional_features.py
│   │   │   └── battery_features.py
│   │   │
│   │   ├── price_prediction/
│   │   │   ├── __init__.py
│   │   │   ├── model.py
│   │   │   ├── trainer.py
│   │   │   └── predictor.py
│   │   │
│   │   ├── rl_battery/
│   │   │   ├── __init__.py
│   │   │   ├── environment.py
│   │   │   ├── agent.py
│   │   │   ├── trainer.py
│   │   │   └── policy.py
│   │   │
│   │   ├── model_management/
│   │   │   ├── __init__.py
│   │   │   ├── registry.py
│   │   │   ├── persistence.py
│   │   │   ├── versioning.py
│   │   │   └── comparison.py
│   │   │
│   │   ├── evaluation/
│   │   │   ├── __init__.py
│   │   │   ├── metrics.py
│   │   │   ├── backtest_evaluator.py
│   │   │   └── reports.py
│   │   │
│   │   ├── training/
│   │   │   ├── __init__.py
│   │   │   └── cli.py
│   │   │
│   │   └── utils/
│   │       ├── __init__.py
│   │       ├── data_split.py
│   │       ├── config.py
│   │       └── evaluation.py
│   │
│   ├── models/
│   │   ├── __init__.py
│   │   ├── schemas.py                  # Pydantic models
│   │   └── enums.py                    # Enumerations
│   │
│   ├── core/
│   │   ├── __init__.py
│   │   └── constants.py                # Constants and defaults
│   │
│   └── utils/
│       ├── __init__.py
│       ├── logger.py
│       └── helpers.py
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_api/
│   ├── test_services/
│   └── test_websocket.py
│
├── models/                             # Trained ML models storage
│   ├── price_prediction/
│   │   ├── model_1min.pkl
│   │   ├── model_5min.pkl
│   │   ├── model_15min.pkl
│   │   └── model_60min.pkl
│   └── rl_battery/
│       └── battery_policy.pkl
│
├── results/                             # Backtest results storage
│   └── backtests/
│
├── .env.example
├── requirements.txt
├── pyproject.toml
└── Dockerfile

Configuration

app/config.py (Settings)

from pydantic_settings import BaseSettings
from pathlib import Path
from typing import List

class Settings(BaseSettings):
    # Application
    APP_NAME: str = "Energy Trading API"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = True

    # Server
    HOST: str = "0.0.0.0"
    PORT: int = 8000

    # Data
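    DATA_PATH: str = "~/energy-test-data/data/processed"

    @property
    def DATA_PATH_RESOLVED(self) -> Path:
        # Resolved per instance so that a DATA_PATH override from .env is honored;
        # a class-level default would be computed once from the hard-coded path.
        return Path(self.DATA_PATH).expanduser()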

    # CORS
    CORS_ORIGINS: List[str] = [
        "http://localhost:3000",
        "http://localhost:5173",
    ]

    # WebSocket
    WS_HEARTBEAT_INTERVAL: int = 30

    # Celery
    CELERY_BROKER_URL: str = "redis://localhost:6379/0"
    CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"

    # Models
    MODELS_PATH: str = "models"
    RESULTS_PATH: str = "results"

    # Battery
    BATTERY_MIN_RESERVE: float = 0.10
    BATTERY_MAX_CHARGE: float = 0.90

    # Arbitrage
    ARBITRAGE_MIN_SPREAD: float = 5.0  # EUR/MWh

    # Mining
    MINING_MARGIN_THRESHOLD: float = 5.0  # EUR/MWh

    # ML
    ML_PREDICTION_HORIZONS: List[int] = [1, 5, 15, 60]
    ML_FEATURE_LAGS: List[int] = [1, 5, 10, 15, 30, 60]

    # pydantic-settings v2 style; the nested "class Config" form is deprecated
    model_config = {"env_file": ".env", "case_sensitive": True}

settings = Settings()
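
The battery limits above are easiest to read as fractions of capacity. The following is a minimal sketch of how a strategy might turn BATTERY_MIN_RESERVE and BATTERY_MAX_CHARGE into usable energy. The helper name `usable_energy_window` is hypothetical, and the fraction-of-capacity interpretation is an assumption rather than something the configuration itself states.

```python
# Hypothetical helper, not part of the documented code base.
def usable_energy_window(capacity_mwh, charge_level_mwh,
                         min_reserve=0.10, max_charge=0.90):
    """Return (dischargeable_mwh, chargeable_mwh) inside the reserve bounds."""
    floor_mwh = capacity_mwh * min_reserve     # energy that must stay in the battery
    ceiling_mwh = capacity_mwh * max_charge    # highest allowed charge level
    dischargeable = max(0.0, charge_level_mwh - floor_mwh)
    chargeable = max(0.0, ceiling_mwh - charge_level_mwh)
    return dischargeable, chargeable
```

For a 100 MWh battery at 50 MWh, this leaves 40 MWh to discharge and 40 MWh of headroom to charge under the default limits.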

Data Models (app/models/schemas.py)

Enums

from enum import Enum

class RegionEnum(str, Enum):
    FR = "FR"
    BE = "BE"
    DE = "DE"
    NL = "NL"
    UK = "UK"

class FuelTypeEnum(str, Enum):
    GAS = "gas"
    NUCLEAR = "nuclear"
    COAL = "coal"
    SOLAR = "solar"
    WIND = "wind"
    HYDRO = "hydro"

class StrategyEnum(str, Enum):
    FUNDAMENTAL = "fundamental"
    TECHNICAL = "technical"
    ML = "ml"
    MINING = "mining"

class TradeTypeEnum(str, Enum):
    BUY = "buy"
    SELL = "sell"
    CHARGE = "charge"
    DISCHARGE = "discharge"

class BacktestStatusEnum(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class ModelType(str, Enum):
    PRICE_PREDICTION = "price_prediction"
    RL_BATTERY = "rl_battery"

class AlertTypeEnum(str, Enum):
    PRICE_SPIKE = "price_spike"
    ARBITRAGE_OPPORTUNITY = "arbitrage_opportunity"
    BATTERY_LOW = "battery_low"
    BATTERY_FULL = "battery_full"
    STRATEGY_ERROR = "strategy_error"

Key Schemas

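from datetime import datetime
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field

class PriceData(BaseModel):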
    timestamp: datetime
    region: RegionEnum
    day_ahead_price: float
    real_time_price: float
    volume_mw: float

class BatteryState(BaseModel):
    timestamp: datetime
    battery_id: str
    capacity_mwh: float
    charge_level_mwh: float
    charge_rate_mw: float
    discharge_rate_mw: float
    efficiency: float
    charge_level_pct: float = 0.0  # a plain default is enough for an immutable value

class BacktestConfig(BaseModel):
    start_date: str
    end_date: str
    strategies: List[StrategyEnum] = Field(default_factory=list)
    use_ml: bool = True
    battery_min_reserve: Optional[float] = None
    battery_max_charge: Optional[float] = None
    arbitrage_min_spread: Optional[float] = None

class BacktestMetrics(BaseModel):
    total_revenue: float
    arbitrage_profit: float
    battery_revenue: float
    mining_profit: float
    battery_utilization: float
    price_capture_rate: float
    win_rate: float
    sharpe_ratio: float
    max_drawdown: float
    total_trades: int

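class TrainingRequest(BaseModel):
    # Field names starting with "model_" can collide with Pydantic v2's protected namespace; opt out
    model_config = {"protected_namespaces": ()}

    model_type: ModelType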
    horizon: Optional[int] = None
    start_date: str
    end_date: str
    hyperparameters: Dict[str, Any] = Field(default_factory=dict)

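class PredictionResponse(BaseModel):
    # Field names starting with "model_" can collide with Pydantic v2's protected namespace; opt out
    model_config = {"protected_namespaces": ()}

    model_id: str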
    timestamp: datetime
    prediction: float
    confidence: Optional[float] = None
    features_used: List[str] = Field(default_factory=list)
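
Several BacktestMetrics fields can be derived from a list of per-trade profit and loss values. The sketch below shows one possible derivation. The function name `summarize_pnl` is hypothetical, and the definitions are assumptions: the Sharpe ratio is the unannualized mean divided by the population standard deviation, and the maximum drawdown is an absolute EUR amount measured on cumulative P&L.

```python
from statistics import mean, pstdev

def summarize_pnl(pnl):
    """Derive a few BacktestMetrics-style values from per-trade P&L (EUR)."""
    total_trades = len(pnl)
    wins = sum(1 for value in pnl if value > 0)

    # Walk the cumulative equity curve and track the deepest fall from a peak.
    equity = peak = max_drawdown = 0.0
    for value in pnl:
        equity += value
        peak = max(peak, equity)
        max_drawdown = max(max_drawdown, peak - equity)

    spread = pstdev(pnl) if total_trades > 1 else 0.0
    return {
        "total_revenue": sum(pnl),
        "total_trades": total_trades,
        "win_rate": wins / total_trades if total_trades else 0.0,
        "max_drawdown": max_drawdown,
        "sharpe_ratio": mean(pnl) / spread if spread > 0 else 0.0,
    }
```

A production version would likely annualize the Sharpe ratio and express drawdown as a percentage. Both choices depend on the trading engine and are left open here.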

API Routes

Dashboard API (/api/v1/dashboard/*)

# GET /api/v1/dashboard/summary
Response: DashboardSummary

# GET /api/v1/dashboard/prices
Response: { regions: { [region]: { timestamp, day_ahead_price, real_time_price, volume_mw } } }

# GET /api/v1/dashboard/prices/history?region={region}&start={start}&end={end}&limit={limit}
Response: { region, data: PriceData[] }

# GET /api/v1/dashboard/battery
Response: { batteries: BatteryState[] }

# GET /api/v1/dashboard/arbitrage?min_spread={min_spread}
Response: { opportunities: ArbitrageOpportunity[], count: int }
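
As an illustration of the min_spread filter on the arbitrage endpoint, here is a minimal sketch that compares the latest price of every region pair. The function `find_arbitrage` and the fields `buy_region`, `sell_region`, and `spread` are hypothetical, since the ArbitrageOpportunity schema is not spelled out in this document.

```python
from itertools import permutations

def find_arbitrage(prices, min_spread=5.0):
    """prices maps region -> EUR/MWh; keep pairs whose spread meets min_spread."""
    opportunities = []
    for buy_region, sell_region in permutations(prices, 2):
        spread = prices[sell_region] - prices[buy_region]
        if spread >= min_spread:
            opportunities.append(
                {"buy_region": buy_region, "sell_region": sell_region, "spread": spread}
            )
    # Widest spread first, matching the { opportunities, count } response shape.
    opportunities.sort(key=lambda item: item["spread"], reverse=True)
    return {"opportunities": opportunities, "count": len(opportunities)}
```

The sketch ignores transmission capacity and fees, which a real opportunity check would need to account for.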

Backtest API (/api/v1/backtest/*)

# POST /api/v1/backtest/start
Request: { config: BacktestConfig, name?: string }
Response: { backtest_id: string, status: BacktestStatus }

# GET /api/v1/backtest/history
# Register this route before /{backtest_id}; otherwise FastAPI matches "history" as an ID.
Response: { backtests: BacktestStatus[], total: int }

# GET /api/v1/backtest/{backtest_id}
Response: { status: BacktestStatus, results?: BacktestResult }

# GET /api/v1/backtest/{backtest_id}/results
Response: BacktestResult

# GET /api/v1/backtest/{backtest_id}/trades?limit={limit}
Response: { backtest_id, trades: Trade[], total: int }

# DELETE /api/v1/backtest/{backtest_id}
Response: { message: string }

Models API (/api/v1/models/*)

# GET /api/v1/models
Response: { models: ModelInfo[], total: int }

# POST /api/v1/models/train
Request: TrainingRequest
Response: { training_id: string, status: TrainingStatus }

# GET /api/v1/models/{model_id}/status
Response: TrainingStatus

# GET /api/v1/models/{model_id}/metrics
Response: { model_id, metrics: dict }

# POST /api/v1/models/predict
Request: { model_id, timestamp, features?: dict }
Response: PredictionResponse

Trading API (/api/v1/trading/*)

# GET /api/v1/trading/strategies
Response: { strategies: StrategyStatus[] }

# POST /api/v1/trading/strategies
Request: { strategy: StrategyEnum, action: "start" | "stop" }
Response: { status: StrategyStatus }

# GET /api/v1/trading/positions
Response: { positions: TradingPosition[] }

Settings API (/api/v1/settings/*)

# GET /api/v1/settings
Response: AppSettings

# POST /api/v1/settings
Request: Partial<AppSettings>
Response: { message, updated_fields: string[] }
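
The partial update on POST /api/v1/settings can be sketched as a merge that rejects unknown keys and reports what changed. The function `apply_partial_update` is hypothetical, and reporting only fields whose value actually changed is an assumption about what updated_fields should contain.

```python
def apply_partial_update(current, patch):
    """Merge patch into current in place and report the fields that changed."""
    unknown = sorted(set(patch) - set(current))
    if unknown:
        raise KeyError(f"unknown settings: {unknown}")
    # Compare before merging so unchanged values are not reported.
    updated_fields = [key for key, value in patch.items() if current[key] != value]
    current.update(patch)
    return {"message": "Settings updated", "updated_fields": updated_fields}
```

In the real route the patch would first be validated against the AppSettings schema. That step is omitted here.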

Services Interface

DataService (app/services/data_service.py)

from typing import Dict, List

class DataService:
    """Data loading and caching service."""

    async def initialize(self):
        """Load all datasets into memory."""

    def get_latest_prices(self) -> Dict[str, Dict]:
        """Get latest prices for all regions."""

    def get_price_history(self, region, start=None, end=None, limit=1000) -> List[Dict]:
        """Get price history for a region."""

    def get_battery_states(self) -> List[Dict]:
        """Get current battery states."""

    def get_arbitrage_opportunities(self, min_spread=None) -> List[Dict]:
        """Get current arbitrage opportunities."""

    def get_dashboard_summary(self) -> Dict:
        """Get overall dashboard summary."""

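The filtering behind get_price_history can be sketched over plain dictionaries. This is a hypothetical stand-in for the Parquet-backed implementation. It assumes that start and end are inclusive datetime bounds and that limit keeps the most recent rows, neither of which the interface above specifies.

```python
from datetime import datetime

def price_history(rows, region, start=None, end=None, limit=1000):
    """Return up to `limit` of the most recent rows for one region, oldest first."""
    selected = [
        row for row in rows
        if row["region"] == region
        and (start is None or row["timestamp"] >= start)
        and (end is None or row["timestamp"] <= end)
    ]
    selected.sort(key=lambda row: row["timestamp"])
    # A slice of [-0:] would return everything, so handle limit <= 0 explicitly.
    return selected[-limit:] if limit > 0 else []
```

With pandas the same selection would typically be a boolean mask followed by `tail(limit)`.
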
MLService (app/services/ml_service.py)

from datetime import datetime
from typing import Any, Dict, List, Optional

class MLService:
    """Service for ML model management and inference."""

    def list_models(self) -> List[ModelInfo]:
        """List all available trained models."""

    def get_model_metrics(self, model_id: str) -> Dict[str, float]:
        """Get performance metrics for a model."""

    def load_price_prediction_model(self, model_id: str):
        """Load price prediction model on-demand."""

    def load_rl_battery_policy(self, model_id: str):
        """Load RL battery policy on-demand."""

    def predict(
        self,
        model_id: str,
        timestamp: datetime,
        features: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Run prediction with on-demand model loading."""

    def predict_with_confidence(
        self,
        model_id: str,
        timestamp: datetime,
        features: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Run prediction with confidence interval."""

    def get_feature_importance(self, model_id: str) -> Dict[str, float]:
        """Get feature importance for a model."""

    def get_model_info(self, model_id: str) -> Optional[ModelInfo]:
        """Get detailed info about a specific model."""

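On-demand loading, as described in the MLService docstrings, usually means deserializing a model on first use and reusing it afterwards. The sketch below shows that pattern. `LazyModelCache` and `load_pickle` are hypothetical names, and mapping a model_id directly to a `.pkl` file under the models directory is an assumption based on the project structure.

```python
import pickle
from pathlib import Path

def load_pickle(path):
    """Default loader. Unpickling can run arbitrary code, so only load trusted files."""
    with Path(path).open("rb") as handle:
        return pickle.load(handle)

class LazyModelCache:
    """Load each model the first time it is requested, then serve it from memory."""

    def __init__(self, models_dir, loader=load_pickle):
        self._models_dir = Path(models_dir)
        self._loader = loader
        self._loaded = {}

    def get(self, model_id):
        if model_id not in self._loaded:
            path = self._models_dir / f"{model_id}.pkl"
            self._loaded[model_id] = self._loader(path)
        return self._loaded[model_id]
```

Calling `LazyModelCache("models").get("price_prediction/model_5min")` would read `models/price_prediction/model_5min.pkl` once and return the cached object on later calls. The injectable loader keeps the cache testable without files on disk.
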
StrategyService (app/services/strategy_service.py)

from typing import Dict, Optional

class StrategyService:
    """Strategy execution service."""

    async def execute_strategy(
        self,
        strategy: StrategyEnum,
        config: Optional[Dict] = None,
    ) -> Dict:
        """Execute a trading strategy."""

    async def get_strategy_status(self, strategy: StrategyEnum) -> StrategyStatus:
        """Get current status of a strategy."""

    async def toggle_strategy(
        self,
        strategy: StrategyEnum,
        action: str
    ) -> StrategyStatus:
        """Start or stop a strategy."""

Tasks Interface

Backtest Tasks (app/tasks/backtest_tasks.py)

from typing import Dict, Optional

async def run_backtest_task(backtest_id: str, config: Dict, name: Optional[str] = None):
    """
    Execute backtest in background.

    Process:
    1. Load data
    2. Execute strategies
    3. Calculate metrics
    4. Save results
    5. Emit WebSocket progress events
    """

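The five-step process in the docstring above can be sketched as a small pipeline that reports progress after each step. Everything here is hypothetical: the step names, the `handlers` mapping, and the `emit` callback, which stands in for a call such as ConnectionManager.broadcast_backtest_progress.

```python
import asyncio

STEPS = ["load_data", "execute_strategies", "calculate_metrics", "save_results"]

async def run_pipeline(backtest_id, handlers, emit):
    """Run each step in order; emit (backtest_id, progress, status) after every step."""
    progress = 0.0
    await emit(backtest_id, progress, "running")
    state = None
    try:
        for index, step in enumerate(STEPS, start=1):
            # Each handler receives the previous step's output.
            state = await handlers[step](state)
            progress = index / len(STEPS)
            status = "completed" if index == len(STEPS) else "running"
            await emit(backtest_id, progress, status)
    except Exception:
        # Report the last progress reached, then let the caller see the error.
        await emit(backtest_id, progress, "failed")
        raise
    return state
```

The status strings match BacktestStatusEnum, so a client can treat the final event as the terminal state of the backtest.
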
Training Tasks (app/tasks/training_tasks.py)

from celery import shared_task

async def train_model_task(training_id: str, request: TrainingRequest):
    """
    Execute ML model training via Celery task.

    Dispatches to Celery for async processing.
    Emits WebSocket events for progress updates.
    """

@shared_task(name="tasks.train_price_prediction")
def train_price_prediction(training_id: str, request_dict: dict):
    """Celery task for price prediction model training."""

@shared_task(name="tasks.train_rl_battery")
def train_rl_battery(training_id: str, request_dict: dict):
    """Celery task for RL battery policy training."""

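One way train_model_task could choose between the two Celery tasks is a lookup keyed on the model type. The sketch below is hypothetical. In particular, requiring a horizon from ML_PREDICTION_HORIZONS for price prediction is an assumption, since TrainingRequest declares horizon as optional.

```python
ALLOWED_HORIZONS = (1, 5, 15, 60)  # mirrors ML_PREDICTION_HORIZONS (minutes)

TASK_NAMES = {
    "price_prediction": "tasks.train_price_prediction",
    "rl_battery": "tasks.train_rl_battery",
}

def select_training_task(model_type, horizon=None):
    """Return the registered Celery task name for a training request."""
    if model_type not in TASK_NAMES:
        raise ValueError(f"unknown model_type: {model_type!r}")
    # Assumed rule: price prediction trains one model per supported horizon.
    if model_type == "price_prediction" and horizon not in ALLOWED_HORIZONS:
        raise ValueError(f"horizon must be one of {ALLOWED_HORIZONS}, got {horizon!r}")
    return TASK_NAMES[model_type]
```

The returned name could then be passed to a Celery application's `send_task`, or the task function could be invoked directly with `.delay(training_id, request_dict)`.
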
WebSocket Interface

ConnectionManager (app/api/websocket.py)

from typing import Any, Dict, Optional

from fastapi import WebSocket

class ConnectionManager:
    """WebSocket connection manager."""

    async def connect(self, websocket: WebSocket):
        """Accept and track new connection."""

    def disconnect(self, websocket: WebSocket):
        """Remove connection."""

    async def broadcast(self, event_type: str, data: Any):
        """Broadcast event to all connected clients."""

    # Specific event broadcasters
    async def broadcast_price_update(self, region: str, price_data: Dict):
        """Broadcast price update."""

    async def broadcast_battery_update(self, battery_id: str, battery_state: Dict):
        """Broadcast battery state update."""

    async def broadcast_trade(self, trade: Dict):
        """Broadcast new trade execution."""

    async def broadcast_alert(self, alert: Dict):
        """Broadcast new alert."""

    async def broadcast_backtest_progress(self, backtest_id: str, progress: float, status: str):
        """Broadcast backtest progress."""

    async def broadcast_model_training_progress(
        self,
        model_id: str,
        progress: float,
        epoch: Optional[int] = None,
        metrics: Optional[Dict] = None
    ):
        """Broadcast model training progress."""

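A minimal sketch of the broadcast behavior follows. It relies only on the `accept()` and `send_json()` methods that FastAPI's WebSocket object provides, so a small fake socket is enough to exercise it. The class names and the `{"type", "data"}` message envelope are assumptions, not the documented wire format.

```python
import asyncio

class SketchConnectionManager:
    """Track open sockets and fan each event out to all of them."""

    def __init__(self):
        self.active = []

    async def connect(self, websocket):
        await websocket.accept()
        self.active.append(websocket)

    def disconnect(self, websocket):
        if websocket in self.active:
            self.active.remove(websocket)

    async def broadcast(self, event_type, data):
        message = {"type": event_type, "data": data}
        # Iterate over a copy so failed sockets can be dropped mid-loop.
        for websocket in list(self.active):
            try:
                await websocket.send_json(message)
            except Exception:
                self.disconnect(websocket)

    async def broadcast_price_update(self, region, price_data):
        await self.broadcast("price_update", {"region": region, **price_data})


class FakeSocket:
    """Test double exposing the two methods used above."""

    def __init__(self, broken=False):
        self.broken = broken
        self.sent = []

    async def accept(self):
        pass

    async def send_json(self, message):
        if self.broken:
            raise ConnectionError("client went away")
        self.sent.append(message)
```

Dropping a socket when a send fails keeps one disconnected client from interrupting the broadcast to the others.
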
WebSocket Events

# Event types
"price_update"           # Real-time price changes
"battery_update"         # Battery state changes
"arbitrage_opportunity"  # New arbitrage opportunity
"trade_executed"         # Trade execution
"alert_triggered"        # Alert triggered
"backtest_progress"      # Backtest progress
"model_training_progress" # Training progress

Main Application

app/main.py

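from contextlib import asynccontextmanager

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware

from app.api.routes import backtest, dashboard, models, trading
from app.api.routes import settings as settings_routes
from app.api.websocket import manager  # assumed: module-level ConnectionManager instance
from app.config import settings

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup work (for example DataService.initialize()) goes before the yield
    yield

app = FastAPI(
    title=settings.APP_NAME,
    version=settings.APP_VERSION,
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan,
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    # Assumed values: the default allow_methods is GET only, which would block POST and DELETE
    allow_methods=["*"],
    allow_headers=["*"],
)

# Include routers
app.include_router(dashboard.router, prefix="/api/v1/dashboard", tags=["dashboard"])
app.include_router(backtest.router, prefix="/api/v1/backtest", tags=["backtest"])
app.include_router(models.router, prefix="/api/v1/models", tags=["models"])
app.include_router(trading.router, prefix="/api/v1/trading", tags=["trading"])
app.include_router(settings_routes.router, prefix="/api/v1/settings", tags=["settings"])

# Health check
@app.get("/health")
async def health_check():
    return {"status": "healthy"}

# WebSocket endpoint
@app.websocket("/ws/real-time")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        # Keep the connection open; incoming messages are read and ignored
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(websocket)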

Dependencies

requirements.txt

# FastAPI & Server
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
pydantic>=2.4.0
pydantic-settings>=2.0.0

# Data Processing
pandas>=2.1.0
numpy>=1.24.0
pyarrow>=14.0.0

# Machine Learning
xgboost>=2.0.0
scikit-learn>=1.3.0

# Reinforcement Learning
gymnasium>=0.29.0
stable-baselines3>=2.0.0

# Background Tasks
celery>=5.3.0
redis>=5.0.0

# WebSockets
websockets>=12.0.0

# Database
sqlalchemy>=2.0.0
alembic>=1.12.0

# Utilities
python-multipart>=0.0.6
python-jose[cryptography]>=3.3.0
python-dotenv>=1.0.0

# Testing
pytest>=7.4.0
pytest-asyncio>=0.21.0
httpx>=0.25.0

# Logging
loguru>=0.7.0

Environment Variables

.env.example

# Application
APP_NAME=Energy Trading API
APP_VERSION=1.0.0
DEBUG=true

# Server
HOST=0.0.0.0
PORT=8000

# Data
DATA_PATH=~/energy-test-data/data/processed

# CORS (JSON list: pydantic-settings parses List[str] values as JSON)
CORS_ORIGINS='["http://localhost:3000","http://localhost:5173"]'

# Celery
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0

# Paths
MODELS_PATH=models
RESULTS_PATH=results

# Battery
BATTERY_MIN_RESERVE=0.10
BATTERY_MAX_CHARGE=0.90

# Arbitrage
ARBITRAGE_MIN_SPREAD=5.0

# Mining
MINING_MARGIN_THRESHOLD=5.0