""" Process and merge all datasets, apply compression, and save to Parquet format. """ import yaml import numpy as np import pandas as pd from pathlib import Path import json import sys def load_config(): config_path = Path(__file__).parent.parent / "config" / "data_config.yaml" with open(config_path) as f: return yaml.safe_load(f) def load_dataset(dataset_name, data_base): synthetic_path = data_base / 'metadata' / 'generation_metadata.json' df_list = [] raw_path = data_base / 'raw' / f'{dataset_name}_raw.parquet' if raw_path.exists(): print(f" Loading {dataset_name} from raw data...") df = pd.read_parquet(raw_path) df_list.append(df) print(f" Total rows for {dataset_name}: {len(pd.concat(df_list, ignore_index=True)) if df_list else 0}") return pd.concat(df_list, ignore_index=True) if df_list else None def downgrade_precision(df, config): precision = config['output'].get('precision', 'float32') for col in df.select_dtypes(include=['float64']).columns: if col == 'timestamp': continue df[col] = df[col].astype(precision) for col in df.select_dtypes(include=['int64']).columns: if col == 'timestamp': continue df[col] = df[col].astype('int32') return df def convert_categoricals(df): for col in df.select_dtypes(include=['object']).columns: if col == 'timestamp': continue if df[col].nunique() < df.shape[0] * 0.5: df[col] = df[col].astype('category') return df def optimize_memory(df): start_mem = df.memory_usage(deep=True).sum() / 1024 / 1024 df = downgrade_precision(df, {'output': {'precision': 'float32'}}) df = convert_categoricals(df) end_mem = df.memory_usage(deep=True).sum() / 1024 / 1024 reduction = (1 - end_mem / start_mem) * 100 print(f" Memory: {start_mem:.2f}MB -> {end_mem:.2f}MB ({reduction:.1f}% reduction)") return df def save_processed_dataset(df, dataset_name, output_dir, config): output_path = Path(output_dir) / f'{dataset_name}.parquet' compression = config['output'].get('compression', 'snappy') df.to_parquet(output_path, compression=compression, index=False) file_size_mb = output_path.stat().st_size / 1024 / 1024 print(f" Saved: {output_path} ({file_size_mb:.2f}MB)") return { 'path': str(output_path), 'size_mb': file_size_mb, 'rows': len(df), 'columns': len(df.columns) } def validate_timestamps(df, dataset_name): if 'timestamp' not in df.columns: print(f" Warning: {dataset_name} has no timestamp column") return False df['timestamp'] = pd.to_datetime(df['timestamp']) duplicates = df['timestamp'].duplicated().sum() if duplicates > 0: print(f" Warning: {dataset_name} has {duplicates} duplicate timestamps") return True def generate_final_metadata(processed_info, output_dir): metadata = { 'processed_at': pd.Timestamp.utcnow().isoformat(), 'total_datasets': len(processed_info), 'total_size_mb': sum(info['size_mb'] for info in processed_info.values()), 'datasets': processed_info } output_path = Path(output_dir) / 'metadata' / 'final_metadata.json' with open(output_path, 'w') as f: json.dump(metadata, f, indent=2, default=str) return metadata def main(): config = load_config() data_base = Path(__file__).parent.parent / 'data' processed_dir = data_base / 'processed' processed_dir.mkdir(parents=True, exist_ok=True) print("Processing and merging datasets...") datasets = [ 'electricity_prices', 'battery_capacity', 'renewable_generation', 'conventional_generation', 'load_profiles', 'data_centers', 'bitcoin_mining' ] processed_info = {} for dataset_name in datasets: print(f"\nProcessing {dataset_name}...") df = load_dataset(dataset_name, data_base) if df is None: print(f" Warning: {dataset_name} 
has no data, skipping") continue validate_timestamps(df, dataset_name) print(" Optimizing memory...") df = optimize_memory(df) info = save_processed_dataset(df, dataset_name, processed_dir, config) processed_info[dataset_name] = info print(f"\n{'='*60}") print("Processing complete!") print(f"{'='*60}") metadata = generate_final_metadata(processed_info, data_base) print(f"\nTotal datasets processed: {len(processed_info)}") print(f"Total size: {metadata['total_size_mb']:.2f}MB") print(f"Target size: {config['output']['target_size_mb']}MB") if metadata['total_size_mb'] > config['output']['target_size_mb']: print(f"Warning: Total size exceeds target by {metadata['total_size_mb'] - config['output']['target_size_mb']:.2f}MB") else: print("✓ Total size within target") print(f"\nProcessed data saved to: {processed_dir}") print(f"Metadata saved to: {data_base / 'metadata' / 'final_metadata.json'}") return processed_info if __name__ == '__main__': main()
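
# ---------------------------------------------------------------------------
# Example config/data_config.yaml (a minimal sketch, inferred from the keys
# this script reads: output.precision, output.compression, and
# output.target_size_mb). The values below are illustrative assumptions, not
# the project's actual configuration:
#
#   output:
#     precision: float32     # target dtype for downcasting float64 columns
#     compression: snappy    # Parquet codec; zstd or gzip also work with pyarrow
#     target_size_mb: 500    # total-size budget checked at the end of main()
# ---------------------------------------------------------------------------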