Add FastAPI backend for energy trading system

Implements FastAPI backend with ML model support for energy trading,
including price prediction models and RL-based battery trading policy.
Features dashboard, trading, backtest, and settings API routes with
WebSocket support for real-time updates.
This commit is contained in:
2026-02-12 00:59:26 +07:00
parent a22a13f6f4
commit fe76bc7629
72 changed files with 2931 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
from app.ml.rl_battery import BatteryPolicy, BatteryRLTrainer
__all__ = ["BatteryPolicy", "BatteryRLTrainer"]

View File

@@ -0,0 +1,88 @@
from typing import Dict, Optional
import numpy as np
import pickle
from app.utils.logger import get_logger
logger = get_logger(__name__)
class QLearningAgent:
def __init__(
self,
state_bins: int = 10,
action_space: int = 3,
learning_rate: float = 0.1,
discount_factor: float = 0.95,
epsilon: float = 1.0,
epsilon_decay: float = 0.995,
epsilon_min: float = 0.05,
):
self.state_bins = state_bins
self.action_space = action_space
self.learning_rate = learning_rate
self.discount_factor = discount_factor
self.epsilon = epsilon
self.epsilon_decay = epsilon_decay
self.epsilon_min = epsilon_min
self.q_table: Optional[np.ndarray] = None
self.policy_id = "battery_policy"
def initialize_q_table(self, observation_space: int):
self.q_table = np.zeros((self.state_bins ** observation_space, self.action_space))
def _discretize_state(self, state: np.ndarray) -> int:
discretized = (state * self.state_bins).astype(int)
discretized = np.clip(discretized, 0, self.state_bins - 1)
index = 0
multiplier = 1
for val in discretized:
index += val * multiplier
multiplier *= self.state_bins
return index
def get_action(self, state: np.ndarray, training: bool = True) -> int:
state_idx = self._discretize_state(state)
if training and np.random.random() < self.epsilon:
return np.random.randint(self.action_space)
if self.q_table is None:
return 1
return np.argmax(self.q_table[state_idx])
def update(self, state: np.ndarray, action: int, reward: float, next_state: np.ndarray, done: bool):
if self.q_table is None:
return
state_idx = self._discretize_state(state)
next_state_idx = self._discretize_state(next_state)
current_q = self.q_table[state_idx, action]
if done:
target = reward
else:
next_q = np.max(self.q_table[next_state_idx])
target = reward + self.discount_factor * next_q
self.q_table[state_idx, action] += self.learning_rate * (target - current_q)
def decay_epsilon(self):
self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
def save(self, filepath: str):
with open(filepath, "wb") as f:
pickle.dump(self, f)
logger.info(f"Saved Q-learning policy to {filepath}")
@classmethod
def load(cls, filepath: str):
with open(filepath, "rb") as f:
return pickle.load(f)
__all__ = ["QLearningAgent"]

View File

@@ -0,0 +1,87 @@
from typing import Dict, Tuple
import numpy as np
class BatteryEnvironment:
    """Simulated battery-arbitrage environment for tabular RL.

    Actions: 0 = charge (buy), 1 = hold, 2 = discharge (sell).
    Observation: [charge fraction, normalized price, time-of-day fraction].
    One episode is 1440 one-minute steps (a day); the price follows a
    clipped Gaussian random walk.
    """

    def __init__(
        self,
        capacity: float = 100.0,
        charge_rate: float = 50.0,
        discharge_rate: float = 50.0,
        efficiency: float = 0.9,
        min_reserve: float = 0.1,
        max_charge: float = 0.9,
    ):
        """
        Args:
            capacity: Total battery capacity (energy units).
            charge_rate: Max energy bought from the grid per step.
            discharge_rate: Max energy sold to the grid per step.
            efficiency: One-way conversion efficiency in (0, 1].
            min_reserve: Fraction of capacity kept as a hard floor.
            max_charge: Fraction of capacity used as a hard ceiling.
        """
        self.capacity = capacity
        self.charge_rate = charge_rate
        self.discharge_rate = discharge_rate
        self.efficiency = efficiency
        self.min_reserve = min_reserve
        self.max_charge = max_charge
        self.charge_level = capacity * 0.5
        self.current_price = 50.0
        self.time_step = 0

    def reset(self) -> np.ndarray:
        """Start a new episode at half charge and the base price."""
        self.charge_level = self.capacity * 0.5
        self.current_price = 50.0
        self.time_step = 0
        return self._get_state()

    def _get_state(self) -> np.ndarray:
        """Return the normalized observation [charge, price, time]."""
        charge_pct = self.charge_level / self.capacity
        # Assumes prices rarely exceed 200; clipped into [0, 1].
        price_norm = np.clip(self.current_price / 200.0, 0, 1)
        time_norm = (self.time_step % 1440) / 1440.0
        return np.array([charge_pct, price_norm, time_norm])

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict]:
        """Apply ``action``, advance the price walk, and return
        ``(state, reward, done, info)``. Rewards are priced at the
        pre-step price and scaled by 1/1000."""
        old_price = self.current_price
        if action == 0:  # charge: buy energy from the grid
            charge_amount = min(self.charge_rate, self.capacity * self.max_charge - self.charge_level)
            # Conversion loss on the way in: pay for charge_amount,
            # store charge_amount * efficiency.
            self.charge_level += charge_amount * self.efficiency
            reward = -charge_amount * old_price / 1000.0
        elif action == 1:  # hold
            reward = 0.0
        elif action == 2:  # discharge: sell energy to the grid
            # BUGFIX: bound the sale so the round-trip withdrawal
            # (discharge_amount / efficiency) cannot take the charge level
            # below the reserve floor. Previously the raw headroom was
            # sold and the over-withdrawal was silently refunded by the
            # clip below, crediting the agent with free energy.
            available = self.charge_level - self.capacity * self.min_reserve
            discharge_amount = min(self.discharge_rate, available * self.efficiency)
            revenue = discharge_amount * old_price
            self.charge_level -= discharge_amount / self.efficiency
            reward = revenue / 1000.0
        else:  # unknown action: treat as hold
            reward = 0.0
        # Safety clamp (now only guards against float drift).
        self.charge_level = np.clip(self.charge_level, self.capacity * self.min_reserve, self.capacity * self.max_charge)
        # Gaussian random-walk price, clipped to a plausible band.
        self.current_price = old_price + np.random.randn() * 5
        self.current_price = np.clip(self.current_price, 0, 300)
        self.time_step += 1
        state = self._get_state()
        info = {
            "charge_level": self.charge_level,
            "price": self.current_price,
            "action": action,
        }
        done = self.time_step >= 1440  # one simulated day
        return state, reward, done, info

    @property
    def action_space(self):
        """Number of discrete actions."""
        return 3

    @property
    def observation_space(self):
        """Dimensionality of the observation vector."""
        return 3
__all__ = ["BatteryEnvironment"]

View File

@@ -0,0 +1,65 @@
from typing import Dict
from app.ml.rl_battery.agent import QLearningAgent
from app.ml.rl_battery.environment import BatteryEnvironment
from app.utils.logger import get_logger
logger = get_logger(__name__)
class BatteryPolicy:
    """Inference wrapper around a trained battery Q-learning policy.

    Loads a pickled QLearningAgent from ``policy_path`` when present;
    if no policy file exists, every query returns a neutral "hold"
    answer with zero confidence.
    """

    def __init__(self, policy_path: str = "models/rl_battery"):
        self.policy_path = policy_path
        # Both stay None until a saved policy file is found on disk.
        self.agent = None  # QLearningAgent or None
        self.env = None  # BatteryEnvironment or None
        self._load_policy()

    def _load_policy(self):
        """Best-effort load of the pickled agent from the policy path."""
        from pathlib import Path
        filepath = Path(self.policy_path) / "battery_policy.pkl"
        if filepath.exists():
            # SECURITY: pickle.load executes arbitrary code — only load
            # policy files written by our own trainer.
            self.agent = QLearningAgent.load(filepath)
            self.env = BatteryEnvironment()
            logger.info(f"Loaded policy from {filepath}")

    def get_action(
        self,
        charge_level: float,
        current_price: float,
        price_forecast_1m: float = 0,
        price_forecast_5m: float = 0,
        price_forecast_15m: float = 0,
        hour: int = 0,
    ) -> Dict:
        """Return the policy's recommendation for the given battery state.

        The ``price_forecast_*`` parameters are accepted for API
        stability but are not used by the current tabular policy (its
        state only covers charge, spot price, and time of day).

        Returns:
            Dict with "action" ("charge"/"hold"/"discharge"), the raw
            "q_values" list, and a heuristic "confidence" in [0, 1].
        """
        if self.agent is None:
            # No trained policy available: neutral answer.
            return {
                "action": "hold",
                "q_values": [0.0, 0.0, 0.0],
                "confidence": 0.0,
            }
        # Reuse the environment only to build the normalized state vector.
        self.env.charge_level = charge_level
        self.env.current_price = current_price
        self.env.time_step = hour * 60
        state = self.env._get_state()
        action_idx = self.agent.get_action(state, training=False)
        actions = ["charge", "hold", "discharge"]
        action_name = actions[action_idx]
        state_idx = self.agent._discretize_state(state)
        q_values = self.agent.q_table[state_idx].tolist() if self.agent.q_table is not None else [0.0, 0.0, 0.0]
        max_q = max(q_values)
        min_q = min(q_values)
        # Heuristic confidence: Q-value spread normalized by the best Q.
        # BUGFIX: abs() in the denominator prevents a near-zero division
        # and a sign flip when the best Q-value is negative; the result
        # is clamped into [0, 1] (previously it could be negative).
        confidence = (max_q - min_q) / (abs(max_q) + 0.001)
        return {
            "action": action_name,
            "q_values": q_values,
            "confidence": max(0.0, min(confidence, 1.0)),
        }
__all__ = ["BatteryPolicy"]

View File

@@ -0,0 +1,95 @@
from typing import Dict
from app.ml.rl_battery.environment import BatteryEnvironment
from app.ml.rl_battery.agent import QLearningAgent
from app.utils.logger import get_logger
logger = get_logger(__name__)
class BatteryRLTrainer:
    """Trains a tabular Q-learning battery policy on the simulated
    BatteryEnvironment and persists it via the agent's pickle save."""

    def __init__(self, config=None):
        """
        Args:
            config: Optional dict of hyperparameter overrides (see
                :meth:`_create_agent` for the recognized keys).
        """
        self.config = config or {}
        self.agent = None  # set by train()
        self.env = None  # set by train()

    def _create_agent(self) -> QLearningAgent:
        """Build a QLearningAgent from config with library defaults."""
        return QLearningAgent(
            state_bins=self.config.get("charge_level_bins", 10),
            action_space=3,
            learning_rate=self.config.get("learning_rate", 0.1),
            discount_factor=self.config.get("discount_factor", 0.95),
            epsilon=self.config.get("epsilon", 1.0),
            epsilon_decay=self.config.get("epsilon_decay", 0.995),
            epsilon_min=self.config.get("epsilon_min", 0.05),
        )

    def load_data(self):
        """Placeholder: the environment is fully synthetic, so there is
        no historical data to load yet."""
        pass

    def train(self, n_episodes: int = 1000, region: str = "FR") -> Dict:
        """Run tabular Q-learning for ``n_episodes`` full-day episodes.

        Args:
            n_episodes: Number of training episodes.
            region: Accepted for API symmetry with other trainers; the
                synthetic environment is region-independent for now.

        Returns:
            Dict with the per-episode rewards, the trailing-window
            average reward, and the final exploration rate.
        """
        logger.info(f"Starting RL training for {n_episodes} episodes")
        self.env = BatteryEnvironment(
            capacity=100.0,
            charge_rate=50.0,
            discharge_rate=50.0,
            efficiency=0.9,
            min_reserve=0.1,
            max_charge=0.9,
        )
        self.agent = self._create_agent()
        self.agent.initialize_q_table(self.env.observation_space)
        episode_rewards = []
        for episode in range(n_episodes):
            state = self.env.reset()
            total_reward = 0.0
            done = False
            while not done:
                action = self.agent.get_action(state, training=True)
                next_state, reward, done, info = self.env.step(action)
                self.agent.update(state, action, reward, next_state, done)
                total_reward += reward
                state = next_state
            episode_rewards.append(total_reward)
            self.agent.decay_epsilon()
            if (episode + 1) % 100 == 0:
                avg_reward = sum(episode_rewards[-100:]) / 100
                logger.info(f"Episode {episode + 1}/{n_episodes}, Avg Reward: {avg_reward:.2f}, Epsilon: {self.agent.epsilon:.3f}")
        # BUGFIX: average over the actual trailing-window size so runs
        # shorter than 100 episodes are not under-reported (the old code
        # always divided by 100).
        window = episode_rewards[-100:]
        final_avg_reward = sum(window) / len(window) if window else 0.0
        results = {
            "n_episodes": n_episodes,
            "final_avg_reward": final_avg_reward,
            "episode_rewards": episode_rewards,
            "final_epsilon": self.agent.epsilon,
        }
        logger.info(f"Training complete. Final avg reward: {final_avg_reward:.2f}")
        return results

    def save(self, output_dir: str = "models/rl_battery") -> None:
        """Persist the trained policy to ``output_dir``.

        Raises:
            RuntimeError: if :meth:`train` has not been run yet (the old
                code crashed with a bare AttributeError on None).
        """
        if self.agent is None:
            raise RuntimeError("No trained agent to save; call train() first.")
        from pathlib import Path
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        filepath = output_path / "battery_policy.pkl"
        self.agent.save(filepath)
        logger.info(f"Saved trained policy to {filepath}")
__all__ = ["BatteryRLTrainer"]