Files
energy-test-data/scripts/02_fetch_historical.py
kbt-devops faaadc1297 Add transmission datasets and update mining data
Add two new static datasets for cross-region arbitrage calculations:
- transmission_capacity: region-to-region capacity limits (20 rows)
- transmission_cost: transmission costs per path (20 rows)

Update mining dataset with EUR pricing and power metrics:
- Change btc_price_usd to btc_price_eur
- Add power_efficiency_th_per_mw, power_demand_mw
- Add revenue_eur_per_mwh, profit_eur_per_mwh
- Remove mining_profitability column

Changes include:
- scripts/02_fetch_historical.py: rewrite fetch_bitcoin_mining_data()
- scripts/01_generate_synthetic.py: add transmission data generators
- config/data_config.yaml: add transmission config, update bitcoin config
- config/schema.yaml: add 2 new schemas, update bitcoin_mining schema
- scripts/03_process_merge.py: add 2 new datasets
- scripts/04_validate.py: add 2 new datasets
- test/test_data.py: update for new datasets and bitcoin price reference

Total datasets: 9 (734,491 rows, 17.89 MB)
2026-02-11 01:09:33 +07:00

230 lines
7.3 KiB
Python

"""
Fetch historical data for energy trading strategy test data.
Handles: electricity prices, bitcoin mining data, load profiles.
"""
import yaml
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta
import requests
import json
import time
def load_config():
    """Load and parse config/data_config.yaml (located relative to this script)."""
    cfg_file = Path(__file__).parent.parent / "config" / "data_config.yaml"
    with open(cfg_file) as fh:
        return yaml.safe_load(fh)
def generate_timestamps(start_date, end_date, granularity):
    """Return a pandas DatetimeIndex spanning [start_date, end_date] at `granularity`.

    `start_date`/`end_date` may be anything `pd.to_datetime` accepts;
    `granularity` is a pandas frequency string (e.g. '15min', 'h').
    """
    return pd.date_range(
        start=pd.to_datetime(start_date),
        end=pd.to_datetime(end_date),
        freq=granularity,
    )
def fetch_electricity_prices(config, timestamps):
    """Generate synthetic day-ahead and real-time electricity prices per region.

    Parameters
    ----------
    config : dict
        Must provide config['generation']['seed'] (int) and
        config['regions'] (list of region codes).
    timestamps : pd.DatetimeIndex
        Timestamps replicated for every region.

    Returns
    -------
    pd.DataFrame
        len(regions) * len(timestamps) rows with columns: timestamp, region,
        day_ahead_price, real_time_price, capacity_price, regulation_price,
        volume_mw.
    """
    np.random.seed(config['generation']['seed'] + 10)
    regions = config['regions']
    print(f"Fetching electricity prices for {len(regions)} regions...")

    # (base price EUR/MWh, daily volatility amplitude) per region;
    # unknown regions fall back to the default pair.
    region_params = {
        'FR': (80, 30),
        'DE': (90, 40),
        'NL': (85, 35),
        'BE': (82, 32),
    }
    default_params = (100, 50)

    # Hoisted loop invariants. Converting `hours` to an ndarray up front
    # keeps all derived arrays mutable ndarrays (a pandas Index would make
    # the spike assignment below fail), replacing the original's late
    # np.array(real_time) conversion.
    n = len(timestamps)
    hours = np.asarray(timestamps.hour + timestamps.minute / 60)

    df_list = []
    for region in regions:
        base_price, volatility = region_params.get(region, default_params)
        # Sinusoidal daily shape plus gaussian noise.
        day_ahead = base_price + volatility * np.sin(2 * np.pi * hours / 24) + np.random.normal(0, 10, n)
        real_time = day_ahead + np.random.normal(0, 20, n)
        # ~2% of intervals receive a scarcity spike of 100-500 EUR/MWh.
        price_spikes = np.random.random(n) < 0.02
        real_time[price_spikes] += np.random.uniform(100, 500, int(np.sum(price_spikes)))
        capacity_price = np.abs(np.random.normal(5, 2, n))
        regulation_price = np.abs(np.random.normal(3, 1, n))
        volume = np.random.uniform(1000, 5000, n)
        df_list.append(pd.DataFrame({
            'timestamp': timestamps,
            'region': region,
            'day_ahead_price': day_ahead,
            'real_time_price': real_time,
            'capacity_price': capacity_price,
            'regulation_price': regulation_price,
            'volume_mw': volume,
        }))
    return pd.concat(df_list, ignore_index=True)
def fetch_bitcoin_mining_data(config, timestamps):
    """Generate synthetic bitcoin mining economics aligned to `timestamps`.

    Parameters
    ----------
    config : dict
        Must provide config['generation']['seed'] and config['bitcoin'] with
        'hashrate_range' and 'power_efficiency_range' (each a 2-value range).
    timestamps : pd.DatetimeIndex
        One row is produced per timestamp.

    Returns
    -------
    pd.DataFrame
        Columns: timestamp, pool_id, hashrate_ths, btc_price_eur,
        power_efficiency_th_per_mw, power_demand_mw, revenue_eur_per_mwh,
        profit_eur_per_mwh, electricity_cost.
    """
    np.random.seed(config['generation']['seed'] + 11)
    print("Fetching bitcoin mining data from mempool.space (simulated)...")
    n = len(timestamps)

    # Best-effort connectivity probe of the public mempool.space API; the
    # response is never used for data generation, so any failure (offline,
    # timeout, missing dependency) is deliberately ignored rather than
    # swallowed via a bare `except:`.
    try:
        requests.get("https://mempool.space/api/v1/fees/recommended", timeout=10)
    except Exception:
        pass

    btc_params = config['bitcoin']
    # Price model: slow linear drift (+/-5%) times a normalized random walk,
    # with 3% multiplicative per-step noise around a fixed EUR anchor.
    btc_eur_trend = np.linspace(0.95, 1.05, n)
    btc_daily_volatility = np.cumsum(np.random.normal(0, 0.01, n)) + 1
    btc_daily_volatility = btc_daily_volatility / btc_daily_volatility[0]
    base_btc_price_eur = 41400  # reference BTC/EUR anchor price
    btc_price_eur = base_btc_price_eur * btc_eur_trend * btc_daily_volatility * (1 + 0.03 * np.random.randn(n))

    # Hashrate: scalar base drawn from the configured range, modulated by a
    # slow sinusoidal cycle (10 periods over the series) plus 2% noise.
    hashrate_base = np.random.uniform(*btc_params['hashrate_range'])
    hashrate = hashrate_base * (1 + 0.05 * np.sin(2 * np.pi * np.arange(n) / (n / 10))) * (1 + 0.02 * np.random.randn(n))
    power_efficiency = np.random.uniform(*btc_params['power_efficiency_range'])  # scalar, TH/s per MW
    power_demand = hashrate / power_efficiency  # MW

    # Mining economics expressed per MWh of electricity consumed; profit is
    # revenue minus a noisy breakeven electricity cost around 40 EUR/MWh.
    mining_profitability = (btc_price_eur * 0.0001 / 3.6) / (power_efficiency / 1000)
    revenue_eur_per_mwh = mining_profitability * power_efficiency * 24
    electricity_breakeven = 40 + np.random.normal(0, 5, n)
    profit_eur_per_mwh = revenue_eur_per_mwh - electricity_breakeven

    return pd.DataFrame({
        'timestamp': timestamps,
        'pool_id': 'POOL_001',
        'hashrate_ths': hashrate,
        'btc_price_eur': btc_price_eur,
        'power_efficiency_th_per_mw': power_efficiency,
        'power_demand_mw': power_demand,
        'revenue_eur_per_mwh': revenue_eur_per_mwh,
        'profit_eur_per_mwh': profit_eur_per_mwh,
        'electricity_cost': electricity_breakeven,
    })
def fetch_load_profiles(config, timestamps):
    """Generate synthetic load/weather profiles per region.

    Parameters
    ----------
    config : dict
        Must provide config['generation']['seed'] and config['regions'].
    timestamps : pd.DatetimeIndex
        Timestamps replicated for every region.

    Returns
    -------
    pd.DataFrame
        len(regions) * len(timestamps) rows with columns: timestamp, region,
        load_mw, forecast_mw, weather_temp, humidity.
    """
    np.random.seed(config['generation']['seed'] + 12)
    regions = config['regions']
    print(f"Fetching load profiles for {len(regions)} regions...")

    # Base system load (MW) per region; unknown regions use the default.
    # NOTE(review): the original also assigned per-region `peak_hours` that
    # were never used (the daily curve is hard-coded to peak at 18:00);
    # those dead values were dropped.
    base_loads = {'FR': 60000, 'DE': 70000, 'NL': 15000, 'BE': 12000}
    default_base_load = 45000

    # Hoisted loop invariants (deterministic, identical for every region):
    # a gaussian daily profile peaking at 18:00 and a seasonal sinusoid.
    n = len(timestamps)
    hours = np.asarray(timestamps.hour + timestamps.minute / 60)
    day_of_year = np.asarray(timestamps.dayofyear)
    daily_pattern = 0.7 + 0.3 * np.exp(-0.5 * ((hours - 18) / 4) ** 2)
    seasonal_pattern = 0.8 + 0.2 * np.sin(2 * np.pi * (day_of_year - 15) / 365)

    df_list = []
    for region in regions:
        base_load = base_loads.get(region, default_base_load)
        load = base_load * daily_pattern * seasonal_pattern * (1 + 0.05 * np.random.randn(n))
        # Forecast tracks actual load with 3% relative error.
        forecast = load * (1 + np.random.normal(0, 0.03, n))
        temp = 15 + 15 * np.sin(2 * np.pi * (day_of_year - 15) / 365) + np.random.normal(0, 3, n)
        humidity = 60 + 20 * np.sin(2 * np.pi * (day_of_year - 15) / 365) + np.random.normal(0, 10, n)
        df_list.append(pd.DataFrame({
            'timestamp': timestamps,
            'region': region,
            'load_mw': load,
            'forecast_mw': forecast,
            'weather_temp': temp,
            'humidity': humidity,
        }))
    return pd.concat(df_list, ignore_index=True)
def save_raw_data(datasets, output_dir):
    """Write each dataset to <output_dir>/raw/<name>_raw.parquet (snappy).

    Parameters
    ----------
    datasets : dict[str, pd.DataFrame]
        Dataset name -> frame to persist.
    output_dir : path-like
        Base data directory; the 'raw' subdirectory is created if needed.

    Returns
    -------
    dict[str, str]
        Dataset name -> path of the written parquet file.
    """
    raw_dir = Path(output_dir) / 'raw'
    raw_dir.mkdir(parents=True, exist_ok=True)
    saved = {}
    for name, frame in datasets.items():
        target = raw_dir / f'{name}_raw.parquet'
        frame.to_parquet(target, compression='snappy')
        print(f" Saved: {target}")
        saved[name] = str(target)
    return saved
def main():
    """Entry point: build all historical datasets from config and save them to data/raw/."""
    config = load_config()
    time_cfg = config['time_range']
    timestamps = generate_timestamps(
        time_cfg['start_date'],
        time_cfg['end_date'],
        time_cfg['granularity'],
    )
    print(f"Fetching historical data for {len(timestamps)} timestamps...")

    # (dataset key, generator function, progress label) — run in order.
    fetchers = [
        ('electricity_prices', fetch_electricity_prices, 'Electricity prices'),
        ('bitcoin_mining', fetch_bitcoin_mining_data, 'Bitcoin mining'),
        ('load_profiles', fetch_load_profiles, 'Load profiles'),
    ]
    datasets = {}
    for key, fetcher, label in fetchers:
        datasets[key] = fetcher(config, timestamps)
        print(f" - {label}: {len(datasets[key])} rows")

    output_base = Path(__file__).parent.parent / 'data'
    save_raw_data(datasets, output_base)
    print(f"\nSaved {len(datasets)} historical datasets to data/raw/")
    return datasets


if __name__ == '__main__':
    main()