from datetime import datetime, timezone
from itertools import combinations
from pathlib import Path
from typing import Dict, List, Optional

import pandas as pd

from app.config import settings
from app.utils.logger import get_logger

logger = get_logger(__name__)


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Produces the same value as ``datetime.utcnow()`` without calling that
    deprecated function, so the objects handed to callers keep their
    original (timezone-naive) shape.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class DataService:
    """In-memory access layer over the price and battery parquet files.

    Data is read from ``settings.DATA_PATH_RESOLVED`` by :meth:`initialize`
    and then served from memory by the ``get_*`` methods.
    """

    # Region codes that are extracted from the price file, in the order in
    # which they are stored (and therefore reported and paired).
    REGIONS = ("FR", "BE", "DE", "NL", "UK")

    # Fixed volume attached to every reported arbitrage opportunity.
    OPPORTUNITY_VOLUME_MW = 100

    def __init__(self):
        self.data_path: Path = settings.DATA_PATH_RESOLVED
        # Per-region price frames, keyed by region code.
        self._price_data: Dict[str, pd.DataFrame] = {}
        self._battery_data: Optional[pd.DataFrame] = None
        self._loaded: bool = False

    async def initialize(self):
        """Load price and battery data from ``self.data_path`` into memory.

        The parquet reads are synchronous calls, so this coroutine does not
        yield to the event loop while the files are being read.
        """
        logger.info(f"Loading data from {self.data_path}")
        self._load_price_data()
        self._load_battery_data()
        self._loaded = True
        logger.info("Data loaded successfully")

    @staticmethod
    def _sort_by_timestamp(df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` ordered by its ``timestamp`` column, if it has one.

        A stable sort is used and missing timestamps are placed first, so
        the last row is the one with the greatest timestamp and rows that
        were already in order keep their relative positions. If the column
        cannot be ordered the frame is returned unchanged (best effort).
        """
        if "timestamp" not in df.columns:
            return df
        try:
            return df.sort_values(
                "timestamp", kind="mergesort", na_position="first"
            )
        except TypeError:
            logger.warning(
                "Could not sort by 'timestamp' column; keeping file order"
            )
            return df

    def _load_price_data(self):
        """Read ``electricity_prices.parquet`` and split it per region.

        Missing path, missing file, or a missing ``region`` column are
        logged as warnings and leave ``self._price_data`` untouched.
        """
        if not self.data_path.exists():
            logger.warning(f"Data path {self.data_path} does not exist")
            return

        prices_file = self.data_path / "electricity_prices.parquet"
        if not prices_file.exists():
            logger.warning(f"Price data file not found: {prices_file}")
            return

        df = pd.read_parquet(prices_file)
        logger.info(f"Loaded price data: {len(df)} total rows from {prices_file}")

        if "region" not in df.columns:
            logger.warning("Price data file does not contain 'region' column")
            return

        # Order once at load time so that "last row" means "latest row"
        # for every later lookup, independent of the file's row order.
        df = self._sort_by_timestamp(df)
        for region in self.REGIONS:
            region_df = df[df["region"] == region].copy()
            if len(region_df) > 0:
                self._price_data[region] = region_df
                logger.info(f"Loaded {region} price data: {len(region_df)} rows")

    def _load_battery_data(self):
        """Read ``battery_capacity.parquet`` into ``self._battery_data``.

        A missing file is logged as a warning and leaves the attribute
        unchanged.
        """
        battery_path = self.data_path / "battery_capacity.parquet"
        if not battery_path.exists():
            logger.warning(f"Battery data file not found: {battery_path}")
            return

        # Sorted so that the per-battery "last" values are the latest ones.
        self._battery_data = self._sort_by_timestamp(pd.read_parquet(battery_path))
        logger.info(f"Loaded battery data: {len(self._battery_data)} rows")

    def get_latest_prices(self) -> Dict[str, Dict]:
        """Return the last stored price row of every loaded region.

        Returns:
            Mapping of region code to a dict with ``timestamp``,
            ``day_ahead_price``, ``real_time_price`` and ``volume_mw``.
            Missing price/volume columns are reported as ``0``; a missing
            timestamp column as ``None``.
        """
        result = {}
        for region, df in self._price_data.items():
            if len(df) == 0:
                continue
            # Frames are ordered by timestamp when loaded, so the final
            # row is the most recent one.
            latest = df.iloc[-1].to_dict()
            result[region] = {
                "timestamp": latest.get("timestamp"),
                "day_ahead_price": latest.get("day_ahead_price", 0),
                "real_time_price": latest.get("real_time_price", 0),
                "volume_mw": latest.get("volume_mw", 0),
            }
        return result

    def get_price_history(
        self,
        region: str,
        start: Optional[str] = None,
        end: Optional[str] = None,
        limit: int = 1000,
    ) -> List[Dict]:
        """Return up to ``limit`` of the most recent price rows for a region.

        Args:
            region: Region code to look up.
            start: Optional inclusive lower bound compared against the
                ``timestamp`` column.
            end: Optional inclusive upper bound compared against the
                ``timestamp`` column.
            limit: Maximum number of rows to return; values below zero
                are treated as zero.

        Returns:
            The selected rows as a list of dicts, oldest first. Empty when
            the region is not loaded. The bounds are ignored when the data
            has no ``timestamp`` column.
        """
        df = self._price_data.get(region)
        if df is None:
            return []

        if "timestamp" in df.columns:
            df = df.sort_values("timestamp")
            if start:
                df = df[df["timestamp"] >= start]
            if end:
                df = df[df["timestamp"] <= end]

        # tail() with a negative count would drop leading rows instead of
        # limiting the result, so clamp the limit at zero.
        return df.tail(max(limit, 0)).to_dict("records")

    def get_battery_states(self) -> List[Dict]:
        """Return one state record per battery.

        Each record holds, per column, the last non-null value stored for
        that ``battery_id``. Missing numeric columns default to ``0``
        (``efficiency`` to ``0.9``).

        Returns:
            A list of state dicts; empty when no battery data is loaded or
            the data has no ``battery_id`` column.
        """
        data = self._battery_data
        if data is None or len(data) == 0:
            return []
        if "battery_id" not in data.columns:
            logger.warning("Battery data does not contain 'battery_id' column")
            return []

        latest_by_battery = data.groupby("battery_id").last().reset_index()
        return [
            {
                "timestamp": row.get("timestamp"),
                "battery_id": row.get("battery_id"),
                "capacity_mwh": row.get("capacity_mwh", 0),
                "charge_level_mwh": row.get("charge_level_mwh", 0),
                "charge_rate_mw": row.get("charge_rate_mw", 0),
                "discharge_rate_mw": row.get("discharge_rate_mw", 0),
                "efficiency": row.get("efficiency", 0.9),
            }
            for _, row in latest_by_battery.iterrows()
        ]

    def get_arbitrage_opportunities(
        self, min_spread: Optional[float] = None
    ) -> List[Dict]:
        """List region pairs whose latest real-time prices differ enough.

        Args:
            min_spread: Minimum absolute price difference for a pair to be
                reported. Defaults to ``settings.ARBITRAGE_MIN_SPREAD``.

        Returns:
            One dict per qualifying pair with the cheaper region as
            ``buy_region`` and the dearer one as ``sell_region``.
        """
        if min_spread is None:
            min_spread = settings.ARBITRAGE_MIN_SPREAD

        opportunities = []
        latest_prices = self.get_latest_prices()
        for (region_a, quote_a), (region_b, quote_b) in combinations(
            latest_prices.items(), 2
        ):
            price_a = quote_a.get("real_time_price", 0)
            price_b = quote_b.get("real_time_price", 0)
            # NOTE(review): pairs with a zero or negative price are skipped,
            # presumably to ignore missing data -- confirm this is intended.
            if not (price_a > 0 and price_b > 0):
                continue

            spread = abs(price_a - price_b)
            if spread >= min_spread:
                if price_a < price_b:
                    buy_region, sell_region = region_a, region_b
                    buy_price, sell_price = price_a, price_b
                else:
                    buy_region, sell_region = region_b, region_a
                    buy_price, sell_price = price_b, price_a
                opportunities.append(
                    {
                        "timestamp": _utcnow(),
                        "buy_region": buy_region,
                        "sell_region": sell_region,
                        "buy_price": buy_price,
                        "sell_price": sell_price,
                        "spread": spread,
                        "volume_mw": self.OPPORTUNITY_VOLUME_MW,
                    }
                )
        return opportunities

    def get_dashboard_summary(self) -> Dict:
        """Aggregate latest prices, arbitrage and battery state into one dict.

        Returns:
            A dict with the current UTC time, the summed latest volume, the
            mean latest real-time price, the number of arbitrage
            opportunities, the number of batteries and the mean charge
            ratio (charge level / capacity).
        """
        latest_prices = self.get_latest_prices()
        total_volume = sum(p.get("volume_mw", 0) for p in latest_prices.values())
        avg_price = (
            sum(p.get("real_time_price", 0) for p in latest_prices.values())
            / len(latest_prices)
            if latest_prices
            else 0
        )

        arbitrage = self.get_arbitrage_opportunities()
        battery_states = self.get_battery_states()

        # Only batteries with a positive capacity have a meaningful charge
        # ratio; including the others would divide by zero.
        charge_ratios = []
        for battery in battery_states:
            capacity = battery.get("capacity_mwh") or 0
            if capacity > 0:
                charge = battery.get("charge_level_mwh") or 0
                charge_ratios.append(charge / capacity)
        avg_battery_charge = (
            sum(charge_ratios) / len(charge_ratios) if charge_ratios else 0
        )

        return {
            "latest_timestamp": _utcnow(),
            "total_volume_mw": total_volume,
            "avg_realtime_price": avg_price,
            "arbitrage_count": len(arbitrage),
            "battery_count": len(battery_states),
            "avg_battery_charge": avg_battery_charge,
        }