Add two new static datasets for cross-region arbitrage calculations:

- transmission_capacity: region-to-region capacity limits (20 rows)
- transmission_cost: transmission costs per path (20 rows)

Update mining dataset with EUR pricing and power metrics:

- Change btc_price_usd to btc_price_eur
- Add power_efficiency_th_per_mw, power_demand_mw
- Add revenue_eur_per_mwh, profit_eur_per_mwh
- Remove mining_profitability column

Changes include:

- scripts/02_fetch_historical.py: rewrite fetch_bitcoin_mining_data()
- scripts/01_generate_synthetic.py: add transmission data generators (sketched below)
- config/data_config.yaml: add transmission config, update bitcoin config
- config/schema.yaml: add 2 new schemas, update bitcoin_mining schema
- scripts/03_process_merge.py: add 2 new datasets
- scripts/04_validate.py: add 2 new datasets
- test/test_data.py: update for new datasets and bitcoin price reference

Total datasets: 9 (734,491 rows, 17.89 MB)
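For reference, a minimal sketch of what the new transmission generators in scripts/01_generate_synthetic.py could look like; the region codes, column names, value ranges, and function names are illustrative assumptions, not the actual implementation. Enumerating every ordered pair of five regions gives the 20 rows noted above.

import itertools

import numpy as np
import pandas as pd


def generate_transmission_capacity(regions=None, seed=42):
    """Hypothetical generator: one row per directed region pair with a synthetic MW limit."""
    rng = np.random.default_rng(seed)
    regions = regions or ['DE', 'FR', 'NL', 'BE', 'AT']  # assumed region codes
    pairs = list(itertools.permutations(regions, 2))     # 5 * 4 = 20 directed paths
    return pd.DataFrame({
        'from_region': [a for a, _ in pairs],
        'to_region': [b for _, b in pairs],
        'capacity_mw': rng.uniform(500, 5000, len(pairs)).round(0),
    })


def generate_transmission_cost(regions=None, seed=43):
    """Hypothetical generator: per-path transmission cost in EUR/MWh over the same 20 paths."""
    rng = np.random.default_rng(seed)
    regions = regions or ['DE', 'FR', 'NL', 'BE', 'AT']
    pairs = list(itertools.permutations(regions, 2))
    return pd.DataFrame({
        'from_region': [a for a, _ in pairs],
        'to_region': [b for _, b in pairs],
        'cost_eur_per_mwh': rng.uniform(1.0, 10.0, len(pairs)).round(2),
    })

Presumably each frame is then written under data/raw/ as transmission_capacity_raw.parquet and transmission_cost_raw.parquet, so that load_dataset() in the script below can pick them up.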
scripts/03_process_merge.py · 175 lines · 5.3 KiB · Python
"""
|
|
Process and merge all datasets, apply compression, and save to Parquet format.
|
|
"""
|
|
|
|
import yaml
|
|
import numpy as np
|
|
import pandas as pd
|
|
from pathlib import Path
|
|
import json
|
|
import sys
|
|
|
|
def load_config():
    """Load the pipeline configuration from config/data_config.yaml."""
    config_path = Path(__file__).parent.parent / "config" / "data_config.yaml"
    with open(config_path) as f:
        return yaml.safe_load(f)


def load_dataset(dataset_name, data_base):
    """Load a dataset's raw Parquet file, returning None if no data is found."""
    synthetic_path = data_base / 'metadata' / 'generation_metadata.json'

    df_list = []

    raw_path = data_base / 'raw' / f'{dataset_name}_raw.parquet'
    if raw_path.exists():
        print(f" Loading {dataset_name} from raw data...")
        df = pd.read_parquet(raw_path)
        df_list.append(df)

    print(f" Total rows for {dataset_name}: {len(pd.concat(df_list, ignore_index=True)) if df_list else 0}")

    return pd.concat(df_list, ignore_index=True) if df_list else None


def downgrade_precision(df, config):
    """Downcast float64 columns to the configured precision and int64 columns to int32."""
    precision = config['output'].get('precision', 'float32')

    for col in df.select_dtypes(include=['float64']).columns:
        if col == 'timestamp':
            continue
        df[col] = df[col].astype(precision)

    for col in df.select_dtypes(include=['int64']).columns:
        if col == 'timestamp':
            continue
        df[col] = df[col].astype('int32')

    return df


def convert_categoricals(df):
    """Convert low-cardinality object columns to the category dtype."""
    for col in df.select_dtypes(include=['object']).columns:
        if col == 'timestamp':
            continue
        if df[col].nunique() < df.shape[0] * 0.5:
            df[col] = df[col].astype('category')

    return df


def optimize_memory(df):
    """Reduce memory usage via precision downcasting and categorical conversion."""
    start_mem = df.memory_usage(deep=True).sum() / 1024 / 1024

    df = downgrade_precision(df, {'output': {'precision': 'float32'}})
    df = convert_categoricals(df)

    end_mem = df.memory_usage(deep=True).sum() / 1024 / 1024

    reduction = (1 - end_mem / start_mem) * 100
    print(f" Memory: {start_mem:.2f}MB -> {end_mem:.2f}MB ({reduction:.1f}% reduction)")

    return df


def save_processed_dataset(df, dataset_name, output_dir, config):
    """Write the dataset to a compressed Parquet file and return summary info."""
    output_path = Path(output_dir) / f'{dataset_name}.parquet'

    compression = config['output'].get('compression', 'snappy')

    df.to_parquet(output_path, compression=compression, index=False)

    file_size_mb = output_path.stat().st_size / 1024 / 1024
    print(f" Saved: {output_path} ({file_size_mb:.2f}MB)")

    return {
        'path': str(output_path),
        'size_mb': file_size_mb,
        'rows': len(df),
        'columns': len(df.columns)
    }


def validate_timestamps(df, dataset_name):
    """Parse the timestamp column in place and warn about duplicates; return False if it is missing."""
    if 'timestamp' not in df.columns:
        print(f" Warning: {dataset_name} has no timestamp column")
        return False

    df['timestamp'] = pd.to_datetime(df['timestamp'])
    duplicates = df['timestamp'].duplicated().sum()

    if duplicates > 0:
        print(f" Warning: {dataset_name} has {duplicates} duplicate timestamps")

    return True


def generate_final_metadata(processed_info, output_dir):
    """Write summary metadata for all processed datasets to metadata/final_metadata.json."""
    metadata = {
        'processed_at': pd.Timestamp.utcnow().isoformat(),
        'total_datasets': len(processed_info),
        'total_size_mb': sum(info['size_mb'] for info in processed_info.values()),
        'datasets': processed_info
    }

    output_path = Path(output_dir) / 'metadata' / 'final_metadata.json'
    output_path.parent.mkdir(parents=True, exist_ok=True)  # ensure the metadata directory exists
    with open(output_path, 'w') as f:
        json.dump(metadata, f, indent=2, default=str)

    return metadata


def main():
    """Process, optimize, and save all datasets, then write final metadata."""
    config = load_config()

    data_base = Path(__file__).parent.parent / 'data'
    processed_dir = data_base / 'processed'
    processed_dir.mkdir(parents=True, exist_ok=True)

    print("Processing and merging datasets...")

    datasets = [
        'electricity_prices',
        'battery_capacity',
        'renewable_generation',
        'conventional_generation',
        'load_profiles',
        'data_centers',
        'bitcoin_mining',
        'transmission_capacity',
        'transmission_cost'
    ]

    processed_info = {}

    for dataset_name in datasets:
        print(f"\nProcessing {dataset_name}...")

        df = load_dataset(dataset_name, data_base)

        if df is None:
            print(f" Warning: {dataset_name} has no data, skipping")
            continue

        validate_timestamps(df, dataset_name)

        print(" Optimizing memory...")
        df = optimize_memory(df)

        info = save_processed_dataset(df, dataset_name, processed_dir, config)
        processed_info[dataset_name] = info

    print(f"\n{'='*60}")
    print("Processing complete!")
    print(f"{'='*60}")

    metadata = generate_final_metadata(processed_info, data_base)

    print(f"\nTotal datasets processed: {len(processed_info)}")
    print(f"Total size: {metadata['total_size_mb']:.2f}MB")
    print(f"Target size: {config['output']['target_size_mb']}MB")

    if metadata['total_size_mb'] > config['output']['target_size_mb']:
        print(f"Warning: Total size exceeds target by {metadata['total_size_mb'] - config['output']['target_size_mb']:.2f}MB")
    else:
        print("✓ Total size within target")

    print(f"\nProcessed data saved to: {processed_dir}")
    print(f"Metadata saved to: {data_base / 'metadata' / 'final_metadata.json'}")

    return processed_info


if __name__ == '__main__':
    main()