From a6437673598049f671a31248fc056cf89b856a3e Mon Sep 17 00:00:00 2001 From: kbt-devops Date: Tue, 10 Feb 2026 23:28:23 +0700 Subject: [PATCH] Initial commit: Energy test data generation pipeline Add complete test data preparation system for energy trading strategy demo. Includes configuration, data generation scripts, and validation tools for 7 datasets covering electricity prices, battery capacity, renewable/conventional generation, load profiles, data centers, and mining data. Excluded from git: Actual parquet data files (data/raw/, data/processed/) can be regenerated using the provided scripts. Datasets: - electricity_prices: Day-ahead and real-time prices (5 regions) - battery_capacity: Storage system charge/discharge cycles - renewable_generation: Solar, wind, hydro with forecast errors - conventional_generation: Gas, coal, nuclear plant outputs - load_profiles: Regional demand with weather correlations - data_centers: Power demand profiles including mining operations - mining_data: Hashrate, price, profitability (mempool.space API) --- .gitignore | 46 ++++ README.md | 124 ++++++++++ config/data_config.yaml | 96 ++++++++ config/schema.yaml | 233 ++++++++++++++++++ data/metadata/final_metadata.json | 49 ++++ data/metadata/generation_metadata.json | 89 +++++++ data/metadata/validation_report.json | 239 ++++++++++++++++++ requirements.txt | 7 + scripts/01_generate_synthetic.py | 320 +++++++++++++++++++++++++ scripts/02_fetch_historical.py | 222 +++++++++++++++++ scripts/03_process_merge.py | 172 +++++++++++++ scripts/04_validate.py | 272 +++++++++++++++++++++ 12 files changed, 1869 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config/data_config.yaml create mode 100644 config/schema.yaml create mode 100644 data/metadata/final_metadata.json create mode 100644 data/metadata/generation_metadata.json create mode 100644 data/metadata/validation_report.json create mode 100644 requirements.txt create mode 100644 
scripts/01_generate_synthetic.py create mode 100644 scripts/02_fetch_historical.py create mode 100644 scripts/03_process_merge.py create mode 100644 scripts/04_validate.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..41ab745 --- /dev/null +++ b/.gitignore @@ -0,0 +1,46 @@ +# Data files - exclude from git +data/raw/*.parquet +data/processed/*.parquet + +# Python artifacts +__pycache__/ +test/__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +venv/ +ENV/ +env/ +.venv/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Logs +*.log diff --git a/README.md b/README.md new file mode 100644 index 0000000..65454b0 --- /dev/null +++ b/README.md @@ -0,0 +1,124 @@ +# Energy Test Data + +Preparation of test data for energy trading strategy demo. + +## Overview + +This project generates and processes realistic test data for energy trading strategies, including: + +- **Electricity Prices**: Day-ahead and real-time market prices for European regions (FR, BE, DE, NL, UK) +- **Battery Capacity**: Storage system states with charge/discharge cycles +- **Renewable Generation**: Solar, wind, and hydro generation with forecast errors +- **Conventional Generation**: Gas, coal, and nuclear plant outputs +- **Load Profiles**: Regional electricity demand with weather correlations +- **Data Centers**: Power demand profiles including Bitcoin mining client +- **Bitcoin Mining**: Hashrate, price, and profitability data (from mempool.space) + +## Project Structure + +``` +energy-test-data/ +├── data/ +│ ├── processed/ # Final Parquet files (<200MB total) +│ ├── raw/ # Unprocessed source data +│ └── metadata/ # Data documentation and reports +├── scripts/ +│ ├── 01_generate_synthetic.py # Generate synthetic data +│ ├── 02_fetch_historical.py # Fetch historical data +│ ├── 
03_process_merge.py # Process and compress +│ └── 04_validate.py # Validate and report +├── config/ +│ ├── data_config.yaml # Configuration parameters +│ └── schema.yaml # Data schema definitions +├── requirements.txt +└── README.md +``` + +## Installation + +```bash +pip install -r requirements.txt +``` + +## Usage + +### Generate all test data + +Run scripts in sequence: + +```bash +python scripts/01_generate_synthetic.py +python scripts/02_fetch_historical.py +python scripts/03_process_merge.py +python scripts/04_validate.py +``` + +Or run all at once: + +```bash +python scripts/01_generate_synthetic.py && \ +python scripts/02_fetch_historical.py && \ +python scripts/03_process_merge.py && \ +python scripts/04_validate.py +``` + +### Individual scripts + +**01_generate_synthetic.py**: Creates synthetic data for battery systems, renewable generation, conventional generation, and data centers. + +**02_fetch_historical.py**: Fetches electricity prices, Bitcoin mining data, and load profiles from public APIs (or generates realistic synthetic data when APIs are unavailable). + +**03_process_merge.py**: Merges datasets, optimizes memory usage, and saves to compressed Parquet format. + +**04_validate.py**: Validates data quality, checks for missing values and outliers, and generates validation reports. + +## Configuration + +Edit `config/data_config.yaml` to customize: + +- **Time range**: Start/end dates and granularity +- **Regions**: Market regions to include +- **Data sources**: Synthetic vs historical for each dataset +- **Generation parameters**: Noise levels, outlier rates, missing value rates +- **Battery parameters**: Capacity ranges, efficiency, degradation +- **Generation parameters**: Plant capacities, marginal costs +- **Bitcoin parameters**: Hashrate ranges, mining efficiency + +## Data Specifications + +| Dataset | Time Range | Rows (10d × 1min) | Est. 
Size | +|---------|-----------|-------------------|-----------| +| electricity_prices | 10 days | 72,000 | ~40MB | +| battery_capacity | 10 days | 144,000 | ~20MB | +| renewable_generation | 10 days | 216,000 | ~35MB | +| conventional_generation | 10 days | 144,000 | ~25MB | +| load_profiles | 10 days | 72,000 | ~30MB | +| data_centers | 10 days | 72,000 | ~15MB | +| bitcoin_mining | 10 days | 14,400 | ~20MB | +| **Total** | | | **~185MB** | + +## Output Format + +All processed datasets are saved as Parquet files with Snappy compression in `data/processed/`. + +To read a dataset: + +```python +import pandas as pd + +df = pd.read_parquet('data/processed/electricity_prices.parquet') +print(df.head()) +``` + +## Data Sources + +- **Electricity Prices**: Hybrid (synthetic patterns based on EPEX Spot market characteristics) +- **Bitcoin Mining**: Hybrid (mempool.space API + synthetic patterns) +- **Load Profiles**: Hybrid (ENTSO-E transparency platform patterns + synthetic) + +## Validation Reports + +After processing, validation reports are generated in `data/metadata/`: + +- `validation_report.json`: Data quality checks, missing values, range violations +- `final_metadata.json`: Dataset sizes, row counts, processing details diff --git a/config/data_config.yaml b/config/data_config.yaml new file mode 100644 index 0000000..7446c98 --- /dev/null +++ b/config/data_config.yaml @@ -0,0 +1,96 @@ +# Energy Test Data Configuration +# For energy trading strategy demo + +time_range: + # Last 10 days from current date (adjustable) + start_date: "2026-01-31" + end_date: "2026-02-10" + granularity: "1min" # 1-minute intervals + +regions: + # European energy markets + - "FR" # France + - "BE" # Belgium + - "DE" # Germany + - "NL" # Netherlands + - "UK" # United Kingdom + +data_sources: + electricity_prices: + type: "hybrid" + historical_source: "epex_spot" + synthetic_patterns: true + battery_capacity: + type: "synthetic" + num_batteries: 10 + renewable_generation: + type: 
"synthetic" + plants_per_source: 5 + sources: ["solar", "wind", "hydro"] + conventional_generation: + type: "synthetic" + num_plants: 10 + fuel_types: ["gas", "coal", "nuclear"] + load_profiles: + type: "synthetic" + historical_source: "entso_e" + data_centers: + type: "synthetic" + num_centers: 5 + special_client: "bitcoin" + bitcoin_mining: + type: "hybrid" + historical_source: "mempool.space" + synthetic_patterns: true + +output: + format: "parquet" + compression: "snappy" + target_size_mb: 200 + precision: "float32" + +generation: + seed: 42 + add_noise: true + noise_level: 0.05 + include_outliers: true + outlier_rate: 0.01 + include_missing_values: true + missing_rate: 0.005 + +battery: + capacity_range: [10, 100] # MWh + charge_rate_range: [5, 50] # MW + discharge_rate_range: [5, 50] # MW + efficiency_range: [0.85, 0.95] + degradation_rate: 0.001 + +renewable: + solar: + capacity_range: [50, 500] # MW + forecast_error_sd: 0.15 + wind: + capacity_range: [100, 800] # MW + forecast_error_sd: 0.20 + hydro: + capacity_range: [50, 300] # MW + forecast_error_sd: 0.05 + +conventional: + gas: + capacity_range: [200, 1000] # MW + marginal_cost_range: [30, 80] # EUR/MWh + coal: + capacity_range: [300, 1500] # MW + marginal_cost_range: [40, 70] # EUR/MWh + nuclear: + capacity_range: [800, 1600] # MW + marginal_cost_range: [10, 30] # EUR/MWh + +data_center: + power_demand_range: [10, 100] # MW + price_sensitivity_range: [0.8, 1.2] + +bitcoin: + hashrate_range: [150, 250] # EH/s + mining_efficiency_range: [25, 35] # J/TH diff --git a/config/schema.yaml b/config/schema.yaml new file mode 100644 index 0000000..77ae922 --- /dev/null +++ b/config/schema.yaml @@ -0,0 +1,233 @@ +# Schema definitions for energy test data datasets + +schemas: + electricity_prices: + columns: + - name: "timestamp" + type: "datetime64[ns]" + description: "Timestamp of price observation" + - name: "region" + type: "category" + description: "Market region code" + - name: "day_ahead_price" + type: 
"float32" + unit: "EUR/MWh" + description: "Day-ahead market clearing price" + - name: "real_time_price" + type: "float32" + unit: "EUR/MWh" + description: "Real-time market price" + - name: "capacity_price" + type: "float32" + unit: "EUR/MWh" + description: "Capacity market price" + - name: "regulation_price" + type: "float32" + unit: "EUR/MWh" + description: "Frequency regulation price" + - name: "volume_mw" + type: "float32" + unit: "MW" + description: "Traded volume" + + battery_capacity: + columns: + - name: "timestamp" + type: "datetime64[ns]" + description: "Timestamp of battery state" + - name: "battery_id" + type: "category" + description: "Unique battery identifier" + - name: "capacity_mwh" + type: "float32" + unit: "MWh" + description: "Total storage capacity" + - name: "charge_level_mwh" + type: "float32" + unit: "MWh" + description: "Current energy stored" + - name: "charge_rate_mw" + type: "float32" + unit: "MW" + description: "Current charging rate (positive) or discharging (negative)" + - name: "discharge_rate_mw" + type: "float32" + unit: "MW" + description: "Maximum discharge rate" + - name: "efficiency" + type: "float32" + description: "Round-trip efficiency (0-1)" + + renewable_generation: + columns: + - name: "timestamp" + type: "datetime64[ns]" + description: "Timestamp of generation measurement" + - name: "source" + type: "category" + description: "Renewable source type (solar, wind, hydro)" + - name: "plant_id" + type: "category" + description: "Unique plant identifier" + - name: "generation_mw" + type: "float32" + unit: "MW" + description: "Actual generation output" + - name: "forecast_mw" + type: "float32" + unit: "MW" + description: "Forecasted generation" + - name: "actual_mw" + type: "float32" + unit: "MW" + description: "Actual measured generation (after correction)" + - name: "capacity_factor" + type: "float32" + description: "Capacity utilization factor (0-1)" + + conventional_generation: + columns: + - name: "timestamp" + type: 
"datetime64[ns]" + description: "Timestamp of generation measurement" + - name: "plant_id" + type: "category" + description: "Unique plant identifier" + - name: "fuel_type" + type: "category" + description: "Primary fuel type (gas, coal, nuclear)" + - name: "generation_mw" + type: "float32" + unit: "MW" + description: "Current generation output" + - name: "marginal_cost" + type: "float32" + unit: "EUR/MWh" + description: "Short-run marginal cost" + - name: "heat_rate" + type: "float32" + unit: "MMBtu/MWh" + description: "Thermal efficiency metric" + + load_profiles: + columns: + - name: "timestamp" + type: "datetime64[ns]" + description: "Timestamp of load measurement" + - name: "region" + type: "category" + description: "Region code" + - name: "load_mw" + type: "float32" + unit: "MW" + description: "Actual system load" + - name: "forecast_mw" + type: "float32" + unit: "MW" + description: "Load forecast" + - name: "weather_temp" + type: "float32" + unit: "Celsius" + description: "Average temperature" + - name: "humidity" + type: "float32" + unit: "%" + description: "Relative humidity" + + data_centers: + columns: + - name: "timestamp" + type: "datetime64[ns]" + description: "Timestamp of demand measurement" + - name: "data_center_id" + type: "category" + description: "Data center identifier" + - name: "location" + type: "category" + description: "Geographic location" + - name: "power_demand_mw" + type: "float32" + unit: "MW" + description: "Current power demand" + - name: "max_bid_price" + type: "float32" + unit: "EUR/MWh" + description: "Maximum price willing to pay" + - name: "client_type" + type: "category" + description: "Client type (bitcoin, enterprise, etc.)" + + bitcoin_mining: + columns: + - name: "timestamp" + type: "datetime64[ns]" + description: "Timestamp of mining measurement" + - name: "pool_id" + type: "category" + description: "Mining pool identifier" + - name: "hashrate_ths" + type: "float32" + unit: "TH/s" + description: "Mining pool hashrate" + 
- name: "btc_price_usd" + type: "float32" + unit: "USD" + description: "Bitcoin price" + - name: "mining_profitability" + type: "float32" + unit: "USD/TH/day" + description: "Mining profitability per terahash per day" + - name: "electricity_cost" + type: "float32" + unit: "EUR/MWh" + description: "Electricity cost breakeven point" + +validation_rules: + electricity_prices: + - column: "day_ahead_price" + min: -500 + max: 3000 + - column: "real_time_price" + min: -500 + max: 5000 + + battery_capacity: + - column: "charge_level_mwh" + min: 0 + check_max: "capacity_mwh" + - column: "efficiency" + min: 0.5 + max: 1.0 + + renewable_generation: + - column: "generation_mw" + min: 0 + - column: "capacity_factor" + min: 0 + max: 1.0 + + conventional_generation: + - column: "generation_mw" + min: 0 + - column: "heat_rate" + min: 5 + max: 15 + + load_profiles: + - column: "load_mw" + min: 0 + - column: "weather_temp" + min: -30 + max: 50 + + data_centers: + - column: "power_demand_mw" + min: 0 + - column: "max_bid_price" + min: 0 + + bitcoin_mining: + - column: "hashrate_ths" + min: 0 + - column: "btc_price_usd" + min: 1000 diff --git a/data/metadata/final_metadata.json b/data/metadata/final_metadata.json new file mode 100644 index 0000000..ff29c2c --- /dev/null +++ b/data/metadata/final_metadata.json @@ -0,0 +1,49 @@ +{ + "processed_at": "2026-02-10T16:10:49.295018+00:00", + "total_datasets": 7, + "total_size_mb": 16.977967262268066, + "datasets": { + "electricity_prices": { + "path": "/home/user/energy-test-data/data/processed/electricity_prices.parquet", + "size_mb": 2.2755775451660156, + "rows": 72005, + "columns": 7 + }, + "battery_capacity": { + "path": "/home/user/energy-test-data/data/processed/battery_capacity.parquet", + "size_mb": 4.204527854919434, + "rows": 144010, + "columns": 7 + }, + "renewable_generation": { + "path": "/home/user/energy-test-data/data/processed/renewable_generation.parquet", + "size_mb": 4.482715606689453, + "rows": 216015, + "columns": 7 + 
}, + "conventional_generation": { + "path": "/home/user/energy-test-data/data/processed/conventional_generation.parquet", + "size_mb": 2.749570846557617, + "rows": 144010, + "columns": 6 + }, + "load_profiles": { + "path": "/home/user/energy-test-data/data/processed/load_profiles.parquet", + "size_mb": 1.861943244934082, + "rows": 72005, + "columns": 6 + }, + "data_centers": { + "path": "/home/user/energy-test-data/data/processed/data_centers.parquet", + "size_mb": 1.0422554016113281, + "rows": 72005, + "columns": 6 + }, + "bitcoin_mining": { + "path": "/home/user/energy-test-data/data/processed/bitcoin_mining.parquet", + "size_mb": 0.3613767623901367, + "rows": 14401, + "columns": 6 + } + } +} \ No newline at end of file diff --git a/data/metadata/generation_metadata.json b/data/metadata/generation_metadata.json new file mode 100644 index 0000000..10baa59 --- /dev/null +++ b/data/metadata/generation_metadata.json @@ -0,0 +1,89 @@ +{ + "generated_at": "2026-02-10T16:10:43.522420", + "datasets": { + "battery_capacity": { + "rows": 144010, + "columns": [ + "timestamp", + "battery_id", + "capacity_mwh", + "charge_level_mwh", + "charge_rate_mw", + "discharge_rate_mw", + "efficiency" + ], + "memory_usage_mb": 15.38205337524414, + "dtypes": { + "timestamp": "datetime64[ns]", + "battery_id": "object", + "capacity_mwh": "float64", + "charge_level_mwh": "float64", + "charge_rate_mw": "float64", + "discharge_rate_mw": "float64", + "efficiency": "float64" + } + }, + "renewable_generation": { + "rows": 216015, + "columns": [ + "timestamp", + "source", + "plant_id", + "generation_mw", + "forecast_mw", + "actual_mw", + "capacity_factor" + ], + "memory_usage_mb": 34.472124099731445, + "dtypes": { + "timestamp": "datetime64[ns]", + "source": "object", + "plant_id": "object", + "generation_mw": "float64", + "forecast_mw": "float64", + "actual_mw": "float64", + "capacity_factor": "float64" + } + }, + "conventional_generation": { + "rows": 144010, + "columns": [ + "timestamp", + 
"plant_id", + "fuel_type", + "generation_mw", + "marginal_cost", + "heat_rate" + ], + "memory_usage_mb": 26.149402618408203, + "dtypes": { + "timestamp": "datetime64[ns]", + "plant_id": "object", + "fuel_type": "object", + "generation_mw": "float64", + "marginal_cost": "float64", + "heat_rate": "float64" + } + }, + "data_centers": { + "rows": 72005, + "columns": [ + "timestamp", + "data_center_id", + "location", + "power_demand_mw", + "max_bid_price", + "client_type" + ], + "memory_usage_mb": 14.585489273071289, + "dtypes": { + "timestamp": "datetime64[ns]", + "data_center_id": "object", + "location": "object", + "power_demand_mw": "float64", + "max_bid_price": "float64", + "client_type": "object" + } + } + } +} \ No newline at end of file diff --git a/data/metadata/validation_report.json b/data/metadata/validation_report.json new file mode 100644 index 0000000..2998ffb --- /dev/null +++ b/data/metadata/validation_report.json @@ -0,0 +1,239 @@ +{ + "generated_at": "2026-02-10T16:10:53.614368", + "summary": { + "total_datasets": 7, + "passed": 2, + "warnings": 5, + "failed": 0, + "total_size_mb": 17.72, + "total_rows": 734451 + }, + "datasets": [ + { + "dataset": "electricity_prices", + "rows": 72005, + "columns": 7, + "memory_mb": 1.99, + "missing_values": {}, + "duplicated_rows": 0, + "timestamp_continuity": { + "status": "checked", + "expected_frequency": "1min", + "gaps_detected": 0, + "total_rows": 72005 + }, + "data_ranges": [], + "data_types": [], + "status": "pass" + }, + { + "dataset": "battery_capacity", + "rows": 144010, + "columns": 7, + "memory_mb": 3.98, + "missing_values": { + "capacity_mwh": { + "count": 720, + "percentage": 0.5 + }, + "charge_level_mwh": { + "count": 720, + "percentage": 0.5 + }, + "charge_rate_mw": { + "count": 720, + "percentage": 0.5 + }, + "discharge_rate_mw": { + "count": 720, + "percentage": 0.5 + }, + "efficiency": { + "count": 720, + "percentage": 0.5 + } + }, + "duplicated_rows": 0, + "timestamp_continuity": { + "status": 
"checked", + "expected_frequency": "1min", + "gaps_detected": 0, + "total_rows": 144010 + }, + "data_ranges": [ + { + "column": "efficiency", + "rule": "min >= 0.5", + "violations": 36, + "severity": "error" + }, + { + "column": "efficiency", + "rule": "max <= 1.0", + "violations": 4371, + "severity": "error" + } + ], + "data_types": [], + "status": "warning" + }, + { + "dataset": "renewable_generation", + "rows": 216015, + "columns": 7, + "memory_mb": 5.36, + "missing_values": { + "generation_mw": { + "count": 1080, + "percentage": 0.5 + }, + "forecast_mw": { + "count": 1080, + "percentage": 0.5 + }, + "actual_mw": { + "count": 1080, + "percentage": 0.5 + }, + "capacity_factor": { + "count": 1080, + "percentage": 0.5 + } + }, + "duplicated_rows": 0, + "timestamp_continuity": { + "status": "checked", + "expected_frequency": "1min", + "gaps_detected": 0, + "total_rows": 216015 + }, + "data_ranges": [ + { + "column": "capacity_factor", + "rule": "max <= 1.0", + "violations": 6382, + "severity": "error" + } + ], + "data_types": [], + "status": "warning" + }, + { + "dataset": "conventional_generation", + "rows": 144010, + "columns": 6, + "memory_mb": 3.02, + "missing_values": { + "generation_mw": { + "count": 720, + "percentage": 0.5 + }, + "marginal_cost": { + "count": 720, + "percentage": 0.5 + }, + "heat_rate": { + "count": 720, + "percentage": 0.5 + } + }, + "duplicated_rows": 0, + "timestamp_continuity": { + "status": "checked", + "expected_frequency": "1min", + "gaps_detected": 0, + "total_rows": 144010 + }, + "data_ranges": [ + { + "column": "heat_rate", + "rule": "min >= 5", + "violations": 29, + "severity": "error" + }, + { + "column": "heat_rate", + "rule": "max <= 15", + "violations": 867, + "severity": "error" + } + ], + "data_types": [], + "status": "warning" + }, + { + "dataset": "load_profiles", + "rows": 72005, + "columns": 6, + "memory_mb": 1.72, + "missing_values": {}, + "duplicated_rows": 0, + "timestamp_continuity": { + "status": "checked", + 
"expected_frequency": "1min", + "gaps_detected": 0, + "total_rows": 72005 + }, + "data_ranges": [], + "data_types": [], + "status": "pass" + }, + { + "dataset": "data_centers", + "rows": 72005, + "columns": 6, + "memory_mb": 1.31, + "missing_values": { + "power_demand_mw": { + "count": 360, + "percentage": 0.5 + }, + "max_bid_price": { + "count": 360, + "percentage": 0.5 + } + }, + "duplicated_rows": 0, + "timestamp_continuity": { + "status": "checked", + "expected_frequency": "1min", + "gaps_detected": 0, + "total_rows": 72005 + }, + "data_ranges": [ + { + "column": "power_demand_mw", + "rule": "min >= 0", + "violations": 137, + "severity": "error" + } + ], + "data_types": [], + "status": "warning" + }, + { + "dataset": "bitcoin_mining", + "rows": 14401, + "columns": 6, + "memory_mb": 0.34, + "missing_values": {}, + "duplicated_rows": 0, + "timestamp_continuity": { + "status": "checked", + "expected_frequency": "1min", + "gaps_detected": 0, + "total_rows": 14401 + }, + "data_ranges": [ + { + "column": "btc_price_usd", + "rule": "min >= 1000", + "violations": 456, + "severity": "error" + } + ], + "data_types": [], + "status": "warning" + } + ] +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..dd3aff0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +pandas>=2.0.0 +numpy>=1.24.0 +pyarrow>=14.0.0 +pyyaml>=6.0 +requests>=2.31.0 +scipy>=1.11.0 +python-dateutil>=2.8.0 diff --git a/scripts/01_generate_synthetic.py b/scripts/01_generate_synthetic.py new file mode 100644 index 0000000..9e3fc5a --- /dev/null +++ b/scripts/01_generate_synthetic.py @@ -0,0 +1,320 @@ +""" +Generate synthetic data for energy trading strategy test data. +Handles: battery capacity, data centers, renewable generation, conventional generation. 
"""
Generate synthetic data for energy trading strategy test data.

Handles: battery capacity, data centers, renewable generation,
conventional generation.  Parameters come from config/data_config.yaml;
raw parquet files are written to data/raw/ and a summary report to
data/metadata/generation_metadata.json.
"""

import json
from datetime import datetime, timezone
from pathlib import Path

import numpy as np
import pandas as pd


def load_config():
    """Load generation parameters from config/data_config.yaml."""
    # Imported lazily so this module can be imported without PyYAML installed.
    import yaml
    config_path = Path(__file__).parent.parent / "config" / "data_config.yaml"
    with open(config_path) as f:
        return yaml.safe_load(f)


def generate_timestamps(start_date, end_date, granularity):
    """Return a DatetimeIndex from start_date to end_date (inclusive) at the
    given pandas frequency string (e.g. "1min")."""
    return pd.date_range(start=pd.to_datetime(start_date),
                         end=pd.to_datetime(end_date),
                         freq=granularity)


def generate_battery_data(config, timestamps):
    """Simulate per-interval charge/discharge state for a fleet of batteries.

    Returns one row per (timestamp, battery): capacity, stored energy,
    realised charge rate (MW, negative = discharging), max discharge rate
    and round-trip efficiency.
    """
    np.random.seed(config['generation']['seed'])
    num_batteries = config['data_sources']['battery_capacity']['num_batteries']
    params = config['battery']

    batteries = []
    for i in range(num_batteries):
        battery_id = f"BAT_{i+1:03d}"
        capacity = np.random.uniform(*params['capacity_range'])
        charge_rate = np.random.uniform(*params['charge_rate_range'])
        discharge_rate = np.random.uniform(*params['discharge_rate_range'])
        efficiency = np.random.uniform(*params['efficiency_range'])

        n = len(timestamps)
        charge_level = np.zeros(n)
        charge_level[0] = capacity * np.random.uniform(0.3, 0.7)

        # Random walk: each step the battery charges (+1), idles (0) or
        # discharges (-1); energy moved per 1-minute step is rate/60 MWh,
        # clipped to the physical [0, capacity] window.
        for t in range(1, n):
            action = np.random.choice([-1, 0, 1], p=[0.3, 0.2, 0.5])
            rate = charge_rate if action > 0 else discharge_rate
            change = action * rate / 60
            charge_level[t] = np.clip(charge_level[t-1] + change, 0, capacity)

        # Recover the realised MW rate from the clipped energy trajectory.
        current_rate = np.diff(charge_level, prepend=charge_level[0]) * 60
        current_rate = np.clip(current_rate, -discharge_rate, charge_rate)

        batteries.append(pd.DataFrame({
            'timestamp': timestamps,
            'battery_id': battery_id,
            'capacity_mwh': capacity,
            'charge_level_mwh': charge_level,
            'charge_rate_mw': current_rate,
            'discharge_rate_mw': discharge_rate,
            'efficiency': efficiency
        }))

    return pd.concat(batteries, ignore_index=True)


def generate_renewable_data(config, timestamps):
    """Simulate solar/wind/hydro plant output with a noisy forecast.

    `actual_mw` mirrors `generation_mw`; `capacity_factor` may exceed 1.0
    because output is scaled by a uniform(0.8, 1.2) factor — the validator
    flags those rows as warnings.
    """
    np.random.seed(config['generation']['seed'] + 1)

    src_cfg = config['data_sources']['renewable_generation']
    sources = src_cfg['sources']
    plants_per_source = src_cfg['plants_per_source']
    params = config['renewable']

    df_list = []
    for source in sources:
        source_params = params[source]
        for i in range(plants_per_source):
            plant_id = f"{source.upper()}_{i+1:03d}"
            capacity = np.random.uniform(*source_params['capacity_range'])
            forecast_error_sd = source_params['forecast_error_sd']

            n = len(timestamps)
            hours = timestamps.hour + timestamps.minute / 60

            if source == 'solar':
                # Daylight bell between ~06:00 and ~18:00 plus a seasonal swing.
                base_pattern = np.maximum(0, np.sin(np.pi * (hours - 6) / 12))
                seasonal = 0.7 + 0.3 * np.sin(2 * np.pi * timestamps.dayofyear / 365)
            elif source == 'wind':
                base_pattern = 0.4 + 0.3 * np.sin(2 * np.pi * hours / 24) + 0.3 * np.random.randn(n)
                seasonal = 0.8 + 0.2 * np.sin(2 * np.pi * timestamps.dayofyear / 365)
            else:
                # hydro: steady output with mild noise, no seasonality
                base_pattern = 0.6 + 0.2 * np.random.randn(n)
                seasonal = 1.0

            generation = base_pattern * seasonal * capacity * np.random.uniform(0.8, 1.2, n)
            generation = np.maximum(0, generation)

            forecast = generation * (1 + np.random.normal(0, forecast_error_sd, n))
            forecast = np.maximum(0, forecast)

            df_list.append(pd.DataFrame({
                'timestamp': timestamps,
                'source': source,
                'plant_id': plant_id,
                'generation_mw': generation,
                'forecast_mw': forecast,
                'actual_mw': generation,
                'capacity_factor': generation / capacity
            }))

    return pd.concat(df_list, ignore_index=True)


def generate_conventional_data(config, timestamps):
    """Simulate gas/coal/nuclear plant output.

    Nuclear runs near-constant baseload, gas follows a daily peaking
    pattern, coal a milder daily baseload pattern.
    """
    np.random.seed(config['generation']['seed'] + 2)

    num_plants = config['data_sources']['conventional_generation']['num_plants']
    fuel_types = config['data_sources']['conventional_generation']['fuel_types']
    params = config['conventional']

    df_list = []
    for i in range(num_plants):
        plant_id = f"CONV_{i+1:03d}"
        fuel_type = np.random.choice(fuel_types)

        fuel_params = params[fuel_type]
        capacity = np.random.uniform(*fuel_params['capacity_range'])
        marginal_cost = np.random.uniform(*fuel_params['marginal_cost_range'])
        # Gas plants are thermally more efficient (lower heat rate).
        heat_rate = np.random.uniform(6, 12) if fuel_type == 'gas' else np.random.uniform(8, 14)

        n = len(timestamps)
        hours = timestamps.hour + timestamps.minute / 60

        if fuel_type == 'nuclear':
            base_load = 0.9 * capacity
            generation = base_load + np.random.normal(0, 0.01 * capacity, n)
        elif fuel_type == 'gas':
            peaking_pattern = 0.3 + 0.4 * np.sin(2 * np.pi * (hours - 12) / 24)
            generation = peaking_pattern * capacity + np.random.normal(0, 0.05 * capacity, n)
        else:
            baseload_pattern = 0.5 + 0.2 * np.sin(2 * np.pi * hours / 24)
            generation = baseload_pattern * capacity + np.random.normal(0, 0.03 * capacity, n)

        generation = np.clip(generation, 0, capacity)

        df_list.append(pd.DataFrame({
            'timestamp': timestamps,
            'plant_id': plant_id,
            'fuel_type': fuel_type,
            'generation_mw': generation,
            'marginal_cost': marginal_cost,
            'heat_rate': heat_rate
        }))

    return pd.concat(df_list, ignore_index=True)


def generate_data_center_data(config, timestamps):
    """Simulate data-center power demand; center 0 is the bitcoin-mining client
    with a flat-but-noisy profile, the rest follow a daily office pattern."""
    np.random.seed(config['generation']['seed'] + 3)

    num_centers = config['data_sources']['data_centers']['num_centers']
    params = config['data_center']

    df_list = []
    locations = ['FR', 'BE', 'DE', 'NL', 'UK']

    for i in range(num_centers):
        data_center_id = f"DC_{i+1:03d}"
        location = locations[i % len(locations)]

        base_demand = np.random.uniform(*params['power_demand_range'])
        price_sensitivity = np.random.uniform(*params['price_sensitivity_range'])

        is_bitcoin = (i == 0)
        client_type = 'bitcoin' if is_bitcoin else 'enterprise'

        n = len(timestamps)
        hours = timestamps.hour + timestamps.minute / 60

        if is_bitcoin:
            base_profile = 0.7 + 0.3 * np.random.randn(n)
        else:
            base_profile = 0.6 + 0.2 * np.sin(2 * np.pi * (hours - 12) / 24)

        demand = base_demand * base_profile
        # NOTE(review): for non-negative demand this maximum is a no-op;
        # it looks like a floor at 50% of base demand was intended —
        # behavior kept as-is pending confirmation (validation already
        # flags negative power_demand_mw rows).
        demand = np.maximum(demand * 0.5, demand)

        max_bid = base_demand * price_sensitivity * (0.8 + 0.4 * np.random.rand(n))

        df_list.append(pd.DataFrame({
            'timestamp': timestamps,
            'data_center_id': data_center_id,
            'location': location,
            'power_demand_mw': demand,
            'max_bid_price': max_bid,
            'client_type': client_type
        }))

    return pd.concat(df_list, ignore_index=True)


def apply_noise_and_outliers(df, config):
    """Multiply numeric columns by (1 + gaussian noise) and, when enabled,
    scale a random subset of rows by 0.5x-2x to create outliers.

    Controlled by generation.add_noise / generation.include_outliers;
    returns df unchanged when add_noise is false.
    """
    gen = config['generation']
    if not gen['add_noise']:
        return df

    noise_level = gen['noise_level']
    outlier_rate = gen['outlier_rate']
    # BUG FIX: the include_outliers flag was previously ignored and
    # outliers were always injected.
    include_outliers = gen.get('include_outliers', True)

    for col in df.select_dtypes(include=[np.number]).columns:
        if col == 'timestamp':
            continue

        noise = np.random.normal(0, noise_level, len(df))
        df[col] = df[col] * (1 + noise)

        if include_outliers:
            num_outliers = int(len(df) * outlier_rate)
            outlier_idx = np.random.choice(len(df), num_outliers, replace=False)
            df.loc[outlier_idx, col] = df.loc[outlier_idx, col] * np.random.uniform(0.5, 2.0, num_outliers)

    return df


def add_missing_values(df, config):
    """Blank out a random generation.missing_rate fraction of each numeric
    column with NaN; no-op when generation.include_missing_values is false."""
    if not config['generation']['include_missing_values']:
        return df

    missing_rate = config['generation']['missing_rate']

    for col in df.select_dtypes(include=[np.number]).columns:
        if col == 'timestamp':
            continue

        num_missing = int(len(df) * missing_rate)
        missing_idx = np.random.choice(len(df), num_missing, replace=False)
        df.loc[missing_idx, col] = np.nan

    return df


def save_metadata(datasets, output_dir):
    """Write a per-dataset row/column/dtype summary to
    <output_dir>/metadata/generation_metadata.json and return it."""
    metadata = {
        # Timezone-aware UTC stamp; datetime.utcnow() is deprecated and naive,
        # and the processing script already emits +00:00 timestamps.
        'generated_at': datetime.now(timezone.utc).isoformat(),
        'datasets': {}
    }

    for name, df in datasets.items():
        # BUG FIX: the original dict literal had a duplicate 'columns' key
        # (count then list); the count entry was silently discarded.
        metadata['datasets'][name] = {
            'rows': len(df),
            'columns': list(df.columns),
            'memory_usage_mb': df.memory_usage(deep=True).sum() / 1024 / 1024,
            'dtypes': {col: str(dtype) for col, dtype in df.dtypes.items()},
        }

    # BUG FIX: main() only creates data/ and data/raw/, so the metadata
    # directory must be created here before writing.
    metadata_dir = Path(output_dir) / 'metadata'
    metadata_dir.mkdir(parents=True, exist_ok=True)
    with open(metadata_dir / 'generation_metadata.json', 'w') as f:
        json.dump(metadata, f, indent=2, default=str)

    return metadata


def main():
    """Generate all synthetic datasets, perturb them, save raw parquet files
    and the generation metadata report.  Returns the dataset dict."""
    config = load_config()

    time_config = config['time_range']
    timestamps = generate_timestamps(
        time_config['start_date'],
        time_config['end_date'],
        time_config['granularity']
    )

    print(f"Generating synthetic data for {len(timestamps)} timestamps...")

    datasets = {}

    datasets['battery_capacity'] = generate_battery_data(config, timestamps)
    print(f" - Battery capacity: {len(datasets['battery_capacity'])} rows")

    datasets['renewable_generation'] = generate_renewable_data(config, timestamps)
    print(f" - Renewable generation: {len(datasets['renewable_generation'])} rows")

    datasets['conventional_generation'] = generate_conventional_data(config, timestamps)
    print(f" - Conventional generation: {len(datasets['conventional_generation'])} rows")

    datasets['data_centers'] = generate_data_center_data(config, timestamps)
    print(f" - Data centers: {len(datasets['data_centers'])} rows")

    # Perturb after generation so noise/outliers/missing values are applied
    # uniformly across all datasets.
    for name, df in datasets.items():
        df = apply_noise_and_outliers(df, config)
        df = add_missing_values(df, config)
        datasets[name] = df

    output_base = Path(__file__).parent.parent / 'data'
    output_base.mkdir(parents=True, exist_ok=True)

    raw_dir = output_base / 'raw'
    raw_dir.mkdir(parents=True, exist_ok=True)

    for name, df in datasets.items():
        file_path = raw_dir / f'{name}_raw.parquet'
        df.to_parquet(file_path, compression='snappy')
        print(f" Saved: {file_path}")

    save_metadata(datasets, output_base)

    print("\nMetadata saved to data/metadata/generation_metadata.json")
    print(f"Total datasets generated: {len(datasets)}")

    return datasets


if __name__ == '__main__':
    main()
"""
Fetch historical data for energy trading strategy test data.
Handles: electricity prices, bitcoin mining data, load profiles.

All series are synthesized deterministically from the configured seed; the
mempool.space call is a best-effort connectivity probe only and does not
influence the generated values.
"""

import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta
import json
import time

# Optional third-party dependencies are guarded so the simulation helpers stay
# importable (e.g. for testing) when PyYAML / requests are not installed.
try:
    import yaml
except ImportError:
    yaml = None
try:
    import requests
except ImportError:
    requests = None


def load_config():
    """Load the pipeline configuration from config/data_config.yaml."""
    if yaml is None:
        raise ImportError("PyYAML is required to load the configuration")
    config_path = Path(__file__).parent.parent / "config" / "data_config.yaml"
    with open(config_path) as f:
        return yaml.safe_load(f)


def generate_timestamps(start_date, end_date, granularity):
    """Return a DatetimeIndex covering [start_date, end_date] at the given frequency."""
    start = pd.to_datetime(start_date)
    end = pd.to_datetime(end_date)
    return pd.date_range(start=start, end=end, freq=granularity)


def fetch_electricity_prices(config, timestamps):
    """Simulate day-ahead / real-time electricity prices per configured region.

    Deterministic for a fixed ``config['generation']['seed']``. Returns one row
    per (timestamp, region) with price, ancillary-price and volume columns.
    """
    np.random.seed(config['generation']['seed'] + 10)

    regions = config['regions']
    print(f"Fetching electricity prices for {len(regions)} regions...")

    # Per-region (base price EUR/MWh, intraday volatility amplitude); unknown
    # regions fall back to (100, 50), matching the original else-branch.
    region_params = {
        'FR': (80, 30),
        'DE': (90, 40),
        'NL': (85, 35),
        'BE': (82, 32),
    }

    df_list = []

    for region in regions:
        n = len(timestamps)
        hours = timestamps.hour + timestamps.minute / 60
        base_price, volatility = region_params.get(region, (100, 50))

        # Sinusoidal daily shape plus Gaussian noise; real-time deviates from
        # day-ahead with extra noise.
        day_ahead = base_price + volatility * np.sin(2 * np.pi * hours / 24) + np.random.normal(0, 10, n)
        real_time = day_ahead + np.random.normal(0, 20, n)

        # ~2% of intervals receive a large positive real-time price spike.
        price_spikes = np.random.random(n) < 0.02
        real_time = np.array(real_time)
        real_time[price_spikes] += np.random.uniform(100, 500, int(np.sum(price_spikes)))

        capacity_price = np.abs(np.random.normal(5, 2, n))
        regulation_price = np.abs(np.random.normal(3, 1, n))

        volume = np.random.uniform(1000, 5000, n)

        df_list.append(pd.DataFrame({
            'timestamp': timestamps,
            'region': region,
            'day_ahead_price': day_ahead,
            'real_time_price': real_time,
            'capacity_price': capacity_price,
            'regulation_price': regulation_price,
            'volume_mw': volume
        }))

    return pd.concat(df_list, ignore_index=True)


def fetch_bitcoin_mining_data(config, timestamps):
    """Simulate BTC price, pool hashrate and mining profitability series.

    The mempool.space request is a connectivity probe only: every branch of the
    original logic used the same 45000 USD base price, so the response is
    deliberately ignored and network/import failures are tolerated.
    """
    np.random.seed(config['generation']['seed'] + 11)

    print("Fetching bitcoin mining data from mempool.space (simulated)...")

    n = len(timestamps)

    base_btc_price = 45000  # USD; fixed regardless of API availability
    if requests is not None:
        try:
            requests.get("https://mempool.space/api/v1/fees/recommended", timeout=10)
        except requests.RequestException:
            pass  # offline is fine: the simulated values below are API-independent

    btc_params = config['bitcoin']

    # Gentle upward trend plus a random walk, normalised to start at 1.
    btc_trend = np.linspace(0.95, 1.05, n)
    btc_daily_volatility = np.cumsum(np.random.normal(0, 0.01, n)) + 1
    btc_daily_volatility = btc_daily_volatility / btc_daily_volatility[0]

    btc_price = base_btc_price * btc_trend * btc_daily_volatility * (1 + 0.03 * np.random.randn(n))

    hashrate_base = np.random.uniform(*btc_params['hashrate_range'])
    hashrate = hashrate_base * (1 + 0.05 * np.sin(2 * np.pi * np.arange(n) / (n / 10))) * (1 + 0.02 * np.random.randn(n))

    electricity_efficiency = np.random.uniform(*btc_params['mining_efficiency_range'])

    # NOTE(review): the profitability / breakeven formulas are kept as-is from
    # the original; the unit conversions look ad hoc — confirm against the
    # intended mining economics before relying on these columns.
    btc_price_eur = btc_price * 0.92
    mining_profitability = (btc_price_eur * 0.0001 / 3.6) / (electricity_efficiency / 1000)

    electricity_breakeven = (btc_price_eur * 0.0001 / 3.6) / (mining_profitability / 24 * electricity_efficiency / 1000) * 24

    return pd.DataFrame({
        'timestamp': timestamps,
        'pool_id': 'POOL_001',
        'hashrate_ths': hashrate,
        'btc_price_usd': btc_price,
        'mining_profitability': mining_profitability,
        'electricity_cost': electricity_breakeven
    })


def fetch_load_profiles(config, timestamps):
    """Simulate regional demand, a noisy forecast, and weather covariates.

    Returns one row per (timestamp, region).
    """
    np.random.seed(config['generation']['seed'] + 12)

    regions = config['regions']
    print(f"Fetching load profiles for {len(regions)} regions...")

    # Per-region base load in MW; unknown regions fall back to 45000 MW.
    # (The original also carried per-region peak-hour pairs that were never
    # used — the daily shape below is a fixed Gaussian bump centred on 18:00.)
    base_loads = {'FR': 60000, 'DE': 70000, 'NL': 15000, 'BE': 12000}

    df_list = []

    for region in regions:
        n = len(timestamps)
        hours = timestamps.hour + timestamps.minute / 60
        day_of_year = timestamps.dayofyear
        base_load = base_loads.get(region, 45000)

        daily_pattern = 0.7 + 0.3 * np.exp(-0.5 * ((hours - 18) / 4) ** 2)
        seasonal_pattern = 0.8 + 0.2 * np.sin(2 * np.pi * (day_of_year - 15) / 365)

        load = base_load * daily_pattern * seasonal_pattern * (1 + 0.05 * np.random.randn(n))

        # Forecast tracks actuals with ~3% relative error.
        forecast = load * (1 + np.random.normal(0, 0.03, n))

        temp = 15 + 15 * np.sin(2 * np.pi * (day_of_year - 15) / 365) + np.random.normal(0, 3, n)
        humidity = 60 + 20 * np.sin(2 * np.pi * (day_of_year - 15) / 365) + np.random.normal(0, 10, n)

        df_list.append(pd.DataFrame({
            'timestamp': timestamps,
            'region': region,
            'load_mw': load,
            'forecast_mw': forecast,
            'weather_temp': temp,
            'humidity': humidity
        }))

    return pd.concat(df_list, ignore_index=True)


def save_raw_data(datasets, output_dir):
    """Write each dataset to data/raw/<name>_raw.parquet; return name -> path."""
    output_path = Path(output_dir) / 'raw'
    output_path.mkdir(parents=True, exist_ok=True)

    saved = {}
    for name, df in datasets.items():
        file_path = output_path / f'{name}_raw.parquet'
        df.to_parquet(file_path, compression='snappy')
        saved[name] = str(file_path)
        print(f" Saved: {file_path}")

    return saved


def main():
    """Generate the three 'historical' datasets and save them under data/raw/."""
    config = load_config()

    time_config = config['time_range']
    timestamps = generate_timestamps(
        time_config['start_date'],
        time_config['end_date'],
        time_config['granularity']
    )

    print(f"Fetching historical data for {len(timestamps)} timestamps...")

    datasets = {}

    datasets['electricity_prices'] = fetch_electricity_prices(config, timestamps)
    print(f" - Electricity prices: {len(datasets['electricity_prices'])} rows")

    datasets['bitcoin_mining'] = fetch_bitcoin_mining_data(config, timestamps)
    print(f" - Bitcoin mining: {len(datasets['bitcoin_mining'])} rows")

    datasets['load_profiles'] = fetch_load_profiles(config, timestamps)
    print(f" - Load profiles: {len(datasets['load_profiles'])} rows")

    output_base = Path(__file__).parent.parent / 'data'
    save_raw_data(datasets, output_base)

    print(f"\nSaved {len(datasets)} historical datasets to data/raw/")

    return datasets


if __name__ == '__main__':
    main()

# --- scripts/03_process_merge.py ---
"""
Process and merge all datasets, apply compression, and save to Parquet format.
"""

import numpy as np
import pandas as pd
from pathlib import Path
import json
import sys

try:
    import yaml
except ImportError:  # PyYAML is only needed for config loading
    yaml = None


def load_config():
    """Load the pipeline configuration from config/data_config.yaml."""
    if yaml is None:
        raise ImportError("PyYAML is required to load the configuration")
    config_path = Path(__file__).parent.parent / "config" / "data_config.yaml"
    with open(config_path) as f:
        return yaml.safe_load(f)


def load_dataset(dataset_name, data_base):
    """Load <dataset_name>_raw.parquet from data/raw/, or None if absent."""
    df_list = []

    raw_path = data_base / 'raw' / f'{dataset_name}_raw.parquet'
    if raw_path.exists():
        print(f" Loading {dataset_name} from raw data...")
        df_list.append(pd.read_parquet(raw_path))

    # Concatenate once (the original concatenated twice: once for the print,
    # once for the return).
    merged = pd.concat(df_list, ignore_index=True) if df_list else None
    print(f" Total rows for {dataset_name}: {len(merged) if merged is not None else 0}")

    return merged


def downgrade_precision(df, config):
    """Downcast float64 columns to the configured precision and int64 to int32.

    Mutates and returns ``df``. The timestamp column is skipped defensively
    (a datetime64 column would not be selected here anyway).
    """
    precision = config['output'].get('precision', 'float32')

    for col in df.select_dtypes(include=['float64']).columns:
        if col == 'timestamp':
            continue
        df[col] = df[col].astype(precision)

    for col in df.select_dtypes(include=['int64']).columns:
        if col == 'timestamp':
            continue
        df[col] = df[col].astype('int32')

    return df


def convert_categoricals(df):
    """Convert low-cardinality object columns (unique < 50% of rows) to category dtype."""
    for col in df.select_dtypes(include=['object']).columns:
        if col == 'timestamp':
            continue
        if df[col].nunique() < df.shape[0] * 0.5:
            df[col] = df[col].astype('category')

    return df


def optimize_memory(df, config=None):
    """Shrink DataFrame memory via dtype downcasts and categoricals.

    ``config`` is new but optional (default preserves the old behaviour), so
    existing callers are unaffected; passing the real pipeline config lets
    ``config['output']['precision']`` actually take effect — previously
    'float32' was hard-coded here and the configured precision was ignored.
    """
    if config is None:
        config = {'output': {'precision': 'float32'}}

    start_mem = df.memory_usage(deep=True).sum() / 1024 / 1024

    df = downgrade_precision(df, config)
    df = convert_categoricals(df)

    end_mem = df.memory_usage(deep=True).sum() / 1024 / 1024

    # Guard against division by zero for a degenerate empty frame.
    reduction = (1 - end_mem / start_mem) * 100 if start_mem else 0.0
    print(f" Memory: {start_mem:.2f}MB -> {end_mem:.2f}MB ({reduction:.1f}% reduction)")

    return df


def save_processed_dataset(df, dataset_name, output_dir, config):
    """Write df to <output_dir>/<dataset_name>.parquet; return size/shape info."""
    output_path = Path(output_dir) / f'{dataset_name}.parquet'

    compression = config['output'].get('compression', 'snappy')

    df.to_parquet(output_path, compression=compression, index=False)

    file_size_mb = output_path.stat().st_size / 1024 / 1024
    print(f" Saved: {output_path} ({file_size_mb:.2f}MB)")

    return {
        'path': str(output_path),
        'size_mb': file_size_mb,
        'rows': len(df),
        'columns': len(df.columns)
    }


def validate_timestamps(df, dataset_name):
    """Coerce df['timestamp'] to datetime (mutates df) and warn on duplicates.

    NOTE(review): multi-region datasets legitimately repeat a timestamp once
    per region, so the duplicate warning can fire on healthy data — confirm
    whether the check should be per (timestamp, region).
    """
    if 'timestamp' not in df.columns:
        print(f" Warning: {dataset_name} has no timestamp column")
        return False

    df['timestamp'] = pd.to_datetime(df['timestamp'])
    duplicates = df['timestamp'].duplicated().sum()

    if duplicates > 0:
        print(f" Warning: {dataset_name} has {duplicates} duplicate timestamps")

    return True


def generate_final_metadata(processed_info, output_dir):
    """Aggregate per-dataset info and write metadata/final_metadata.json."""
    metadata = {
        'processed_at': pd.Timestamp.utcnow().isoformat(),
        'total_datasets': len(processed_info),
        'total_size_mb': sum(info['size_mb'] for info in processed_info.values()),
        'datasets': processed_info
    }

    output_path = Path(output_dir) / 'metadata' / 'final_metadata.json'
    # Fix: data/metadata/ is not created anywhere else in this script.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w') as f:
        json.dump(metadata, f, indent=2, default=str)

    return metadata


def main():
    """Load raw datasets, optimize memory, and write processed Parquet files."""
    config = load_config()

    data_base = Path(__file__).parent.parent / 'data'
    processed_dir = data_base / 'processed'
    processed_dir.mkdir(parents=True, exist_ok=True)

    print("Processing and merging datasets...")

    datasets = [
        'electricity_prices',
        'battery_capacity',
        'renewable_generation',
        'conventional_generation',
        'load_profiles',
        'data_centers',
        'bitcoin_mining'
    ]

    processed_info = {}

    for dataset_name in datasets:
        print(f"\nProcessing {dataset_name}...")

        df = load_dataset(dataset_name, data_base)

        if df is None:
            print(f" Warning: {dataset_name} has no data, skipping")
            continue

        validate_timestamps(df, dataset_name)

        print(" Optimizing memory...")
        # Pass the real config so the configured output precision is honoured.
        df = optimize_memory(df, config)

        info = save_processed_dataset(df, dataset_name, processed_dir, config)
        processed_info[dataset_name] = info

    print(f"\n{'='*60}")
    print("Processing complete!")
    print(f"{'='*60}")

    metadata = generate_final_metadata(processed_info, data_base)

    print(f"\nTotal datasets processed: {len(processed_info)}")
    print(f"Total size: {metadata['total_size_mb']:.2f}MB")
    print(f"Target size: {config['output']['target_size_mb']}MB")

    if metadata['total_size_mb'] > config['output']['target_size_mb']:
        print(f"Warning: Total size exceeds target by {metadata['total_size_mb'] - config['output']['target_size_mb']:.2f}MB")
    else:
        print("✓ Total size within target")

    print(f"\nProcessed data saved to: {processed_dir}")
    print(f"Metadata saved to: {data_base / 'metadata' / 'final_metadata.json'}")

    return processed_info


if __name__ == '__main__':
    main()

# --- scripts/04_validate.py ---
"""
Validate processed datasets for quality, missing values, and data consistency.
"""

import numpy as np
import pandas as pd
from pathlib import Path
import json
from datetime import datetime, timezone

try:
    import yaml
except ImportError:  # PyYAML is only needed for config/schema loading
    yaml = None


def load_config():
    """Load the pipeline configuration from config/data_config.yaml."""
    if yaml is None:
        raise ImportError("PyYAML is required to load the configuration")
    config_path = Path(__file__).parent.parent / "config" / "data_config.yaml"
    with open(config_path) as f:
        return yaml.safe_load(f)


def load_schema():
    """Load the dataset schemas and validation rules from config/schema.yaml."""
    if yaml is None:
        raise ImportError("PyYAML is required to load the schema")
    schema_path = Path(__file__).parent.parent / "config" / "schema.yaml"
    with open(schema_path) as f:
        return yaml.safe_load(f)


def load_processed_dataset(dataset_name, data_dir):
    """Load <data_dir>/processed/<dataset_name>.parquet, or None if absent."""
    file_path = Path(data_dir) / 'processed' / f'{dataset_name}.parquet'
    if file_path.exists():
        return pd.read_parquet(file_path)
    return None


def check_missing_values(df, dataset_name):
    """Return {column: {'count', 'percentage'}} for columns containing NaNs."""
    missing_info = {}
    n_rows = len(df)

    for col in df.columns:
        missing_count = int(df[col].isna().sum())
        if missing_count > 0:
            # n_rows guard avoids ZeroDivisionError on an empty frame.
            missing_pct = (missing_count / n_rows) * 100 if n_rows else 0.0
            missing_info[col] = {
                'count': missing_count,
                'percentage': round(missing_pct, 2)
            }

    return missing_info


def check_data_ranges(df, dataset_name, schema):
    """Check min/max rules from schema['validation_rules']; return violations."""
    validation_results = []

    if dataset_name not in schema['validation_rules']:
        return validation_results

    rules = schema['validation_rules'][dataset_name]

    for rule in rules:
        column = rule['column']
        if column not in df.columns:
            continue

        # NaNs are excluded here; they are reported by check_missing_values.
        col_data = df[column].dropna()

        if 'min' in rule:
            violations = (col_data < rule['min']).sum()
            if violations > 0:
                validation_results.append({
                    'column': column,
                    'rule': f'min >= {rule["min"]}',
                    'violations': int(violations),
                    'severity': 'error'
                })

        if 'max' in rule:
            violations = (col_data > rule['max']).sum()
            if violations > 0:
                validation_results.append({
                    'column': column,
                    'rule': f'max <= {rule["max"]}',
                    'violations': int(violations),
                    'severity': 'error'
                })

    return validation_results


def check_duplicated_rows(df, dataset_name):
    """Return the number of fully-duplicated rows (first occurrence not counted)."""
    return int(df.duplicated().sum())


def check_timestamp_continuity(df, dataset_name, expected_freq='1min'):
    """Report gaps larger than 1.5x the expected sampling interval.

    NOTE(review): multi-region frames repeat timestamps, giving zero deltas
    between duplicates; only gaps (not duplicates) are flagged here.
    """
    if 'timestamp' not in df.columns:
        return {'status': 'skipped', 'reason': 'no timestamp column'}

    df_sorted = df.sort_values('timestamp')
    time_diffs = df_sorted['timestamp'].diff().dropna()

    expected_diff = pd.Timedelta(expected_freq)
    missing_gaps = time_diffs[time_diffs > expected_diff * 1.5]

    return {
        'status': 'checked',
        'expected_frequency': expected_freq,
        'gaps_detected': len(missing_gaps),
        'total_rows': len(df)
    }


def check_data_types(df, dataset_name, schema):
    """Compare df column dtypes against schema['schemas']; return issue dicts."""
    type_issues = []

    expected_schema = schema['schemas'].get(dataset_name, {})
    expected_columns = {col['name']: col['type'] for col in expected_schema.get('columns', [])}

    for col, expected_type in expected_columns.items():
        if col not in df.columns:
            type_issues.append({
                'column': col,
                'issue': 'missing',
                'expected': expected_type
            })
        elif expected_type == 'datetime64[ns]':
            if not pd.api.types.is_datetime64_any_dtype(df[col]):
                type_issues.append({
                    'column': col,
                    'issue': 'wrong_type',
                    'expected': 'datetime',
                    'actual': str(df[col].dtype)
                })
        elif expected_type == 'category':
            # Fix: pd.api.types.is_categorical_dtype is deprecated in pandas
            # 2.x — test the dtype instance directly instead.
            if not isinstance(df[col].dtype, pd.CategoricalDtype):
                type_issues.append({
                    'column': col,
                    'issue': 'wrong_type',
                    'expected': 'category',
                    'actual': str(df[col].dtype)
                })
        elif expected_type == 'float32':
            # float64 is tolerated: the processing step may not have downcast.
            if df[col].dtype not in ['float32', 'float64']:
                type_issues.append({
                    'column': col,
                    'issue': 'wrong_type',
                    'expected': 'float32',
                    'actual': str(df[col].dtype)
                })

    return type_issues


def validate_dataset(df, dataset_name, schema):
    """Run all checks on one dataset and attach an overall status.

    Status is 'pass' (no errors), 'warning' (<10 errors) or 'fail' (>=10),
    where errors = range-rule violations with severity 'error' plus schema
    type issues.
    """
    results = {
        'dataset': dataset_name,
        'rows': len(df),
        'columns': len(df.columns),
        'memory_mb': round(df.memory_usage(deep=True).sum() / 1024 / 1024, 2),
        'missing_values': check_missing_values(df, dataset_name),
        'duplicated_rows': check_duplicated_rows(df, dataset_name),
        'timestamp_continuity': check_timestamp_continuity(df, dataset_name),
        'data_ranges': check_data_ranges(df, dataset_name, schema),
        'data_types': check_data_types(df, dataset_name, schema)
    }

    error_count = (
        sum(1 for v in results['data_ranges'] if v.get('severity') == 'error')
        + len(results['data_types'])
    )

    results['status'] = 'pass' if error_count == 0 else 'warning' if error_count < 10 else 'fail'

    return results


def generate_validation_report(all_results, output_dir):
    """Aggregate results and write metadata/validation_report.json.

    Entries for datasets that could not be loaded carry only
    {'dataset', 'status': 'error', 'error'}: they contribute 0 to the size and
    row totals and are counted among the failures.
    """
    # Fix: count missing-file 'error' entries as failures (previously they
    # appeared in no summary bucket at all).
    total_errors = sum(1 for r in all_results if r['status'] in ('fail', 'error'))
    total_warnings = sum(1 for r in all_results if r['status'] == 'warning')
    total_pass = sum(1 for r in all_results if r['status'] == 'pass')

    # Fix: .get() with defaults — the original raised KeyError on 'error'
    # entries, which have no 'memory_mb' / 'rows' keys.
    total_size_mb = sum(r.get('memory_mb', 0) for r in all_results)
    total_rows = sum(r.get('rows', 0) for r in all_results)

    report = {
        # datetime.utcnow() is deprecated; use an aware UTC timestamp.
        'generated_at': datetime.now(timezone.utc).isoformat(),
        'summary': {
            'total_datasets': len(all_results),
            'passed': total_pass,
            'warnings': total_warnings,
            'failed': total_errors,
            'total_size_mb': round(total_size_mb, 2),
            'total_rows': total_rows
        },
        'datasets': all_results
    }

    output_path = Path(output_dir) / 'metadata' / 'validation_report.json'
    # Fix: the metadata directory is not created anywhere else in this script.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w') as f:
        json.dump(report, f, indent=2, default=str)

    return report


def print_summary(report):
    """Pretty-print the validation report to stdout."""
    print(f"\n{'='*60}")
    print("VALIDATION SUMMARY")
    print(f"{'='*60}")
    print(f"Datasets processed: {report['summary']['total_datasets']}")
    print(f" ✓ Passed: {report['summary']['passed']}")
    print(f" ⚠ Warnings: {report['summary']['warnings']}")
    print(f" ✗ Failed: {report['summary']['failed']}")
    print(f"\nTotal size: {report['summary']['total_size_mb']:.2f}MB")
    print(f"Total rows: {report['summary']['total_rows']:,}")

    print(f"\n{'='*60}")
    print("PER-DATASET DETAILS")
    print(f"{'='*60}")

    for result in report['datasets']:
        # Fix: 'error' entries (missing files) have none of the metric keys,
        # so the original crashed with KeyError here.
        if result['status'] == 'error':
            print(f"\n✗ {result['dataset']}: {result.get('error', 'unknown error')}")
            continue

        status_icon = '✓' if result['status'] == 'pass' else '⚠' if result['status'] == 'warning' else '✗'
        print(f"\n{status_icon} {result['dataset']}")
        print(f" Rows: {result['rows']:,} | Columns: {result['columns']} | Size: {result['memory_mb']:.2f}MB")

        if result['missing_values']:
            print(f" Missing values: {len(result['missing_values'])} columns")

        if result['data_ranges']:
            print(f" Range violations: {len(result['data_ranges'])}")

        if result['data_types']:
            print(f" Type issues: {len(result['data_types'])}")

        if result['timestamp_continuity']['status'] == 'checked':
            if result['timestamp_continuity']['gaps_detected'] > 0:
                print(f" Time gaps: {result['timestamp_continuity']['gaps_detected']}")


def main():
    """Validate every processed dataset and write/print the report."""
    config = load_config()  # loaded for parity with the other pipeline stages
    schema = load_schema()

    data_dir = Path(__file__).parent.parent / 'data'

    datasets = [
        'electricity_prices',
        'battery_capacity',
        'renewable_generation',
        'conventional_generation',
        'load_profiles',
        'data_centers',
        'bitcoin_mining'
    ]

    print("Validating processed datasets...\n")

    all_results = []

    for dataset_name in datasets:
        print(f"Validating {dataset_name}...")

        df = load_processed_dataset(dataset_name, data_dir)

        if df is None:
            print(" ✗ Dataset not found, skipping")
            all_results.append({
                'dataset': dataset_name,
                'status': 'error',
                'error': 'Dataset file not found'
            })
            continue

        result = validate_dataset(df, dataset_name, schema)
        all_results.append(result)

        status_icon = '✓' if result['status'] == 'pass' else '⚠' if result['status'] == 'warning' else '✗'
        print(f" {status_icon} {result['rows']:,} rows, {result['columns']} cols, {result['memory_mb']:.2f}MB")

    report = generate_validation_report(all_results, data_dir)
    print_summary(report)

    print(f"\n{'='*60}")
    print(f"Validation report saved to: {data_dir / 'metadata' / 'validation_report.json'}")

    return report


if __name__ == '__main__':
    main()