Files
energy-trade/BACKEND_IMPLEMENTATION.md
kbt-devops a22a13f6f4 Add initial implementation strategy documentation
Add comprehensive documentation for energy trading system:
- Backend: FastAPI architecture, API routes, services, WebSocket
- Frontend: React structure, components, state management
- ML: Feature engineering, XGBoost price prediction, RL battery optimization
2026-02-11 02:16:25 +07:00

751 lines
21 KiB
Markdown

# Backend Implementation Strategy
## Overview
This document outlines the FastAPI backend for the energy trading system UI. The backend serves data, executes strategies, runs backtests, and provides real-time updates via WebSockets.
**Data Source**: `~/energy-test-data/data/processed/`
---
## Architecture
```
┌──────────────────────────────────────────────────────────────┐
│ FastAPI Application │
├──────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┬─────────────┬─────────────┬──────────────┐ │
│ │ API │ Services │ Tasks │ WebSocket │ │
│ │ Routes │ Layer │ (Celery) │ Manager │ │
│ └─────────────┴─────────────┴─────────────┴──────────────┘ │
│ ┌──────────┐ │
│ │ Data │ │
│ │ Cache │ │
│ └──────────┘ │
└──────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────┐
│ Core Trading Engine (Imported) │
│ - Fundamental Strategy │
│ - Technical Analysis │
│ - ML Models (Price Prediction, RL Battery) │
│ - Backtesting Engine │
└──────────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────┐
│ Data Source │
│ ~/energy-test-data/data/processed/*.parquet │
└──────────────────────────────────────────────────────────────┘
```
---
## Project Structure
```
backend/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI app entry
│ ├── config.py # Configuration management
│ │
│ ├── api/
│ │ ├── __init__.py
│ │ ├── routes/
│ │ │ ├── __init__.py
│ │ │ ├── dashboard.py # Dashboard data endpoints
│ │ │ ├── backtest.py # Backtest execution
│ │ │ ├── models.py # ML model endpoints
│ │ │ ├── trading.py # Trading control
│ │ │ └── settings.py # Configuration management
│ │ └── websocket.py # WebSocket connection manager
│ │
│ ├── services/
│ │ ├── __init__.py
│ │ ├── data_service.py # Data loading and caching
│ │ ├── strategy_service.py # Strategy execution
│ │ ├── ml_service.py # ML model management
│ │ ├── trading_service.py # Trading operations
│ │ └── alert_service.py # Alert management
│ │
│ ├── tasks/
│ │ ├── __init__.py
│ │ ├── backtest_tasks.py # Async backtest execution
│ │ ├── training_tasks.py # ML model training
│ │ └── monitoring_tasks.py # Real-time data updates
│ │
│ ├── ml/ # ML models and training
│ │ ├── __init__.py
│ │ ├── features/
│ │ │ ├── __init__.py
│ │ │ ├── lag_features.py
│ │ │ ├── rolling_features.py
│ │ │ ├── time_features.py
│ │ │ ├── regional_features.py
│ │ │ └── battery_features.py
│ │ │
│ │ ├── price_prediction/
│ │ │ ├── __init__.py
│ │ │ ├── model.py
│ │ │ ├── trainer.py
│ │ │ └── predictor.py
│ │ │
│ │ ├── rl_battery/
│ │ │ ├── __init__.py
│ │ │ ├── environment.py
│ │ │ ├── agent.py
│ │ │ ├── trainer.py
│ │ │ └── policy.py
│ │ │
│ │ ├── model_management/
│ │ │ ├── __init__.py
│ │ │ ├── registry.py
│ │ │ ├── persistence.py
│ │ │ ├── versioning.py
│ │ │ └── comparison.py
│ │ │
│ │ ├── evaluation/
│ │ │ ├── __init__.py
│ │ │ ├── metrics.py
│ │ │ ├── backtest_evaluator.py
│ │ │ └── reports.py
│ │ │
│ │ ├── training/
│ │ │ ├── __init__.py
│ │ │ └── cli.py
│ │ │
│ │ └── utils/
│ │ ├── __init__.py
│ │ ├── data_split.py
│ │ ├── config.py
│ │ └── evaluation.py
│ │
│ ├── models/
│ │ ├── __init__.py
│ │ ├── schemas.py # Pydantic models
│ │ └── enums.py # Enumerations
│ │
│ ├── core/
│ │ ├── __init__.py
│ │ └── constants.py # Constants and defaults
│ │
│ └── utils/
│ ├── __init__.py
│ ├── logger.py
│ └── helpers.py
├── tests/
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_api/
│ ├── test_services/
│ └── test_websocket.py
├── models/ # Trained ML models storage
│ ├── price_prediction/
│ │ ├── model_1min.pkl
│ │ ├── model_5min.pkl
│ │ ├── model_15min.pkl
│ │ └── model_60min.pkl
│ └── rl_battery/
│ └── battery_policy.pkl
├── results/ # Backtest results storage
│ └── backtests/
├── .env.example
├── requirements.txt
├── pyproject.toml
└── Dockerfile
```
---
## Configuration
### app/config.py (Settings)
```python
from pydantic_settings import BaseSettings
from pathlib import Path
from typing import List
class Settings(BaseSettings):
    """Application settings, read from the environment and the `.env` file.

    Every field has a default, so the service starts without any
    environment configuration. Names are matched case-sensitively
    (see ``Config.case_sensitive``).
    """

    # Application
    APP_NAME: str = "Energy Trading API"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = True

    # Server
    HOST: str = "0.0.0.0"
    PORT: int = 8000

    # Data
    DATA_PATH: str = "~/energy-test-data/data/processed"

    # CORS
    CORS_ORIGINS: List[str] = [
        "http://localhost:3000",
        "http://localhost:5173",
    ]

    # WebSocket
    WS_HEARTBEAT_INTERVAL: int = 30

    # Celery
    CELERY_BROKER_URL: str = "redis://localhost:6379/0"
    CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"

    # Models
    MODELS_PATH: str = "models"
    RESULTS_PATH: str = "results"

    # Battery
    BATTERY_MIN_RESERVE: float = 0.10
    BATTERY_MAX_CHARGE: float = 0.90

    # Arbitrage
    ARBITRAGE_MIN_SPREAD: float = 5.0  # EUR/MWh

    # Mining
    MINING_MARGIN_THRESHOLD: float = 5.0  # EUR/MWh

    # ML
    ML_PREDICTION_HORIZONS: List[int] = [1, 5, 15, 60]
    ML_FEATURE_LAGS: List[int] = [1, 5, 10, 15, 30, 60]

    class Config:
        env_file = ".env"
        case_sensitive = True

    @property
    def DATA_PATH_RESOLVED(self) -> Path:
        """Return ``DATA_PATH`` with a leading ``~`` expanded.

        Computed on access from the instance value, so a ``DATA_PATH``
        supplied through the environment or ``.env`` is honoured. A class-level
        ``Path(DATA_PATH).expanduser()`` default would be evaluated once, from
        the literal default above, before any override is loaded.
        """
        return Path(self.DATA_PATH).expanduser()


settings = Settings()
```
---
## Data Models (app/models/schemas.py)
### Enums
```python
class RegionEnum(str, Enum):
    """Region codes; members compare equal to their raw string value."""

    FR = "FR"
    BE = "BE"
    DE = "DE"
    NL = "NL"
    UK = "UK"
class FuelTypeEnum(str, Enum):
    """Fuel types; members compare equal to their raw string value."""

    GAS = "gas"
    NUCLEAR = "nuclear"
    COAL = "coal"
    SOLAR = "solar"
    WIND = "wind"
    HYDRO = "hydro"
class StrategyEnum(str, Enum):
    """Strategy identifiers; members compare equal to their raw string value."""

    FUNDAMENTAL = "fundamental"
    TECHNICAL = "technical"
    ML = "ml"
    MINING = "mining"
class TradeTypeEnum(str, Enum):
    """Trade action types; members compare equal to their raw string value."""

    BUY = "buy"
    SELL = "sell"
    CHARGE = "charge"
    DISCHARGE = "discharge"
class BacktestStatusEnum(str, Enum):
    """Backtest lifecycle states; members compare equal to their raw string value."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class ModelType(str, Enum):
    """ML model families; members compare equal to their raw string value."""

    PRICE_PREDICTION = "price_prediction"
    RL_BATTERY = "rl_battery"
class AlertTypeEnum(str, Enum):
    """Alert categories; members compare equal to their raw string value."""

    PRICE_SPIKE = "price_spike"
    ARBITRAGE_OPPORTUNITY = "arbitrage_opportunity"
    BATTERY_LOW = "battery_low"
    BATTERY_FULL = "battery_full"
    STRATEGY_ERROR = "strategy_error"
```
### Key Schemas
```python
class PriceData(BaseModel):
    """One price record for a region at a given timestamp."""

    timestamp: datetime
    region: RegionEnum
    day_ahead_price: float
    real_time_price: float
    volume_mw: float
class BatteryState(BaseModel):
    """State of a single battery at a given timestamp."""

    timestamp: datetime
    battery_id: str
    capacity_mwh: float
    charge_level_mwh: float
    charge_rate_mw: float
    discharge_rate_mw: float
    efficiency: float
    # Plain default: a float is immutable, so no default_factory is needed.
    # Not derived from charge_level_mwh / capacity_mwh here; it is 0.0 unless
    # the producer sets it explicitly.
    charge_level_pct: float = 0.0
class BacktestConfig(BaseModel):
    """Parameters for a backtest run."""

    start_date: str
    end_date: str
    strategies: List[StrategyEnum] = Field(default_factory=list)
    use_ml: bool = True
    # NOTE(review): None presumably means "use the Settings default" for these
    # three overrides — confirm against the backtest task.
    battery_min_reserve: Optional[float] = None
    battery_max_charge: Optional[float] = None
    arbitrage_min_spread: Optional[float] = None
class BacktestMetrics(BaseModel):
    """Summary metrics produced by a backtest run; all fields are required."""

    total_revenue: float
    arbitrage_profit: float
    battery_revenue: float
    mining_profit: float
    battery_utilization: float
    price_capture_rate: float
    win_rate: float
    sharpe_ratio: float
    max_drawdown: float
    total_trades: int
class TrainingRequest(BaseModel):
    """Request body for starting a model training run."""

    model_type: ModelType
    # NOTE(review): presumably one of Settings.ML_PREDICTION_HORIZONS and only
    # relevant for price prediction — confirm.
    horizon: Optional[int] = None
    start_date: str
    end_date: str
    hyperparameters: Dict[str, Any] = Field(default_factory=dict)
class PredictionResponse(BaseModel):
    """Result of a single model prediction."""

    model_id: str
    timestamp: datetime
    prediction: float
    confidence: Optional[float] = None
    features_used: List[str] = Field(default_factory=list)
```
---
## API Routes
### Dashboard API (`/api/v1/dashboard/*`)
```python
# GET /api/v1/dashboard/summary
Response: DashboardSummary
# GET /api/v1/dashboard/prices
Response: { regions: { [region]: { timestamp, day_ahead_price, real_time_price, volume_mw } } }
# GET /api/v1/dashboard/prices/history?region={region}&start={start}&end={end}&limit={limit}
Response: { region, data: PriceData[] }
# GET /api/v1/dashboard/battery
Response: { batteries: BatteryState[] }
# GET /api/v1/dashboard/arbitrage?min_spread={min_spread}
Response: { opportunities: ArbitrageOpportunity[], count: int }
```
### Backtest API (`/api/v1/backtest/*`)
```python
# POST /api/v1/backtest/start
Request: { config: BacktestConfig, name?: string }
Response: { backtest_id: string, status: BacktestStatus }
# GET /api/v1/backtest/{backtest_id}
Response: { status: BacktestStatus, results?: BacktestResult }
# GET /api/v1/backtest/{backtest_id}/results
Response: BacktestResult
# GET /api/v1/backtest/{backtest_id}/trades?limit={limit}
Response: { backtest_id, trades: Trade[], total: int }
# GET /api/v1/backtest/history
Response: { backtests: BacktestStatus[], total: int }
# DELETE /api/v1/backtest/{backtest_id}
Response: { message: string }
```
### Models API (`/api/v1/models/*`)
```python
# GET /api/v1/models
Response: { models: ModelInfo[], total: int }
# POST /api/v1/models/train
Request: TrainingRequest
Response: { training_id: string, status: TrainingStatus }
# GET /api/v1/models/{model_id}/status
Response: TrainingStatus
# GET /api/v1/models/{model_id}/metrics
Response: { model_id, metrics: dict }
# POST /api/v1/models/predict
Request: { model_id, timestamp, features?: dict }
Response: PredictionResponse
```
### Trading API (`/api/v1/trading/*`)
```python
# GET /api/v1/trading/strategies
Response: { strategies: StrategyStatus[] }
# POST /api/v1/trading/strategies
Request: { strategy: StrategyEnum, action: "start" | "stop" }
Response: { status: StrategyStatus }
# GET /api/v1/trading/positions
Response: { positions: TradingPosition[] }
```
### Settings API (`/api/v1/settings/*`)
```python
# GET /api/v1/settings
Response: AppSettings
# POST /api/v1/settings
Request: Partial<AppSettings>
Response: { message, updated_fields: string[] }
```
---
## Services Interface
### DataService (app/services/data_service.py)
```python
class DataService:
    """Data loading and caching service (interface sketch; bodies omitted)."""

    async def initialize(self):
        """Load all datasets into memory."""

    def get_latest_prices(self) -> Dict[str, Dict]:
        """Get latest prices for all regions."""

    def get_price_history(self, region, start=None, end=None, limit=1000) -> List[Dict]:
        """Get price history for a region."""

    def get_battery_states(self) -> List[Dict]:
        """Get current battery states."""

    def get_arbitrage_opportunities(self, min_spread=None) -> List[Dict]:
        """Get current arbitrage opportunities."""

    def get_dashboard_summary(self) -> Dict:
        """Get overall dashboard summary."""
```
### MLService (app/services/ml_service.py)
```python
class MLService:
    """Service for ML model management and inference (interface sketch)."""

    def list_models(self) -> List[ModelInfo]:
        """List all available trained models."""

    def get_model_metrics(self, model_id: str) -> Dict[str, float]:
        """Get performance metrics for a model."""

    def load_price_prediction_model(self, model_id: str):
        """Load price prediction model on-demand."""

    def load_rl_battery_policy(self, model_id: str):
        """Load RL battery policy on-demand."""

    def predict(
        self,
        model_id: str,
        timestamp: datetime,
        features: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Run prediction with on-demand model loading."""

    def predict_with_confidence(
        self,
        model_id: str,
        timestamp: datetime,
        features: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Run prediction with confidence interval."""

    def get_feature_importance(self, model_id: str) -> Dict[str, float]:
        """Get feature importance for a model."""

    def get_model_info(self, model_id: str) -> Optional[ModelInfo]:
        """Get detailed info about a specific model."""
```
### StrategyService (app/services/strategy_service.py)
```python
class StrategyService:
    """Strategy execution service (interface sketch; bodies omitted)."""

    async def execute_strategy(
        self,
        strategy: StrategyEnum,
        config: Optional[Dict] = None,
    ) -> Dict:
        """Execute a trading strategy."""

    async def get_strategy_status(self, strategy: StrategyEnum) -> StrategyStatus:
        """Get current status of a strategy."""

    async def toggle_strategy(
        self,
        strategy: StrategyEnum,
        action: str,
    ) -> StrategyStatus:
        """Start or stop a strategy."""
```
---
## Tasks Interface
### Backtest Tasks (app/tasks/backtest_tasks.py)
```python
async def run_backtest_task(backtest_id: str, config: Dict, name: Optional[str] = None):
    """
    Execute backtest in background.

    Process:
    1. Load data
    2. Execute strategies
    3. Calculate metrics
    4. Save results
    5. Emit WebSocket progress events
    """
```
### Training Tasks (app/tasks/training_tasks.py)
```python
async def train_model_task(training_id: str, request: TrainingRequest):
    """
    Execute ML model training via Celery task.

    Dispatches to Celery for async processing.
    Emits WebSocket events for progress updates.
    """
@shared_task(name="tasks.train_price_prediction")
def train_price_prediction(training_id: str, request_dict: dict):
    """Celery task for price prediction model training."""
@shared_task(name="tasks.train_rl_battery")
def train_rl_battery(training_id: str, request_dict: dict):
    """Celery task for RL battery policy training."""
```
---
## WebSocket Interface
### ConnectionManager (app/api/websocket.py)
```python
class ConnectionManager:
    """WebSocket connection manager (interface sketch; bodies omitted)."""

    async def connect(self, websocket: WebSocket):
        """Accept and track new connection."""

    def disconnect(self, websocket: WebSocket):
        """Remove connection."""

    async def broadcast(self, event_type: str, data: Any):
        """Broadcast event to all connected clients."""

    # Specific event broadcasters

    async def broadcast_price_update(self, region: str, price_data: Dict):
        """Broadcast price update."""

    async def broadcast_battery_update(self, battery_id: str, battery_state: Dict):
        """Broadcast battery state update."""

    async def broadcast_trade(self, trade: Dict):
        """Broadcast new trade execution."""

    async def broadcast_alert(self, alert: Dict):
        """Broadcast new alert."""

    async def broadcast_backtest_progress(self, backtest_id: str, progress: float, status: str):
        """Broadcast backtest progress."""

    async def broadcast_model_training_progress(
        self,
        model_id: str,
        progress: float,
        epoch: Optional[int] = None,
        metrics: Optional[Dict] = None,
    ):
        """Broadcast model training progress."""
```
### WebSocket Events
```python
# Event types
"price_update" # Real-time price changes
"battery_update" # Battery state changes
"arbitrage_opportunity" # New arbitrage opportunity
"trade_executed" # Trade execution
"alert_triggered" # Alert triggered
"backtest_progress" # Backtest progress
"model_training_progress" # Training progress
```
---
## Main Application
### app/main.py
```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager

app = FastAPI(
    title=settings.APP_NAME,
    version=settings.APP_VERSION,
    docs_url="/docs",
    redoc_url="/redoc",
)

# CORS middleware
app.add_middleware(CORSMiddleware, ...)

# Include routers
app.include_router(dashboard.router, prefix="/api/v1/dashboard", tags=["dashboard"])
app.include_router(backtest.router, prefix="/api/v1/backtest", tags=["backtest"])
app.include_router(models.router, prefix="/api/v1/models", tags=["models"])
app.include_router(trading.router, prefix="/api/v1/trading", tags=["trading"])
app.include_router(settings_routes.router, prefix="/api/v1/settings", tags=["settings"])


# Health check
@app.get("/health")
async def health_check():
    """Return a static liveness payload."""
    return {"status": "healthy"}


# WebSocket endpoint
@app.websocket("/ws/real-time")
async def websocket_endpoint(websocket: WebSocket):
    """Register the client and hold the connection open until it disconnects."""
    await manager.connect(websocket)
    try:
        while True:
            # Returning from the handler closes the socket, so block on
            # incoming frames to keep the connection alive for broadcasts.
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Drop the closed socket from the manager.
        manager.disconnect(websocket)
```
---
## Dependencies
### requirements.txt
```
# FastAPI & Server
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
pydantic>=2.4.0
pydantic-settings>=2.0.0
# Data Processing
pandas>=2.1.0
numpy>=1.24.0
pyarrow>=14.0.0
# Machine Learning
xgboost>=2.0.0
scikit-learn>=1.3.0
# Reinforcement Learning
gymnasium>=0.29.0
stable-baselines3>=2.0.0
# Background Tasks
celery>=5.3.0
redis>=5.0.0
# WebSockets
websockets>=12.0.0
# Database
sqlalchemy>=2.0.0
alembic>=1.12.0
# Utilities
python-multipart>=0.0.6
python-jose[cryptography]>=3.3.0
python-dotenv>=1.0.0
# Testing
pytest>=7.4.0
pytest-asyncio>=0.21.0
httpx>=0.25.0
# Logging
loguru>=0.7.0
```
---
## Environment Variables
### .env.example
```bash
# Application
APP_NAME=Energy Trading API
APP_VERSION=1.0.0
DEBUG=true
# Server
HOST=0.0.0.0
PORT=8000
# Data
DATA_PATH=~/energy-test-data/data/processed
# CORS
# List-typed settings are parsed as JSON by pydantic-settings, so use a JSON array
CORS_ORIGINS='["http://localhost:3000","http://localhost:5173"]'
# Celery
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
# Paths
MODELS_PATH=models
RESULTS_PATH=results
# Battery
BATTERY_MIN_RESERVE=0.10
BATTERY_MAX_CHARGE=0.90
# Arbitrage
ARBITRAGE_MIN_SPREAD=5.0
# Mining
MINING_MARGIN_THRESHOLD=5.0
```