"""
Generate synthetic test data for energy trading strategies.

Handles: battery capacity, data centers, renewable generation,
conventional generation.

All generators draw from NumPy's global legacy RNG (``np.random.*``) and
reseed it from ``config['generation']['seed']`` (plus a per-dataset offset),
so a given configuration reproduces the same data on every run.
"""

import json
from datetime import datetime, timedelta, timezone
from pathlib import Path

import numpy as np
import pandas as pd

# Sampling steps per hour assumed when the step length cannot be derived from
# the timestamps (fewer than two points); corresponds to 1-minute data.
_DEFAULT_STEPS_PER_HOUR = 60.0

# Locations assigned round-robin to the generated data centers.
_DATA_CENTER_LOCATIONS = ('FR', 'BE', 'DE', 'NL', 'UK')


def load_config():
    """Load ``config/data_config.yaml`` located next to this script's parent.

    Returns:
        The parsed YAML document (whatever ``yaml.safe_load`` yields).

    Raises:
        FileNotFoundError: if the configuration file does not exist.
    """
    # Imported lazily so the generator functions can be imported (and unit
    # tested) on machines without PyYAML; only config loading needs it.
    import yaml

    config_path = Path(__file__).parent.parent / "config" / "data_config.yaml"
    with open(config_path, encoding="utf-8") as f:
        return yaml.safe_load(f)


def generate_timestamps(start_date, end_date, granularity):
    """Return a ``DatetimeIndex`` from *start_date* to *end_date* inclusive.

    Args:
        start_date: Anything ``pd.to_datetime`` accepts.
        end_date: Anything ``pd.to_datetime`` accepts.
        granularity: A pandas frequency string (e.g. ``"1min"``).
    """
    return pd.date_range(
        start=pd.to_datetime(start_date),
        end=pd.to_datetime(end_date),
        freq=granularity,
    )


def _steps_per_hour(timestamps):
    """Return how many sampling steps fit in one hour.

    The step is taken from the first two timestamps (the index is assumed to
    be regularly spaced, as produced by ``generate_timestamps``). Falls back
    to 1-minute data when the step cannot be determined.
    """
    if len(timestamps) < 2:
        return _DEFAULT_STEPS_PER_HOUR
    step = pd.Timestamp(timestamps[1]) - pd.Timestamp(timestamps[0])
    step_seconds = step.total_seconds()
    if step_seconds <= 0:
        return _DEFAULT_STEPS_PER_HOUR
    return 3600.0 / step_seconds


def _fractional_hours(timestamps):
    """Return the hour of day (with minutes as a fraction) as a float array."""
    return np.asarray(timestamps.hour + timestamps.minute / 60, dtype=float)


def generate_battery_data(config, timestamps):
    """Simulate a random charge/discharge walk for each battery.

    Reads ``config['generation']['seed']``,
    ``config['data_sources']['battery_capacity']['num_batteries']`` and the
    ``*_range`` entries of ``config['battery']``.

    Args:
        config: Parsed configuration mapping.
        timestamps: Regularly spaced ``DatetimeIndex``.

    Returns:
        One long DataFrame (all batteries concatenated) with columns
        ``timestamp, battery_id, capacity_mwh, charge_level_mwh,
        charge_rate_mw, discharge_rate_mw, efficiency``. ``charge_rate_mw``
        is the signed rate implied by consecutive charge levels, while
        ``discharge_rate_mw`` is the battery's constant drawn discharge rate.
    """
    np.random.seed(config['generation']['seed'])
    num_batteries = config['data_sources']['battery_capacity']['num_batteries']
    params = config['battery']

    n = len(timestamps)
    # Rates are per hour while levels are per step, so energy moved in one
    # step is rate / steps_per_hour. The original hard-coded 60 (1-minute
    # data); deriving it keeps that case bit-identical and fixes the rest.
    steps_per_hour = _steps_per_hour(timestamps)

    batteries = []
    for i in range(num_batteries):
        battery_id = f"BAT_{i+1:03d}"

        # Draw order is part of the reproducibility contract: do not reorder.
        capacity = np.random.uniform(*params['capacity_range'])
        charge_rate = np.random.uniform(*params['charge_rate_range'])
        discharge_rate = np.random.uniform(*params['discharge_rate_range'])
        efficiency = np.random.uniform(*params['efficiency_range'])
        initial_fraction = np.random.uniform(0.3, 0.7)

        charge_level = np.zeros(n)
        current_rate = np.zeros(n)
        if n > 0:
            level = float(capacity * initial_fraction)
            charge_level[0] = level
            if n > 1:
                # One batched draw consumes the same RNG stream as n-1
                # single draws but avoids per-call validation overhead.
                actions = np.random.choice(
                    [-1, 0, 1], size=n - 1, p=[0.3, 0.2, 0.5]
                )
                rates = np.where(actions > 0, charge_rate, discharge_rate)
                deltas = actions * rates / steps_per_hour
                # The clamp makes each level depend on the previous one, so
                # this part stays a sequential loop.
                for t, delta in enumerate(deltas.tolist(), start=1):
                    level = min(max(level + delta, 0.0), capacity)
                    charge_level[t] = level
            current_rate = (
                np.diff(charge_level, prepend=charge_level[0]) * steps_per_hour
            )
            current_rate = np.clip(current_rate, -discharge_rate, charge_rate)

        batteries.append(pd.DataFrame({
            'timestamp': timestamps,
            'battery_id': battery_id,
            'capacity_mwh': capacity,
            'charge_level_mwh': charge_level,
            'charge_rate_mw': current_rate,
            'discharge_rate_mw': discharge_rate,
            'efficiency': efficiency,
        }))

    return pd.concat(batteries, ignore_index=True)


def generate_renewable_data(config, timestamps):
    """Generate per-plant renewable generation and forecast series.

    Reads ``config['generation']['seed']`` (offset by 1),
    ``config['data_sources']['renewable_generation']`` (``sources``,
    ``plants_per_source``) and ``config['renewable'][<source>]``
    (``capacity_range``, ``forecast_error_sd``).

    ``solar`` follows a daytime half-sine, ``wind`` a noisy daily sine, and
    any other source a noisy flat profile.

    Returns:
        One long DataFrame with columns ``timestamp, source, plant_id,
        generation_mw, forecast_mw, actual_mw, capacity_factor``.
    """
    np.random.seed(config['generation']['seed'] + 1)
    renewable_cfg = config['data_sources']['renewable_generation']
    sources = renewable_cfg['sources']
    plants_per_source = renewable_cfg['plants_per_source']
    params = config['renewable']

    n = len(timestamps)
    # Loop-invariant calendar features.
    hours = _fractional_hours(timestamps)
    day_of_year = np.asarray(timestamps.dayofyear, dtype=float)

    df_list = []
    for source in sources:
        source_params = params[source]
        forecast_error_sd = source_params['forecast_error_sd']

        for i in range(plants_per_source):
            plant_id = f"{source.upper()}_{i+1:03d}"
            capacity = np.random.uniform(*source_params['capacity_range'])

            if source == 'solar':
                base_pattern = np.maximum(0, np.sin(np.pi * (hours - 6) / 12))
                seasonal = 0.7 + 0.3 * np.sin(2 * np.pi * day_of_year / 365)
            elif source == 'wind':
                base_pattern = (
                    0.4
                    + 0.3 * np.sin(2 * np.pi * hours / 24)
                    + 0.3 * np.random.randn(n)
                )
                seasonal = 0.8 + 0.2 * np.sin(2 * np.pi * day_of_year / 365)
            else:
                base_pattern = 0.6 + 0.2 * np.random.randn(n)
                seasonal = 1.0

            generation = (
                base_pattern * seasonal * capacity
                * np.random.uniform(0.8, 1.2, n)
            )
            generation = np.maximum(0, generation)

            # Forecast = actual with a multiplicative Gaussian error.
            forecast_error = np.random.normal(0, forecast_error_sd, n)
            forecast = np.maximum(0, generation * (1 + forecast_error))

            df_list.append(pd.DataFrame({
                'timestamp': timestamps,
                'source': source,
                'plant_id': plant_id,
                'generation_mw': generation,
                'forecast_mw': forecast,
                'actual_mw': generation,
                'capacity_factor': generation / capacity,
            }))

    return pd.concat(df_list, ignore_index=True)


def generate_conventional_data(config, timestamps):
    """Generate per-plant conventional generation series.

    Reads ``config['generation']['seed']`` (offset by 2),
    ``config['data_sources']['conventional_generation']`` (``num_plants``,
    ``fuel_types``) and ``config['conventional'][<fuel>]``
    (``capacity_range``, ``marginal_cost_range``). Each plant's fuel type is
    drawn at random from ``fuel_types``.

    Returns:
        One long DataFrame with columns ``timestamp, plant_id, fuel_type,
        generation_mw, marginal_cost, heat_rate``; generation is clipped to
        ``[0, capacity]``.
    """
    np.random.seed(config['generation']['seed'] + 2)
    conventional_cfg = config['data_sources']['conventional_generation']
    num_plants = conventional_cfg['num_plants']
    fuel_types = conventional_cfg['fuel_types']
    params = config['conventional']

    n = len(timestamps)
    hours = _fractional_hours(timestamps)

    df_list = []
    for i in range(num_plants):
        plant_id = f"CONV_{i+1:03d}"
        fuel_type = np.random.choice(fuel_types)
        fuel_params = params[fuel_type]

        capacity = np.random.uniform(*fuel_params['capacity_range'])
        marginal_cost = np.random.uniform(*fuel_params['marginal_cost_range'])
        if fuel_type == 'gas':
            heat_rate = np.random.uniform(6, 12)
        else:
            heat_rate = np.random.uniform(8, 14)

        if fuel_type == 'nuclear':
            # Near-constant output at 90 % of capacity.
            generation = 0.9 * capacity + np.random.normal(0, 0.01 * capacity, n)
        elif fuel_type == 'gas':
            peaking_pattern = 0.3 + 0.4 * np.sin(2 * np.pi * (hours - 12) / 24)
            generation = (
                peaking_pattern * capacity
                + np.random.normal(0, 0.05 * capacity, n)
            )
        else:
            baseload_pattern = 0.5 + 0.2 * np.sin(2 * np.pi * hours / 24)
            generation = (
                baseload_pattern * capacity
                + np.random.normal(0, 0.03 * capacity, n)
            )

        generation = np.clip(generation, 0, capacity)

        df_list.append(pd.DataFrame({
            'timestamp': timestamps,
            'plant_id': plant_id,
            'fuel_type': fuel_type,
            'generation_mw': generation,
            'marginal_cost': marginal_cost,
            'heat_rate': heat_rate,
        }))

    return pd.concat(df_list, ignore_index=True)


def generate_data_center_data(config, timestamps):
    """Generate per-data-center power demand and bid-price series.

    Reads ``config['generation']['seed']`` (offset by 3),
    ``config['data_sources']['data_centers']['num_centers']`` and
    ``config['data_center']`` (``power_demand_range``,
    ``price_sensitivity_range``). The first center is tagged ``bitcoin`` and
    gets a noisy flat profile; all others are ``enterprise`` with a daily
    sine profile.

    Returns:
        One long DataFrame with columns ``timestamp, data_center_id,
        location, power_demand_mw, max_bid_price, client_type``.
    """
    np.random.seed(config['generation']['seed'] + 3)
    num_centers = config['data_sources']['data_centers']['num_centers']
    params = config['data_center']

    n = len(timestamps)
    hours = _fractional_hours(timestamps)

    df_list = []
    for i in range(num_centers):
        data_center_id = f"DC_{i+1:03d}"
        location = _DATA_CENTER_LOCATIONS[i % len(_DATA_CENTER_LOCATIONS)]

        base_demand = np.random.uniform(*params['power_demand_range'])
        price_sensitivity = np.random.uniform(*params['price_sensitivity_range'])

        is_bitcoin = (i == 0)
        client_type = 'bitcoin' if is_bitcoin else 'enterprise'

        if is_bitcoin:
            base_profile = 0.7 + 0.3 * np.random.randn(n)
        else:
            base_profile = 0.6 + 0.2 * np.sin(2 * np.pi * (hours - 12) / 24)

        demand = base_demand * base_profile
        # The original `np.maximum(demand * 0.5, demand)` left positive values
        # untouched and merely halved negative ones, so the Gaussian bitcoin
        # profile could emit negative demand. Floor at zero instead; positive
        # values are unchanged.
        demand = np.maximum(0, demand)

        max_bid = base_demand * price_sensitivity * (0.8 + 0.4 * np.random.rand(n))

        df_list.append(pd.DataFrame({
            'timestamp': timestamps,
            'data_center_id': data_center_id,
            'location': location,
            'power_demand_mw': demand,
            'max_bid_price': max_bid,
            'client_type': client_type,
        }))

    return pd.concat(df_list, ignore_index=True)


def apply_noise_and_outliers(df, config):
    """Return *df* with multiplicative noise and outliers on numeric columns.

    Controlled by ``config['generation']``: ``add_noise`` (switch),
    ``noise_level`` (standard deviation of the relative Gaussian noise) and
    ``outlier_rate`` (fraction of rows per column scaled by a factor drawn
    uniformly from ``[0.5, 2.0)``).

    The RNG is NOT reseeded here; draws continue from the global NumPy state.

    Args:
        df: Input frame. It is never modified.
        config: Parsed configuration mapping.

    Returns:
        *df* itself when noise is disabled, otherwise a modified copy.
    """
    gen_cfg = config['generation']
    if not gen_cfg['add_noise']:
        return df

    noise_level = gen_cfg['noise_level']
    outlier_rate = gen_cfg['outlier_rate']

    # Work on a copy so the caller's frame is not mutated behind its back.
    result = df.copy()
    n_rows = len(result)
    if n_rows == 0:
        return result

    num_outliers = int(n_rows * outlier_rate)
    # select_dtypes(np.number) never yields datetime columns, so no explicit
    # 'timestamp' exclusion is needed.
    for col in result.select_dtypes(include=[np.number]).columns:
        noise = np.random.normal(0, noise_level, n_rows)
        result[col] = result[col] * (1 + noise)

        # Always draw (even for zero outliers) to keep the RNG stream stable.
        outlier_idx = np.random.choice(n_rows, num_outliers, replace=False)
        factors = np.random.uniform(0.5, 2.0, num_outliers)
        if num_outliers:
            # Positional indexing: the drawn indices are row positions, which
            # only coincide with labels on a default RangeIndex.
            col_pos = result.columns.get_loc(col)
            scaled = result.iloc[outlier_idx, col_pos].to_numpy() * factors
            result.iloc[outlier_idx, col_pos] = scaled

    return result


def add_missing_values(df, config):
    """Return *df* with a random fraction of numeric cells set to NaN.

    Controlled by ``config['generation']``: ``include_missing_values``
    (switch) and ``missing_rate`` (fraction of rows blanked per numeric
    column). The RNG is NOT reseeded here.

    Args:
        df: Input frame. It is never modified.
        config: Parsed configuration mapping.

    Returns:
        *df* itself when the feature is disabled, otherwise a modified copy.
    """
    gen_cfg = config['generation']
    if not gen_cfg['include_missing_values']:
        return df

    missing_rate = gen_cfg['missing_rate']

    result = df.copy()
    n_rows = len(result)
    if n_rows == 0:
        return result

    num_missing = int(n_rows * missing_rate)
    for col in result.select_dtypes(include=[np.number]).columns:
        # Always draw to keep the RNG stream stable regardless of the rate.
        missing_idx = np.random.choice(n_rows, num_missing, replace=False)
        if num_missing:
            # Drawn indices are positions, hence iloc rather than loc.
            result.iloc[missing_idx, result.columns.get_loc(col)] = np.nan

    return result


def save_metadata(datasets, output_dir):
    """Write a JSON summary of *datasets* and return it.

    The file is written to
    ``<output_dir>/metadata/generation_metadata.json``; the ``metadata``
    directory is created if necessary.

    Args:
        datasets: Mapping of dataset name to DataFrame.
        output_dir: Base output directory (``str`` or ``Path``).

    Returns:
        The metadata dict that was serialized. Per dataset it holds ``rows``,
        ``num_columns``, ``memory_usage_mb``, ``dtypes`` and ``columns``
        (the list of column names).
    """
    metadata = {
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
        'generated_at': datetime.now(timezone.utc).isoformat(),
        'datasets': {},
    }

    for name, df in datasets.items():
        metadata['datasets'][name] = {
            'rows': len(df),
            # Previously a second 'columns' key shadowed this count.
            'num_columns': len(df.columns),
            'memory_usage_mb': float(df.memory_usage(deep=True).sum()) / 1024 / 1024,
            'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
            'columns': list(df.columns),
        }

    output_path = Path(output_dir) / 'metadata' / 'generation_metadata.json'
    # Nothing else creates this directory, so a fresh checkout used to fail.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', encoding="utf-8") as f:
        json.dump(metadata, f, indent=2, default=str)

    return metadata


def main():
    """Generate every dataset, write parquet files and metadata.

    Output goes to ``data/raw/<name>_raw.parquet`` and
    ``data/metadata/generation_metadata.json`` relative to this script's
    parent directory.

    Returns:
        Mapping of dataset name to the final (noised) DataFrame.
    """
    config = load_config()

    time_config = config['time_range']
    timestamps = generate_timestamps(
        time_config['start_date'],
        time_config['end_date'],
        time_config['granularity']
    )

    print(f"Generating synthetic data for {len(timestamps)} timestamps...")

    datasets = {}

    datasets['battery_capacity'] = generate_battery_data(config, timestamps)
    print(f" - Battery capacity: {len(datasets['battery_capacity'])} rows")

    datasets['renewable_generation'] = generate_renewable_data(config, timestamps)
    print(f" - Renewable generation: {len(datasets['renewable_generation'])} rows")

    datasets['conventional_generation'] = generate_conventional_data(config, timestamps)
    print(f" - Conventional generation: {len(datasets['conventional_generation'])} rows")

    datasets['data_centers'] = generate_data_center_data(config, timestamps)
    print(f" - Data centers: {len(datasets['data_centers'])} rows")

    # Same order as generation so the shared RNG stream is consumed
    # deterministically: noise first, then missing values, per dataset.
    for name in list(datasets):
        noisy = apply_noise_and_outliers(datasets[name], config)
        datasets[name] = add_missing_values(noisy, config)

    output_base = Path(__file__).parent.parent / 'data'
    output_base.mkdir(parents=True, exist_ok=True)

    raw_dir = output_base / 'raw'
    raw_dir.mkdir(parents=True, exist_ok=True)

    for name, df in datasets.items():
        file_path = raw_dir / f'{name}_raw.parquet'
        df.to_parquet(file_path, compression='snappy')
        print(f" Saved: {file_path}")

    save_metadata(datasets, output_base)
    print("\nMetadata saved to data/metadata/generation_metadata.json")
    print(f"Total datasets generated: {len(datasets)}")

    return datasets


if __name__ == '__main__':
    main()