Files
energy-test-data/scripts/01_generate_synthetic.py
kbt-devops a643767359 Initial commit: Energy test data generation pipeline
Add complete test data preparation system for energy trading strategy
demo. Includes configuration, data generation scripts, and validation
tools for 7 datasets covering electricity prices, battery capacity,
renewable/conventional generation, load profiles, data centers, and
mining data.

Excluded from git: Actual parquet data files (data/raw/, data/processed/)
can be regenerated using the provided scripts.

Datasets:
- electricity_prices: Day-ahead and real-time prices (5 regions)
- battery_capacity: Storage system charge/discharge cycles
- renewable_generation: Solar, wind, hydro with forecast errors
- conventional_generation: Gas, coal, nuclear plant outputs
- load_profiles: Regional demand with weather correlations
- data_centers: Power demand profiles including mining operations
- mining_data: Hashrate, price, profitability (mempool.space API)
2026-02-10 23:28:23 +07:00

321 lines
11 KiB
Python

"""
Generate synthetic data for energy trading strategy test data.
Handles: battery capacity, data centers, renewable generation, conventional generation.
"""
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path

import numpy as np
import pandas as pd
import yaml
def load_config():
    """Read and parse the pipeline configuration.

    The YAML file is located at ``<repo>/config/data_config.yaml``,
    resolved relative to this script's directory.
    """
    scripts_dir = Path(__file__).parent
    cfg_path = scripts_dir.parent / "config" / "data_config.yaml"
    with open(cfg_path) as handle:
        return yaml.safe_load(handle)
def generate_timestamps(start_date, end_date, granularity):
    """Build an inclusive DatetimeIndex from start_date to end_date.

    ``granularity`` is passed straight through as the pandas frequency
    alias (e.g. ``"15min"``, ``"h"``).
    """
    return pd.date_range(
        start=pd.to_datetime(start_date),
        end=pd.to_datetime(end_date),
        freq=granularity,
    )
def generate_battery_data(config, timestamps):
    """Simulate charge/discharge traces for a fleet of batteries.

    One DataFrame row per (timestamp, battery). Each battery gets static
    characteristics (capacity, rate limits, efficiency) drawn once, then a
    random-walk charge level clipped to [0, capacity].

    Parameters
    ----------
    config : dict
        Parsed data_config.yaml; reads ``generation.seed``,
        ``data_sources.battery_capacity.num_batteries`` and ``battery.*_range``.
    timestamps : pd.DatetimeIndex
        Timestamps for every battery's trace (assumed 1-minute steps — the
        /60 below converts MW to MWh per step; TODO confirm granularity).
    """
    np.random.seed(config['generation']['seed'])
    num_batteries = config['data_sources']['battery_capacity']['num_batteries']
    params = config['battery']
    # NOTE(review): removed unused local `gen_params = config['generation']`.
    batteries = []
    for i in range(num_batteries):
        battery_id = f"BAT_{i+1:03d}"
        # Static per-battery characteristics.
        capacity = np.random.uniform(*params['capacity_range'])
        charge_rate = np.random.uniform(*params['charge_rate_range'])
        discharge_rate = np.random.uniform(*params['discharge_rate_range'])
        efficiency = np.random.uniform(*params['efficiency_range'])
        n = len(timestamps)
        charge_level = np.zeros(n)
        # Start somewhere between 30% and 70% of capacity.
        charge_level[0] = capacity * np.random.uniform(0.3, 0.7)
        # Random walk: -1 discharge, 0 idle, +1 charge (charging favored).
        for t in range(1, n):
            action = np.random.choice([-1, 0, 1], p=[0.3, 0.2, 0.5])
            rate = charge_rate if action > 0 else discharge_rate
            change = action * rate / 60  # MW -> MWh over a one-minute step
            charge_level[t] = np.clip(charge_level[t-1] + change, 0, capacity)
        # Recover the instantaneous rate from level deltas (MWh/min * 60 = MW),
        # clipped to the physical limits; negative values mean discharging.
        current_rate = np.diff(charge_level, prepend=charge_level[0]) * 60
        current_rate = np.clip(current_rate, -discharge_rate, charge_rate)
        data = pd.DataFrame({
            'timestamp': timestamps,
            'battery_id': battery_id,
            'capacity_mwh': capacity,
            'charge_level_mwh': charge_level,
            'charge_rate_mw': current_rate,
            # NOTE(review): this column holds the battery's *maximum* discharge
            # rate, not an instantaneous value — naming kept for compatibility.
            'discharge_rate_mw': discharge_rate,
            'efficiency': efficiency
        })
        batteries.append(data)
    return pd.concat(batteries, ignore_index=True)
def generate_renewable_data(config, timestamps):
    """Simulate generation plus imperfect forecasts for renewable plants.

    For each configured source (solar/wind/other) and each plant, produces a
    diurnal/seasonal base pattern scaled by a randomly drawn capacity, and a
    forecast equal to the actual output perturbed by multiplicative Gaussian
    error. ``actual_mw`` intentionally duplicates ``generation_mw``.

    Parameters
    ----------
    config : dict
        Reads ``generation.seed``, ``data_sources.renewable_generation.*`` and
        per-source parameters under ``renewable``.
    timestamps : pd.DatetimeIndex
        Common time axis for all plants.
    """
    # Offset the seed so each dataset gets an independent random stream.
    np.random.seed(config['generation']['seed'] + 1)
    sources = config['data_sources']['renewable_generation']['sources']
    plants_per_source = config['data_sources']['renewable_generation']['plants_per_source']
    params = config['renewable']
    # NOTE(review): removed unused locals `gen_params` and `plant_counter`.
    df_list = []
    # Hoisted loop invariants: same time axis for every plant.
    n = len(timestamps)
    hours = timestamps.hour + timestamps.minute / 60
    for source in sources:
        source_params = params[source]
        for i in range(plants_per_source):
            plant_id = f"{source.upper()}_{i+1:03d}"
            capacity = np.random.uniform(*source_params['capacity_range'])
            forecast_error_sd = source_params['forecast_error_sd']
            if source == 'solar':
                # Daylight half-sine between 06:00 and 18:00, zero at night,
                # plus a yearly seasonal swing (365-day approximation).
                base_pattern = np.maximum(0, np.sin(np.pi * (hours - 6) / 12))
                seasonal = 0.7 + 0.3 * np.sin(2 * np.pi * timestamps.dayofyear / 365)
            elif source == 'wind':
                # Mild diurnal cycle with heavy per-step noise.
                base_pattern = 0.4 + 0.3 * np.sin(2 * np.pi * hours / 24) + 0.3 * np.random.randn(n)
                seasonal = 0.8 + 0.2 * np.sin(2 * np.pi * timestamps.dayofyear / 365)
            else:
                # Any other source (e.g. hydro): steady output, no seasonality.
                base_pattern = 0.6 + 0.2 * np.random.randn(n)
                seasonal = 1.0
            generation = base_pattern * seasonal * capacity * np.random.uniform(0.8, 1.2, n)
            generation = np.maximum(0, generation)
            # Forecast = actual perturbed by multiplicative Gaussian error.
            forecast_error = np.random.normal(0, forecast_error_sd, n)
            forecast = np.maximum(0, generation * (1 + forecast_error))
            capacity_factor = generation / capacity
            data = pd.DataFrame({
                'timestamp': timestamps,
                'source': source,
                'plant_id': plant_id,
                'generation_mw': generation,
                'forecast_mw': forecast,
                'actual_mw': generation,
                'capacity_factor': capacity_factor
            })
            df_list.append(data)
    return pd.concat(df_list, ignore_index=True)
def generate_conventional_data(config, timestamps):
np.random.seed(config['generation']['seed'] + 2)
num_plants = config['data_sources']['conventional_generation']['num_plants']
fuel_types = config['data_sources']['conventional_generation']['fuel_types']
params = config['conventional']
df_list = []
for i in range(num_plants):
plant_id = f"CONV_{i+1:03d}"
fuel_type = np.random.choice(fuel_types)
fuel_params = params[fuel_type]
capacity = np.random.uniform(*fuel_params['capacity_range'])
marginal_cost = np.random.uniform(*fuel_params['marginal_cost_range'])
heat_rate = np.random.uniform(6, 12) if fuel_type == 'gas' else np.random.uniform(8, 14)
n = len(timestamps)
hours = timestamps.hour + timestamps.minute / 60
if fuel_type == 'nuclear':
base_load = 0.9 * capacity
generation = base_load + np.random.normal(0, 0.01 * capacity, n)
elif fuel_type == 'gas':
peaking_pattern = 0.3 + 0.4 * np.sin(2 * np.pi * (hours - 12) / 24)
generation = peaking_pattern * capacity + np.random.normal(0, 0.05 * capacity, n)
else:
baseload_pattern = 0.5 + 0.2 * np.sin(2 * np.pi * hours / 24)
generation = baseload_pattern * capacity + np.random.normal(0, 0.03 * capacity, n)
generation = np.clip(generation, 0, capacity)
data = pd.DataFrame({
'timestamp': timestamps,
'plant_id': plant_id,
'fuel_type': fuel_type,
'generation_mw': generation,
'marginal_cost': marginal_cost,
'heat_rate': heat_rate
})
df_list.append(data)
return pd.concat(df_list, ignore_index=True)
def generate_data_center_data(config, timestamps):
    """Generate power-demand and bid-price profiles for data centers.

    By convention the first center (i == 0) is the bitcoin-mining client with
    a noisy, time-of-day-independent load; all others are 'enterprise'
    clients with a smooth diurnal profile. Locations cycle round-robin
    through a fixed EU/UK list.

    Parameters
    ----------
    config : dict
        Reads ``generation.seed``, ``data_sources.data_centers.num_centers``
        and ``data_center.power_demand_range`` / ``price_sensitivity_range``.
    timestamps : pd.DatetimeIndex
        Common time axis for all centers.
    """
    np.random.seed(config['generation']['seed'] + 3)
    num_centers = config['data_sources']['data_centers']['num_centers']
    params = config['data_center']
    df_list = []
    locations = ['FR', 'BE', 'DE', 'NL', 'UK']  # round-robin assignment
    for i in range(num_centers):
        data_center_id = f"DC_{i+1:03d}"
        location = locations[i % len(locations)]
        base_demand = np.random.uniform(*params['power_demand_range'])
        price_sensitivity = np.random.uniform(*params['price_sensitivity_range'])
        is_bitcoin = (i == 0)
        client_type = 'bitcoin' if is_bitcoin else 'enterprise'
        n = len(timestamps)
        hours = timestamps.hour + timestamps.minute / 60
        if is_bitcoin:
            # Mining load is not diurnal: pure Gaussian noise around 70%.
            base_profile = 0.7 + 0.3 * np.random.randn(n)
        else:
            # Enterprise load: smooth daily cycle in [0.4, 0.8].
            base_profile = 0.6 + 0.2 * np.sin(2 * np.pi * (hours - 12) / 24)
        demand = base_demand * base_profile
        # BUG FIX: the original `np.maximum(demand * 0.5, demand)` was a no-op
        # for positive demand and still left negative (halved) values when the
        # noisy bitcoin profile dipped below zero. Floor demand at zero.
        demand = np.maximum(demand, 0.0)
        # Willingness-to-pay scales with demand size and price sensitivity.
        max_bid = base_demand * price_sensitivity * (0.8 + 0.4 * np.random.rand(n))
        data = pd.DataFrame({
            'timestamp': timestamps,
            'data_center_id': data_center_id,
            'location': location,
            'power_demand_mw': demand,
            'max_bid_price': max_bid,
            'client_type': client_type
        })
        df_list.append(data)
    return pd.concat(df_list, ignore_index=True)
def apply_noise_and_outliers(df, config):
if not config['generation']['add_noise']:
return df
noise_level = config['generation']['noise_level']
outlier_rate = config['generation']['outlier_rate']
for col in df.select_dtypes(include=[np.number]).columns:
if col == 'timestamp':
continue
noise = np.random.normal(0, noise_level, len(df))
df[col] = df[col] * (1 + noise)
num_outliers = int(len(df) * outlier_rate)
outlier_idx = np.random.choice(len(df), num_outliers, replace=False)
df.loc[outlier_idx, col] = df.loc[outlier_idx, col] * np.random.uniform(0.5, 2.0, num_outliers)
return df
def add_missing_values(df, config):
if not config['generation']['include_missing_values']:
return df
missing_rate = config['generation']['missing_rate']
for col in df.select_dtypes(include=[np.number]).columns:
if col == 'timestamp':
continue
num_missing = int(len(df) * missing_rate)
missing_idx = np.random.choice(len(df), num_missing, replace=False)
df.loc[missing_idx, col] = np.nan
return df
def save_metadata(datasets, output_dir):
    """Write per-dataset summary metadata to metadata/generation_metadata.json.

    Parameters
    ----------
    datasets : dict
        Dataset name -> generated DataFrame.
    output_dir : str | Path
        Base data directory; the JSON file goes in its ``metadata`` subfolder
        (created if missing).

    Returns the metadata dict that was written.
    """
    metadata = {
        # Timezone-aware UTC; datetime.utcnow() is deprecated and naive.
        'generated_at': datetime.now(timezone.utc).isoformat(),
        'datasets': {}
    }
    for name, df in datasets.items():
        metadata['datasets'][name] = {
            'rows': len(df),
            # BUG FIX: the original dict literal used the key 'columns' twice,
            # so this count was silently overwritten by the column-name list.
            'num_columns': len(df.columns),
            'memory_usage_mb': df.memory_usage(deep=True).sum() / 1024 / 1024,
            'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
            'columns': list(df.columns)
        }
    output_path = Path(output_dir) / 'metadata' / 'generation_metadata.json'
    # Ensure the metadata directory exists so open() does not fail.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w') as f:
        json.dump(metadata, f, indent=2, default=str)
    return metadata
def main():
    """Run the full pipeline: load config, generate all synthetic datasets,
    apply noise/outliers/missing values, and save parquet files + metadata.

    Returns the dict of perturbed DataFrames.
    """
    config = load_config()
    time_cfg = config['time_range']
    timestamps = generate_timestamps(
        time_cfg['start_date'], time_cfg['end_date'], time_cfg['granularity']
    )
    print(f"Generating synthetic data for {len(timestamps)} timestamps...")
    # (dataset key, progress label, generator) — run in this fixed order so
    # the per-dataset seeds stay reproducible.
    builders = [
        ('battery_capacity', 'Battery capacity', generate_battery_data),
        ('renewable_generation', 'Renewable generation', generate_renewable_data),
        ('conventional_generation', 'Conventional generation', generate_conventional_data),
        ('data_centers', 'Data centers', generate_data_center_data),
    ]
    datasets = {}
    for key, label, build in builders:
        datasets[key] = build(config, timestamps)
        print(f" - {label}: {len(datasets[key])} rows")
    # Perturb each dataset (both helpers mutate in place and return the frame).
    for key, frame in datasets.items():
        datasets[key] = add_missing_values(apply_noise_and_outliers(frame, config), config)
    data_root = Path(__file__).parent.parent / 'data'
    data_root.mkdir(parents=True, exist_ok=True)
    raw_dir = data_root / 'raw'
    raw_dir.mkdir(parents=True, exist_ok=True)
    for key, frame in datasets.items():
        file_path = raw_dir / f'{key}_raw.parquet'
        frame.to_parquet(file_path, compression='snappy')
        print(f" Saved: {file_path}")
    save_metadata(datasets, data_root)
    print("\nMetadata saved to data/metadata/generation_metadata.json")
    print(f"Total datasets generated: {len(datasets)}")
    return datasets
# Run the full generation pipeline when executed as a script.
if __name__ == '__main__':
    main()