Add two new static datasets for cross-region arbitrage calculations: - transmission_capacity: region-to-region capacity limits (20 rows) - transmission_cost: transmission costs per path (20 rows) Update mining dataset with EUR pricing and power metrics: - Change btc_price_usd to btc_price_eur - Add power_efficiency_th_per_mw, power_demand_mw - Add revenue_eur_per_mwh, profit_eur_per_mwh - Remove mining_profitability column Changes include: - scripts/02_fetch_historical.py: rewrite fetch_bitcoin_mining_data() - scripts/01_generate_synthetic.py: add transmission data generators - config/data_config.yaml: add transmission config, update bitcoin config - config/schema.yaml: add 2 new schemas, update bitcoin_mining schema - scripts/03_process_merge.py: add 2 new datasets - scripts/04_validate.py: add 2 new datasets - test/test_data.py: update for new datasets and bitcoin price reference Total datasets: 9 (734,491 rows, 17.89 MB)
394 lines
14 KiB
Python
394 lines
14 KiB
Python
"""
|
|
Generate synthetic data for energy trading strategy test data.
|
|
Handles: battery capacity, data centers, renewable generation, conventional generation.
|
|
"""
|
|
|
|
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path

import numpy as np
import pandas as pd
import yaml
|
|
|
|
def load_config():
    """Load the generator settings from config/data_config.yaml (relative to repo root)."""
    config_file = Path(__file__).parent.parent / "config" / "data_config.yaml"
    with config_file.open() as fh:
        return yaml.safe_load(fh)
|
|
|
|
def generate_timestamps(start_date, end_date, granularity):
    """Return a pandas DatetimeIndex from start_date to end_date (inclusive).

    granularity is passed straight through as the pandas frequency string.
    """
    return pd.date_range(
        start=pd.to_datetime(start_date),
        end=pd.to_datetime(end_date),
        freq=granularity,
    )
|
|
|
|
def generate_battery_data(config, timestamps):
    """Generate a synthetic state-of-charge time series for a fleet of batteries.

    Args:
        config: parsed data_config.yaml dict; uses generation.seed,
            data_sources.battery_capacity.num_batteries and the battery.*_range keys.
        timestamps: DatetimeIndex shared by all batteries.

    Returns:
        DataFrame with one row per (timestamp, battery) holding capacity,
        charge level, realized charge rate, max discharge rate and efficiency.
    """
    np.random.seed(config['generation']['seed'])
    num_batteries = config['data_sources']['battery_capacity']['num_batteries']

    # Note: the unused `gen_params` local from the original was removed.
    params = config['battery']

    batteries = []
    for i in range(num_batteries):
        battery_id = f"BAT_{i+1:03d}"
        capacity = np.random.uniform(*params['capacity_range'])
        charge_rate = np.random.uniform(*params['charge_rate_range'])
        discharge_rate = np.random.uniform(*params['discharge_rate_range'])
        efficiency = np.random.uniform(*params['efficiency_range'])

        n = len(timestamps)

        charge_level = np.zeros(n)
        # Start each battery somewhere in the middle of its range.
        charge_level[0] = capacity * np.random.uniform(0.3, 0.7)

        # Random walk over actions: discharge (-1), idle (0), charge (+1).
        for t in range(1, n):
            action = np.random.choice([-1, 0, 1], p=[0.3, 0.2, 0.5])
            rate = charge_rate if action > 0 else discharge_rate

            # /60 converts an MW rate to MWh per step — assumes one-minute
            # granularity; TODO confirm against time_range.granularity.
            change = action * rate / 60
            charge_level[t] = np.clip(charge_level[t-1] + change, 0, capacity)

        # Recover the realized MW rate from the level deltas, clipped to limits.
        current_rate = np.diff(charge_level, prepend=charge_level[0]) * 60
        current_rate = np.clip(current_rate, -discharge_rate, charge_rate)

        data = pd.DataFrame({
            'timestamp': timestamps,
            'battery_id': battery_id,
            'capacity_mwh': capacity,
            'charge_level_mwh': charge_level,
            'charge_rate_mw': current_rate,
            'discharge_rate_mw': discharge_rate,
            'efficiency': efficiency
        })
        batteries.append(data)

    return pd.concat(batteries, ignore_index=True)
|
|
|
|
def generate_renewable_data(config, timestamps):
    """Generate synthetic generation + forecast series for renewable plants.

    Args:
        config: parsed data_config.yaml dict; uses generation.seed,
            data_sources.renewable_generation.{sources,plants_per_source}
            and the per-source renewable.* parameters.
        timestamps: DatetimeIndex shared by all plants.

    Returns:
        DataFrame with one row per (timestamp, plant): generation, a noisy
        forecast, actuals (== generation) and the capacity factor.
    """
    np.random.seed(config['generation']['seed'] + 1)

    sources = config['data_sources']['renewable_generation']['sources']
    plants_per_source = config['data_sources']['renewable_generation']['plants_per_source']

    # Note: removed the unused `gen_params` and `plant_counter` locals
    # (counter was incremented but never read).
    params = config['renewable']

    df_list = []

    for source in sources:
        source_params = params[source]
        for i in range(plants_per_source):
            plant_id = f"{source.upper()}_{i+1:03d}"
            capacity = np.random.uniform(*source_params['capacity_range'])
            forecast_error_sd = source_params['forecast_error_sd']

            n = len(timestamps)

            hours = timestamps.hour + timestamps.minute / 60

            if source == 'solar':
                # Daylight bell between 06:00 and 18:00, stronger in summer.
                base_pattern = np.maximum(0, np.sin(np.pi * (hours - 6) / 12))
                seasonal = 0.7 + 0.3 * np.sin(2 * np.pi * timestamps.dayofyear / 365)
            elif source == 'wind':
                # Diurnal swing plus heavy random variability.
                base_pattern = 0.4 + 0.3 * np.sin(2 * np.pi * hours / 24) + 0.3 * np.random.randn(n)
                seasonal = 0.8 + 0.2 * np.sin(2 * np.pi * timestamps.dayofyear / 365)
            else:
                # Other sources (e.g. hydro): flat noisy baseload, no seasonality.
                base_pattern = 0.6 + 0.2 * np.random.randn(n)
                seasonal = 1.0

            generation = base_pattern * seasonal * capacity * np.random.uniform(0.8, 1.2, n)
            generation = np.maximum(0, generation)

            # Forecast is actuals with multiplicative error, floored at zero.
            forecast_error = np.random.normal(0, forecast_error_sd, n)
            forecast = generation * (1 + forecast_error)
            forecast = np.maximum(0, forecast)

            # May exceed 1.0 because of the 0.8-1.2 scaling draw above.
            capacity_factor = generation / capacity

            data = pd.DataFrame({
                'timestamp': timestamps,
                'source': source,
                'plant_id': plant_id,
                'generation_mw': generation,
                'forecast_mw': forecast,
                'actual_mw': generation,
                'capacity_factor': capacity_factor
            })
            df_list.append(data)

    return pd.concat(df_list, ignore_index=True)
|
|
|
|
def generate_conventional_data(config, timestamps):
    """Synthesize generation profiles for conventional (fuel-burning/nuclear) plants.

    Each plant gets a random fuel type, capacity, marginal cost and heat rate;
    its output follows a fuel-specific daily pattern plus Gaussian noise.
    """
    np.random.seed(config['generation']['seed'] + 2)

    src_cfg = config['data_sources']['conventional_generation']
    num_plants = src_cfg['num_plants']
    fuel_types = src_cfg['fuel_types']
    params = config['conventional']

    frames = []

    for plant_idx in range(num_plants):
        plant_id = f"CONV_{plant_idx+1:03d}"
        fuel_type = np.random.choice(fuel_types)

        fuel_params = params[fuel_type]
        capacity = np.random.uniform(*fuel_params['capacity_range'])
        marginal_cost = np.random.uniform(*fuel_params['marginal_cost_range'])
        # Gas turbines run leaner heat rates than coal/other steam plants.
        if fuel_type == 'gas':
            heat_rate = np.random.uniform(6, 12)
        else:
            heat_rate = np.random.uniform(8, 14)

        n = len(timestamps)
        hours = timestamps.hour + timestamps.minute / 60

        if fuel_type == 'nuclear':
            # Near-constant baseload at 90% of capacity with tiny jitter.
            base_load = 0.9 * capacity
            generation = base_load + np.random.normal(0, 0.01 * capacity, n)
        elif fuel_type == 'gas':
            # Peaker profile: ramps with the afternoon demand peak.
            peaking_pattern = 0.3 + 0.4 * np.sin(2 * np.pi * (hours - 12) / 24)
            generation = peaking_pattern * capacity + np.random.normal(0, 0.05 * capacity, n)
        else:
            # Coal/other: mild diurnal baseload swing.
            baseload_pattern = 0.5 + 0.2 * np.sin(2 * np.pi * hours / 24)
            generation = baseload_pattern * capacity + np.random.normal(0, 0.03 * capacity, n)

        generation = np.clip(generation, 0, capacity)

        frames.append(pd.DataFrame({
            'timestamp': timestamps,
            'plant_id': plant_id,
            'fuel_type': fuel_type,
            'generation_mw': generation,
            'marginal_cost': marginal_cost,
            'heat_rate': heat_rate
        }))

    return pd.concat(frames, ignore_index=True)
|
|
|
|
def generate_data_center_data(config, timestamps):
    """Generate power-demand profiles for data centers.

    The first center (DC_001) is the bitcoin-mining client with a noisy,
    non-diurnal load; the rest are enterprise clients with a daytime peak.

    Args:
        config: parsed data_config.yaml dict; uses generation.seed,
            data_sources.data_centers.num_centers and data_center.*_range keys.
        timestamps: DatetimeIndex shared by all centers.

    Returns:
        DataFrame with one row per (timestamp, center): location, demand,
        max bid price and client type.
    """
    np.random.seed(config['generation']['seed'] + 3)

    num_centers = config['data_sources']['data_centers']['num_centers']
    params = config['data_center']

    df_list = []
    locations = ['FR', 'BE', 'DE', 'NL', 'UK']

    for i in range(num_centers):
        data_center_id = f"DC_{i+1:03d}"
        location = locations[i % len(locations)]  # round-robin across regions

        base_demand = np.random.uniform(*params['power_demand_range'])
        price_sensitivity = np.random.uniform(*params['price_sensitivity_range'])

        # By convention the first data center is the bitcoin miner.
        is_bitcoin = (i == 0)
        client_type = 'bitcoin' if is_bitcoin else 'enterprise'

        n = len(timestamps)
        hours = timestamps.hour + timestamps.minute / 60

        if is_bitcoin:
            # Mining load is price-driven, not diurnal: noisy around 70%.
            base_profile = 0.7 + 0.3 * np.random.randn(n)
        else:
            base_profile = 0.6 + 0.2 * np.sin(2 * np.pi * (hours - 12) / 24)

        demand = base_demand * base_profile
        # Bug fix: the original `np.maximum(demand * 0.5, demand)` was a no-op
        # for positive demand and still let the randn-based bitcoin profile go
        # negative. Clamp at zero so power demand is physically sensible.
        demand = np.maximum(demand, 0.0)

        max_bid = base_demand * price_sensitivity * (0.8 + 0.4 * np.random.rand(n))

        data = pd.DataFrame({
            'timestamp': timestamps,
            'data_center_id': data_center_id,
            'location': location,
            'power_demand_mw': demand,
            'max_bid_price': max_bid,
            'client_type': client_type
        })
        df_list.append(data)

    return pd.concat(df_list, ignore_index=True)
|
|
|
|
def generate_transmission_capacity_data(config):
    """Build the static region-to-region transmission capacity table.

    One row per ordered (source, target) pair of distinct regions, with a
    random capacity, an efficiency draw, and a bidirectional flag.
    """
    np.random.seed(config['generation']['seed'] + 13)

    regions = config['regions']
    params = config['transmission']

    rows = []

    for i, origin in enumerate(regions):
        for j, destination in enumerate(regions):
            if i == j:
                continue  # no self-links

            base = np.random.uniform(*params['capacity_base_range'])

            # UK interconnectors get a dedicated capacity scaling factor.
            if 'UK' in (origin, destination):
                base *= params['capacity_uk_multiplier']

            rows.append({
                'source_region': origin,
                'target_region': destination,
                'capacity_mw': base * np.random.uniform(0.8, 1.2),
                'direction': 'bidirectional',
                'efficiency': np.random.uniform(*params['efficiency_range']),
            })

    return pd.DataFrame(rows)
|
|
|
|
def generate_transmission_cost_data(config):
    """Build the static region-to-region transmission cost table.

    Cost per MWh = monetized losses + congestion surcharge + fixed fee,
    one row per ordered pair of distinct regions.

    Args:
        config: parsed data_config.yaml dict; uses generation.seed, regions,
            and transmission.{efficiency_range,congestion_surcharge_range,
            fee_range}. The reference electricity price can optionally be set
            via transmission.avg_electricity_price_eur_mwh.

    Returns:
        DataFrame with columns source_region, target_region, cost_eur_mwh,
        loss_percent, congestion_surcharge_eur_mwh, fee_eur_mwh.
    """
    np.random.seed(config['generation']['seed'] + 14)

    regions = config['regions']
    params = config['transmission']

    # Reference wholesale price used to monetize transmission losses.
    # Generalized: configurable, with the original hard-coded 80 EUR/MWh as default.
    avg_electricity_price = params.get('avg_electricity_price_eur_mwh', 80)

    data = []

    for i, src in enumerate(regions):
        for j, tgt in enumerate(regions):
            if i == j:
                continue  # no self-links

            efficiency = np.random.uniform(*params['efficiency_range'])
            loss_percent = (1 - efficiency) * 100
            congestion_surcharge = np.random.uniform(*params['congestion_surcharge_range'])
            fee = np.random.uniform(*params['fee_range'])

            # Value of the energy lost in transit, per MWh shipped.
            loss_cost = (loss_percent / 100) * avg_electricity_price
            cost_eur_mwh = loss_cost + congestion_surcharge + fee

            data.append({
                'source_region': src,
                'target_region': tgt,
                'cost_eur_mwh': cost_eur_mwh,
                'loss_percent': loss_percent,
                'congestion_surcharge_eur_mwh': congestion_surcharge,
                'fee_eur_mwh': fee
            })

    return pd.DataFrame(data)
|
|
|
|
def apply_noise_and_outliers(df, config):
    """Inject multiplicative Gaussian noise and random outliers into numeric columns.

    Mutates `df` in place and returns it. No-op when generation.add_noise is false.
    Uses the global numpy RNG state (seeded earlier by the generators).
    """
    gen_cfg = config['generation']
    if not gen_cfg['add_noise']:
        return df

    noise_level = gen_cfg['noise_level']
    outlier_rate = gen_cfg['outlier_rate']
    row_count = len(df)
    num_outliers = int(row_count * outlier_rate)

    for col in df.select_dtypes(include=[np.number]).columns:
        if col == 'timestamp':
            continue  # defensive guard; timestamps are not numeric anyway

        # Multiplicative noise around each value.
        df[col] = df[col] * (1 + np.random.normal(0, noise_level, row_count))

        # A small random subset of rows becomes outliers (0.5x-2x scaling).
        outlier_idx = np.random.choice(row_count, num_outliers, replace=False)
        df.loc[outlier_idx, col] = df.loc[outlier_idx, col] * np.random.uniform(0.5, 2.0, num_outliers)

    return df
|
|
|
|
def add_missing_values(df, config):
    """Blank out a random fraction of each numeric column (simulates sensor gaps).

    Mutates `df` in place and returns it. No-op when
    generation.include_missing_values is false.
    """
    gen_cfg = config['generation']
    if not gen_cfg['include_missing_values']:
        return df

    row_count = len(df)
    num_missing = int(row_count * gen_cfg['missing_rate'])

    for col in df.select_dtypes(include=[np.number]).columns:
        if col == 'timestamp':
            continue  # defensive guard; timestamps are not numeric anyway

        missing_idx = np.random.choice(row_count, num_missing, replace=False)
        df.loc[missing_idx, col] = np.nan

    return df
|
|
|
|
def save_metadata(datasets, output_dir):
    """Write per-dataset metadata to <output_dir>/metadata/generation_metadata.json.

    Args:
        datasets: mapping of dataset name -> DataFrame.
        output_dir: base data directory; the 'metadata' subfolder is created
            if it does not exist.

    Returns:
        The metadata dict that was serialized.
    """
    metadata = {
        'generated_at': datetime.now(timezone.utc).isoformat(),
        'datasets': {}
    }

    for name, df in datasets.items():
        metadata['datasets'][name] = {
            'rows': len(df),
            # Bug fix: the original dict literal used the 'columns' key twice
            # (count, then names), so the count was silently overwritten.
            # Keep both under distinct keys.
            'num_columns': len(df.columns),
            'columns': list(df.columns),
            'memory_usage_mb': df.memory_usage(deep=True).sum() / 1024 / 1024,
            'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()}
        }

    metadata_dir = Path(output_dir) / 'metadata'
    # Bug fix: the directory was never created, so the first run crashed
    # with FileNotFoundError when opening the file below.
    metadata_dir.mkdir(parents=True, exist_ok=True)

    with open(metadata_dir / 'generation_metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2, default=str)

    return metadata
|
|
|
|
def main():
    """Generate every synthetic dataset, persist raw parquet files, write metadata."""
    config = load_config()

    time_cfg = config['time_range']
    timestamps = generate_timestamps(
        time_cfg['start_date'],
        time_cfg['end_date'],
        time_cfg['granularity'],
    )

    print(f"Generating synthetic data for {len(timestamps)} timestamps...")

    # (dataset key, progress label, generator, whether it is time-indexed)
    specs = [
        ('battery_capacity', 'Battery capacity', generate_battery_data, True),
        ('renewable_generation', 'Renewable generation', generate_renewable_data, True),
        ('conventional_generation', 'Conventional generation', generate_conventional_data, True),
        ('data_centers', 'Data centers', generate_data_center_data, True),
        ('transmission_capacity', 'Transmission capacity', generate_transmission_capacity_data, False),
        ('transmission_cost', 'Transmission cost', generate_transmission_cost_data, False),
    ]

    datasets = {}
    for key, label, generator, time_indexed in specs:
        datasets[key] = generator(config, timestamps) if time_indexed else generator(config)
        print(f" - {label}: {len(datasets[key])} rows")

    # Static transmission tables stay clean; only time-series data gets
    # noise, outliers and missing values.
    static_tables = {'transmission_capacity', 'transmission_cost'}
    for key in datasets:
        if key not in static_tables:
            perturbed = apply_noise_and_outliers(datasets[key], config)
            datasets[key] = add_missing_values(perturbed, config)

    output_base = Path(__file__).parent.parent / 'data'
    output_base.mkdir(parents=True, exist_ok=True)

    raw_dir = output_base / 'raw'
    raw_dir.mkdir(parents=True, exist_ok=True)

    for key, frame in datasets.items():
        file_path = raw_dir / f'{key}_raw.parquet'
        frame.to_parquet(file_path, compression='snappy')
        print(f" Saved: {file_path}")

    save_metadata(datasets, output_base)

    print("\nMetadata saved to data/metadata/generation_metadata.json")
    print(f"Total datasets generated: {len(datasets)}")

    return datasets
|
|
|
|
if __name__ == '__main__':
|
|
main()
|