energy-test-data/scripts/03_process_merge.py
kbt-devops faaadc1297 Add transmission datasets and update mining data
Add two new static datasets for cross-region arbitrage calculations (a rough generator sketch follows the list):
- transmission_capacity: region-to-region capacity limits (20 rows)
- transmission_cost: transmission costs per path (20 rows)
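
A rough sketch of how a static dataset like transmission_capacity could be generated. The real generator lives in scripts/01_generate_synthetic.py; the column names, region list, and placeholder capacity below are assumptions, not taken from the repo:

import itertools
import pandas as pd

def generate_transmission_capacity(regions, default_capacity_mw=1000.0):
    # Hypothetical columns: from_region, to_region, capacity_mw.
    pairs = list(itertools.permutations(regions, 2))
    return pd.DataFrame({
        'from_region': [a for a, _ in pairs],
        'to_region': [b for _, b in pairs],
        'capacity_mw': [default_capacity_mw] * len(pairs),  # placeholder limit per directed path
    })

# Five regions yield 20 directed pairs, matching the 20-row figure above.
capacity_df = generate_transmission_capacity(['DE', 'FR', 'NL', 'DK', 'NO'])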

Update mining dataset with EUR pricing and power metrics (see the sketch after this list):
- Change btc_price_usd to btc_price_eur
- Add power_efficiency_th_per_mw, power_demand_mw
- Add revenue_eur_per_mwh, profit_eur_per_mwh
- Remove mining_profitability column
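
For orientation, a minimal sketch of how the new per-MWh metrics might be derived from the other new columns. The actual formulas are in the rewritten fetch_bitcoin_mining_data(); the payout constant and the unit reading of power_efficiency_th_per_mw here are invented for illustration:

def mining_economics(btc_price_eur, power_efficiency_th_per_mw,
                     elec_price_eur_per_mwh, btc_per_th=3.5e-10):
    # btc_per_th: assumed network payout per terahash of work (block reward / difficulty proxy).
    # Assumes power_efficiency_th_per_mw is a sustained hash rate in TH/s per MW of draw.
    th_per_mwh = power_efficiency_th_per_mw * 3600  # one MW running for one hour
    revenue_eur_per_mwh = th_per_mwh * btc_per_th * btc_price_eur
    profit_eur_per_mwh = revenue_eur_per_mwh - elec_price_eur_per_mwh
    return revenue_eur_per_mwh, profit_eur_per_mwh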

Changes include:
- scripts/02_fetch_historical.py: rewrite fetch_bitcoin_mining_data()
- scripts/01_generate_synthetic.py: add transmission data generators
- config/data_config.yaml: add transmission config, update bitcoin config
- config/schema.yaml: add 2 new schemas, update bitcoin_mining schema
- scripts/03_process_merge.py: add 2 new datasets
- scripts/04_validate.py: add 2 new datasets
- test/test_data.py: update for new datasets and bitcoin price reference

Total datasets: 9 (734,491 rows, 17.89 MB)
2026-02-11 01:09:33 +07:00

175 lines
5.3 KiB
Python

"""
Process and merge all datasets, apply compression, and save to Parquet format.
"""
import yaml
import numpy as np
import pandas as pd
from pathlib import Path
import json
import sys
def load_config():
    config_path = Path(__file__).parent.parent / "config" / "data_config.yaml"
    with open(config_path) as f:
        return yaml.safe_load(f)

def load_dataset(dataset_name, data_base):
    synthetic_path = data_base / 'metadata' / 'generation_metadata.json'
    df_list = []
    raw_path = data_base / 'raw' / f'{dataset_name}_raw.parquet'
    if raw_path.exists():
        print(f" Loading {dataset_name} from raw data...")
        df = pd.read_parquet(raw_path)
        df_list.append(df)
    print(f" Total rows for {dataset_name}: {len(pd.concat(df_list, ignore_index=True)) if df_list else 0}")
    return pd.concat(df_list, ignore_index=True) if df_list else None

def downgrade_precision(df, config):
    precision = config['output'].get('precision', 'float32')
    for col in df.select_dtypes(include=['float64']).columns:
        if col == 'timestamp':
            continue
        df[col] = df[col].astype(precision)
    for col in df.select_dtypes(include=['int64']).columns:
        if col == 'timestamp':
            continue
        df[col] = df[col].astype('int32')
    return df

def convert_categoricals(df):
    for col in df.select_dtypes(include=['object']).columns:
        if col == 'timestamp':
            continue
        # Only convert when fewer than half the values are unique,
        # otherwise the category dtype saves little or no memory.
        if df[col].nunique() < df.shape[0] * 0.5:
            df[col] = df[col].astype('category')
    return df

def optimize_memory(df):
    start_mem = df.memory_usage(deep=True).sum() / 1024 / 1024
    df = downgrade_precision(df, {'output': {'precision': 'float32'}})
    df = convert_categoricals(df)
    end_mem = df.memory_usage(deep=True).sum() / 1024 / 1024
    reduction = (1 - end_mem / start_mem) * 100
    print(f" Memory: {start_mem:.2f}MB -> {end_mem:.2f}MB ({reduction:.1f}% reduction)")
    return df

def save_processed_dataset(df, dataset_name, output_dir, config):
    output_path = Path(output_dir) / f'{dataset_name}.parquet'
    compression = config['output'].get('compression', 'snappy')
    df.to_parquet(output_path, compression=compression, index=False)
    file_size_mb = output_path.stat().st_size / 1024 / 1024
    print(f" Saved: {output_path} ({file_size_mb:.2f}MB)")
    return {
        'path': str(output_path),
        'size_mb': file_size_mb,
        'rows': len(df),
        'columns': len(df.columns)
    }

def validate_timestamps(df, dataset_name):
    if 'timestamp' not in df.columns:
        print(f" Warning: {dataset_name} has no timestamp column")
        return False
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    duplicates = df['timestamp'].duplicated().sum()
    if duplicates > 0:
        print(f" Warning: {dataset_name} has {duplicates} duplicate timestamps")
    return True

def generate_final_metadata(processed_info, output_dir):
    metadata = {
        'processed_at': pd.Timestamp.utcnow().isoformat(),
        'total_datasets': len(processed_info),
        'total_size_mb': sum(info['size_mb'] for info in processed_info.values()),
        'datasets': processed_info
    }
    output_path = Path(output_dir) / 'metadata' / 'final_metadata.json'
    # Make sure the metadata directory exists before writing.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w') as f:
        json.dump(metadata, f, indent=2, default=str)
    return metadata

def main():
    config = load_config()
    data_base = Path(__file__).parent.parent / 'data'
    processed_dir = data_base / 'processed'
    processed_dir.mkdir(parents=True, exist_ok=True)
    print("Processing and merging datasets...")
    datasets = [
        'electricity_prices',
        'battery_capacity',
        'renewable_generation',
        'conventional_generation',
        'load_profiles',
        'data_centers',
        'bitcoin_mining',
        'transmission_capacity',
        'transmission_cost'
    ]
    processed_info = {}
    for dataset_name in datasets:
        print(f"\nProcessing {dataset_name}...")
        df = load_dataset(dataset_name, data_base)
        if df is None:
            print(f" Warning: {dataset_name} has no data, skipping")
            continue
        validate_timestamps(df, dataset_name)
        print(" Optimizing memory...")
        df = optimize_memory(df)
        info = save_processed_dataset(df, dataset_name, processed_dir, config)
        processed_info[dataset_name] = info
    print(f"\n{'='*60}")
    print("Processing complete!")
    print(f"{'='*60}")
    metadata = generate_final_metadata(processed_info, data_base)
    print(f"\nTotal datasets processed: {len(processed_info)}")
    print(f"Total size: {metadata['total_size_mb']:.2f}MB")
    print(f"Target size: {config['output']['target_size_mb']}MB")
    if metadata['total_size_mb'] > config['output']['target_size_mb']:
        print(f"Warning: Total size exceeds target by {metadata['total_size_mb'] - config['output']['target_size_mb']:.2f}MB")
    else:
        print("✓ Total size within target")
    print(f"\nProcessed data saved to: {processed_dir}")
    print(f"Metadata saved to: {data_base / 'metadata' / 'final_metadata.json'}")
    return processed_info


if __name__ == '__main__':
    main()