Add comprehensive documentation for energy trading system: - Backend: FastAPI architecture, API routes, services, WebSocket - Frontend: React structure, components, state management - ML: Feature engineering, XGBoost price prediction, RL battery optimization
21 KiB
21 KiB
Backend Implementation Strategy
Overview
This document outlines the FastAPI backend for the energy trading system UI. The backend serves data, executes strategies, runs backtests, and provides real-time updates via WebSockets.
Data Source: ~/energy-test-data/data/processed/
Architecture
┌──────────────────────────────────────────────────────────────┐
│ FastAPI Application │
├──────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┬─────────────┬─────────────┬──────────────┐ │
│ │ API │ Services │ Tasks │ WebSocket │ │
│ │ Routes │ Layer │ (Celery) │ Manager │ │
│ └─────────────┴─────────────┴─────────────┴──────────────┘ │
│ ┌──────────┐ │
│ │ Data │ │
│ │ Cache │ │
│ └──────────┘ │
└──────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Core Trading Engine (Imported) │
│ - Fundamental Strategy │
│ - Technical Analysis │
│ - ML Models (Price Prediction, RL Battery) │
│ - Backtesting Engine │
└──────────────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Data Source │
│ ~/energy-test-data/data/processed/*.parquet │
└──────────────────────────────────────────────────────────────┘
Project Structure
backend/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app entry
│ ├── config.py # Configuration management
│ │
│ ├── api/
│ │ ├── __init__.py
│ │ ├── routes/
│ │ │ ├── __init__.py
│ │ │ ├── dashboard.py # Dashboard data endpoints
│ │ │ ├── backtest.py # Backtest execution
│ │ │ ├── models.py # ML model endpoints
│ │ │ ├── trading.py # Trading control
│ │ │ └── settings.py # Configuration management
│ │ └── websocket.py # WebSocket connection manager
│ │
│ ├── services/
│ │ ├── __init__.py
│ │ ├── data_service.py # Data loading and caching
│ │ ├── strategy_service.py # Strategy execution
│ │ ├── ml_service.py # ML model management
│ │ ├── trading_service.py # Trading operations
│ │ └── alert_service.py # Alert management
│ │
│ ├── tasks/
│ │ ├── __init__.py
│ │ ├── backtest_tasks.py # Async backtest execution
│ │ ├── training_tasks.py # ML model training
│ │ └── monitoring_tasks.py # Real-time data updates
│ │
│ ├── ml/ # ML models and training
│ │ ├── __init__.py
│ │ ├── features/
│ │ │ ├── __init__.py
│ │ │ ├── lag_features.py
│ │ │ ├── rolling_features.py
│ │ │ ├── time_features.py
│ │ │ ├── regional_features.py
│ │ │ └── battery_features.py
│ │ │
│ │ ├── price_prediction/
│ │ │ ├── __init__.py
│ │ │ ├── model.py
│ │ │ ├── trainer.py
│ │ │ └── predictor.py
│ │ │
│ │ ├── rl_battery/
│ │ │ ├── __init__.py
│ │ │ ├── environment.py
│ │ │ ├── agent.py
│ │ │ ├── trainer.py
│ │ │ └── policy.py
│ │ │
│ │ ├── model_management/
│ │ │ ├── __init__.py
│ │ │ ├── registry.py
│ │ │ ├── persistence.py
│ │ │ ├── versioning.py
│ │ │ └── comparison.py
│ │ │
│ │ ├── evaluation/
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ ├── backtest_evaluator.py
│ │ │ └── reports.py
│ │ │
│ │ ├── training/
│ │ │ ├── __init__.py
│ │ │ └── cli.py
│ │ │
│ │ └── utils/
│ │ ├── __init__.py
│ │ ├── data_split.py
│ │ ├── config.py
│ │ └── evaluation.py
│ │
│ ├── models/
│ │ ├── __init__.py
│ │ ├── schemas.py # Pydantic models
│ │ └── enums.py # Enumerations
│ │
│ ├── core/
│ │ ├── __init__.py
│ │ └── constants.py # Constants and defaults
│ │
│ └── utils/
│ ├── __init__.py
│ ├── logger.py
│ └── helpers.py
│
├── tests/
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_api/
│ ├── test_services/
│ └── test_websocket.py
│
├── models/ # Trained ML models storage
│ ├── price_prediction/
│ │ ├── model_1min.pkl
│ │ ├── model_5min.pkl
│ │ ├── model_15min.pkl
│ │ └── model_60min.pkl
│ └── rl_battery/
│ └── battery_policy.pkl
│
├── results/ # Backtest results storage
│ └── backtests/
│
├── .env.example
├── requirements.txt
├── pyproject.toml
└── Dockerfile
Configuration
app/config.py (Settings)
from pathlib import Path
from typing import List

from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application settings.

    Every field below is a default that can be overridden by an environment
    variable of the same (case-sensitive) name or by an entry in ``.env``.
    """

    # pydantic v2 configuration (replaces the deprecated nested ``class Config``).
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=True)

    # Application
    APP_NAME: str = "Energy Trading API"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = True

    # Server
    HOST: str = "0.0.0.0"
    PORT: int = 8000

    # Data
    DATA_PATH: str = "~/energy-test-data/data/processed"

    # CORS
    # NOTE: pydantic-settings decodes list-typed values from the environment
    # as JSON, e.g. CORS_ORIGINS=["http://localhost:3000"].
    CORS_ORIGINS: List[str] = [
        "http://localhost:3000",
        "http://localhost:5173",
    ]

    # WebSocket
    WS_HEARTBEAT_INTERVAL: int = 30

    # Celery
    CELERY_BROKER_URL: str = "redis://localhost:6379/0"
    CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"

    # Models
    MODELS_PATH: str = "models"
    RESULTS_PATH: str = "results"

    # Battery
    BATTERY_MIN_RESERVE: float = 0.10
    BATTERY_MAX_CHARGE: float = 0.90

    # Arbitrage
    ARBITRAGE_MIN_SPREAD: float = 5.0  # EUR/MWh

    # Mining
    MINING_MARGIN_THRESHOLD: float = 5.0  # EUR/MWh

    # ML
    ML_PREDICTION_HORIZONS: List[int] = [1, 5, 15, 60]
    ML_FEATURE_LAGS: List[int] = [1, 5, 10, 15, 30, 60]

    @property
    def DATA_PATH_RESOLVED(self) -> Path:
        """Return ``DATA_PATH`` as a ``Path`` with ``~`` expanded.

        Computed on access from the instance value, so an override of
        ``DATA_PATH`` (environment or ``.env``) is honoured. A class-level
        default would be evaluated once, from the built-in default path only.
        """
        return Path(self.DATA_PATH).expanduser()


# Module-level singleton imported by the rest of the application.
settings = Settings()
Data Models (app/models/schemas.py)
Enums
class RegionEnum(str, Enum):
    """Supported market regions, identified by two-letter codes."""

    FR = "FR"
    BE = "BE"
    DE = "DE"
    NL = "NL"
    UK = "UK"
class FuelTypeEnum(str, Enum):
    """Generation fuel types (lower-case string values)."""

    GAS = "gas"
    NUCLEAR = "nuclear"
    COAL = "coal"
    SOLAR = "solar"
    WIND = "wind"
    HYDRO = "hydro"
class StrategyEnum(str, Enum):
    """Identifiers of the trading strategies the backend can run."""

    FUNDAMENTAL = "fundamental"
    TECHNICAL = "technical"
    ML = "ml"
    MINING = "mining"
class TradeTypeEnum(str, Enum):
    """Kinds of trade action: market buy/sell and battery charge/discharge."""

    BUY = "buy"
    SELL = "sell"
    CHARGE = "charge"
    DISCHARGE = "discharge"
class BacktestStatusEnum(str, Enum):
    """Lifecycle states of a backtest run."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class ModelType(str, Enum):
    """Families of ML model managed by the backend."""

    PRICE_PREDICTION = "price_prediction"
    RL_BATTERY = "rl_battery"
class AlertTypeEnum(str, Enum):
    """Categories of alert the system can raise."""

    PRICE_SPIKE = "price_spike"
    ARBITRAGE_OPPORTUNITY = "arbitrage_opportunity"
    BATTERY_LOW = "battery_low"
    BATTERY_FULL = "battery_full"
    STRATEGY_ERROR = "strategy_error"
Key Schemas
class PriceData(BaseModel):
    """Price observation for one region at one timestamp."""

    timestamp: datetime
    region: RegionEnum
    day_ahead_price: float
    real_time_price: float
    volume_mw: float
class BatteryState(BaseModel):
    """Snapshot of a single battery at a given timestamp."""

    timestamp: datetime
    battery_id: str
    capacity_mwh: float
    charge_level_mwh: float
    charge_rate_mw: float
    discharge_rate_mw: float
    efficiency: float
    # Plain default: a float is immutable, so no default_factory is needed.
    # NOTE(review): nothing in this schema derives the percentage from
    # charge_level_mwh / capacity_mwh; the producer must set it explicitly.
    charge_level_pct: float = 0.0
class BacktestConfig(BaseModel):
    """Parameters for a backtest run."""

    start_date: str
    end_date: str
    strategies: List[StrategyEnum] = Field(default_factory=list)
    use_ml: bool = True
    # Optional per-run overrides.
    # NOTE(review): presumably None means "fall back to the application
    # settings" -- confirm against the backtest task.
    battery_min_reserve: Optional[float] = None
    battery_max_charge: Optional[float] = None
    arbitrage_min_spread: Optional[float] = None
class BacktestMetrics(BaseModel):
    """Aggregate performance figures reported for a backtest."""

    total_revenue: float
    arbitrage_profit: float
    battery_revenue: float
    mining_profit: float
    battery_utilization: float
    price_capture_rate: float
    win_rate: float
    sharpe_ratio: float
    max_drawdown: float
    total_trades: int
class TrainingRequest(BaseModel):
    """Request body for starting a model training job."""

    model_type: ModelType
    # NOTE(review): presumably only meaningful for price-prediction models
    # (one model per horizon) -- confirm.
    horizon: Optional[int] = None
    start_date: str
    end_date: str
    hyperparameters: Dict[str, Any] = Field(default_factory=dict)
class PredictionResponse(BaseModel):
    """Result of a single model prediction."""

    model_id: str
    timestamp: datetime
    prediction: float
    confidence: Optional[float] = None
    features_used: List[str] = Field(default_factory=list)
API Routes
Dashboard API (/api/v1/dashboard/*)
# GET /api/v1/dashboard/summary
Response: DashboardSummary
# GET /api/v1/dashboard/prices
Response: { regions: { [region]: { timestamp, day_ahead_price, real_time_price, volume_mw } } }
# GET /api/v1/dashboard/prices/history?region={region}&start={start}&end={end}&limit={limit}
Response: { region, data: PriceData[] }
# GET /api/v1/dashboard/battery
Response: { batteries: BatteryState[] }
# GET /api/v1/dashboard/arbitrage?min_spread={min_spread}
Response: { opportunities: ArbitrageOpportunity[], count: int }
Backtest API (/api/v1/backtest/*)
# POST /api/v1/backtest/start
Request: { config: BacktestConfig, name?: string }
Response: { backtest_id: string, status: BacktestStatus }
# GET /api/v1/backtest/{backtest_id}
Response: { status: BacktestStatus, results?: BacktestResult }
# GET /api/v1/backtest/{backtest_id}/results
Response: BacktestResult
# GET /api/v1/backtest/{backtest_id}/trades?limit={limit}
Response: { backtest_id, trades: Trade[], total: int }
# GET /api/v1/backtest/history
Response: { backtests: BacktestStatus[], total: int }
# DELETE /api/v1/backtest/{backtest_id}
Response: { message: string }
Models API (/api/v1/models/*)
# GET /api/v1/models
Response: { models: ModelInfo[], total: int }
# POST /api/v1/models/train
Request: TrainingRequest
Response: { training_id: string, status: TrainingStatus }
# GET /api/v1/models/{model_id}/status
Response: TrainingStatus
# GET /api/v1/models/{model_id}/metrics
Response: { model_id, metrics: dict }
# POST /api/v1/models/predict
Request: { model_id, timestamp, features?: dict }
Response: PredictionResponse
Trading API (/api/v1/trading/*)
# GET /api/v1/trading/strategies
Response: { strategies: StrategyStatus[] }
# POST /api/v1/trading/strategies
Request: { strategy: StrategyEnum, action: "start" | "stop" }
Response: { status: StrategyStatus }
# GET /api/v1/trading/positions
Response: { positions: TradingPosition[] }
Settings API (/api/v1/settings/*)
# GET /api/v1/settings
Response: AppSettings
# POST /api/v1/settings
Request: Partial<AppSettings>
Response: { message, updated_fields: string[] }
Services Interface
DataService (app/services/data_service.py)
class DataService:
    """Data loading and caching service."""

    async def initialize(self):
        """Load all datasets into memory."""

    def get_latest_prices(self) -> Dict[str, Dict]:
        """Get latest prices for all regions."""

    def get_price_history(self, region, start=None, end=None, limit=1000) -> List[Dict]:
        """Get price history for a region."""

    def get_battery_states(self) -> List[Dict]:
        """Get current battery states."""

    def get_arbitrage_opportunities(self, min_spread=None) -> List[Dict]:
        """Get current arbitrage opportunities."""

    def get_dashboard_summary(self) -> Dict:
        """Get overall dashboard summary."""
MLService (app/services/ml_service.py)
class MLService:
    """Service for ML model management and inference."""

    def list_models(self) -> List[ModelInfo]:
        """List all available trained models."""

    def get_model_metrics(self, model_id: str) -> Dict[str, float]:
        """Get performance metrics for a model."""

    def load_price_prediction_model(self, model_id: str):
        """Load price prediction model on-demand."""

    def load_rl_battery_policy(self, model_id: str):
        """Load RL battery policy on-demand."""

    def predict(
        self,
        model_id: str,
        timestamp: datetime,
        features: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Run prediction with on-demand model loading."""

    def predict_with_confidence(
        self,
        model_id: str,
        timestamp: datetime,
        features: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Run prediction with confidence interval."""

    def get_feature_importance(self, model_id: str) -> Dict[str, float]:
        """Get feature importance for a model."""

    def get_model_info(self, model_id: str) -> Optional[ModelInfo]:
        """Get detailed info about a specific model."""
StrategyService (app/services/strategy_service.py)
class StrategyService:
    """Strategy execution service."""

    async def execute_strategy(
        self,
        strategy: StrategyEnum,
        config: Optional[Dict] = None,
    ) -> Dict:
        """Execute a trading strategy.

        ``config`` is optional; ``None`` means no per-call configuration
        was supplied.
        """

    async def get_strategy_status(self, strategy: StrategyEnum) -> StrategyStatus:
        """Get current status of a strategy."""

    async def toggle_strategy(
        self,
        strategy: StrategyEnum,
        action: str,
    ) -> StrategyStatus:
        """Start or stop a strategy."""
Tasks Interface
Backtest Tasks (app/tasks/backtest_tasks.py)
async def run_backtest_task(backtest_id: str, config: Dict, name: Optional[str] = None):
    """
    Execute backtest in background.

    Process:
    1. Load data
    2. Execute strategies
    3. Calculate metrics
    4. Save results
    5. Emit WebSocket progress events
    """
Training Tasks (app/tasks/training_tasks.py)
async def train_model_task(training_id: str, request: TrainingRequest):
    """
    Execute ML model training via Celery task.

    Dispatches to Celery for async processing.
    Emits WebSocket events for progress updates.
    """
@shared_task(name="tasks.train_price_prediction")
def train_price_prediction(training_id: str, request_dict: dict):
    """Celery task for price prediction model training."""
@shared_task(name="tasks.train_rl_battery")
def train_rl_battery(training_id: str, request_dict: dict):
    """Celery task for RL battery policy training."""
WebSocket Interface
ConnectionManager (app/api/websocket.py)
class ConnectionManager:
    """WebSocket connection manager."""

    async def connect(self, websocket: WebSocket):
        """Accept and track new connection."""

    def disconnect(self, websocket: WebSocket):
        """Remove connection."""

    async def broadcast(self, event_type: str, data: Any):
        """Broadcast event to all connected clients."""

    # Specific event broadcasters

    async def broadcast_price_update(self, region: str, price_data: Dict):
        """Broadcast price update."""

    async def broadcast_battery_update(self, battery_id: str, battery_state: Dict):
        """Broadcast battery state update."""

    async def broadcast_trade(self, trade: Dict):
        """Broadcast new trade execution."""

    async def broadcast_alert(self, alert: Dict):
        """Broadcast new alert."""

    async def broadcast_backtest_progress(self, backtest_id: str, progress: float, status: str):
        """Broadcast backtest progress."""

    async def broadcast_model_training_progress(
        self,
        model_id: str,
        progress: float,
        epoch: Optional[int] = None,
        metrics: Optional[Dict] = None,
    ):
        """Broadcast model training progress."""
WebSocket Events
# Event types
"price_update" # Real-time price changes
"battery_update" # Battery state changes
"arbitrage_opportunity" # New arbitrage opportunity
"trade_executed" # Trade execution
"alert_triggered" # Alert triggered
"backtest_progress" # Backtest progress
"model_training_progress" # Training progress
Main Application
app/main.py
from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
# Application instance; title and version are read from the settings object.
# NOTE(review): `settings`, `manager` and the route modules (dashboard,
# backtest, models, trading, settings_routes) are not imported in this
# excerpt -- they must be imported before this point.
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
docs_url="/docs",
redoc_url="/redoc",
)
# CORS middleware
# NOTE(review): `...` stands in for the middleware options, which are elided
# here -- presumably allow_origins=settings.CORS_ORIGINS etc.; confirm.
app.add_middleware(CORSMiddleware, ...)
# Include routers (each one mounted under a versioned /api/v1 prefix)
app.include_router(dashboard.router, prefix="/api/v1/dashboard", tags=["dashboard"])
app.include_router(backtest.router, prefix="/api/v1/backtest", tags=["backtest"])
app.include_router(models.router, prefix="/api/v1/models", tags=["models"])
app.include_router(trading.router, prefix="/api/v1/trading", tags=["trading"])
app.include_router(settings_routes.router, prefix="/api/v1/settings", tags=["settings"])
# Health check
@app.get("/health")
async def health_check():
    """Liveness endpoint: always reports the service as healthy."""
    return {"status": "healthy"}
# WebSocket endpoint
@app.websocket("/ws/real-time")
async def websocket_endpoint(websocket: WebSocket):
    """Register a client for real-time events and hold the socket open.

    The handler must not return while the client is connected: returning
    ends the WebSocket session. It therefore blocks on incoming frames
    (their content is ignored) until the client disconnects, then removes
    the connection from the manager so broadcasts stop targeting it.
    """
    # Local import keeps this snippet self-contained.
    from fastapi import WebSocketDisconnect

    await manager.connect(websocket)
    try:
        while True:
            # Waiting on receive is what keeps the connection alive.
            await websocket.receive_text()
    except WebSocketDisconnect:
        manager.disconnect(websocket)
Dependencies
requirements.txt
# FastAPI & Server
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
pydantic>=2.4.0
pydantic-settings>=2.0.0
# Data Processing
pandas>=2.1.0
numpy>=1.24.0
pyarrow>=14.0.0
# Machine Learning
xgboost>=2.0.0
scikit-learn>=1.3.0
# Reinforcement Learning
gymnasium>=0.29.0
stable-baselines3>=2.0.0
# Background Tasks
celery>=5.3.0
redis>=5.0.0
# WebSockets
websockets>=12.0.0
# Database
sqlalchemy>=2.0.0
alembic>=1.12.0
# Utilities
python-multipart>=0.0.6
python-jose[cryptography]>=3.3.0
python-dotenv>=1.0.0
# Testing
pytest>=7.4.0
pytest-asyncio>=0.21.0
httpx>=0.25.0
# Logging
loguru>=0.7.0
Environment Variables
.env.example
# Application
APP_NAME=Energy Trading API
APP_VERSION=1.0.0
DEBUG=true
# Server
HOST=0.0.0.0
PORT=8000
# Data
DATA_PATH=~/energy-test-data/data/processed
# CORS
CORS_ORIGINS=["http://localhost:3000","http://localhost:5173"]
# Celery
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
# Paths
MODELS_PATH=models
RESULTS_PATH=results
# Battery
BATTERY_MIN_RESERVE=0.10
BATTERY_MAX_CHARGE=0.90
# Arbitrage
ARBITRAGE_MIN_SPREAD=5.0
# Mining
MINING_MARGIN_THRESHOLD=5.0