9 Commits

Author SHA1 Message Date
Nils Reiners
13f27e12e8 first version of load forecaster implemented - not yet running 2025-10-23 13:21:28 +02:00
Nils Reiners
ba6ff9f6c3 adjusted hourly storage of the forecast 2025-10-07 22:34:16 +02:00
Nils Reiners
9ccb1e042b adjusted hourly storage of the forecast 2025-10-07 22:33:02 +02:00
Nils Reiners
a5bcfca39a adjusted hourly storage of the forecast 2025-10-07 22:29:49 +02:00
Nils Reiners
a1f9e29134 pv forecaster added 2025-10-07 20:52:28 +02:00
Nils Reiners
98302b9af5 heat pump slave added 2025-09-28 20:21:54 +02:00
Nils Reiners
f3de1f9280 mode as binary 2025-09-25 21:45:09 +02:00
Nils Reiners
ecd0180483 debug 2025-09-25 21:30:42 +02:00
Nils Reiners
1784b7c283 storing sg ready mode to db 2025-09-25 21:24:45 +02:00
13 changed files with 695 additions and 9 deletions

View File

@@ -1,5 +1,7 @@
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
import datetime as dt
import pandas as pd
class DataBaseInflux:
def __init__(self, url: str, token: str, org: str, bucket: str):
@@ -25,4 +27,22 @@ class DataBaseInflux:
# Write the point to InfluxDB
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
def store_forecasts(self, forecast_name: str, data: pd.Series):
measurement = forecast_name
run_tag = dt.datetime.now(dt.timezone.utc).replace(second=0, microsecond=0).isoformat(timespec="minutes")
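# e.g. run_tag == "2025-10-07T20:30+00:00" - one tag value per forecast run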
pts = []
series = pd.to_numeric(data, errors="coerce").dropna()
for ts, val in series.items():
pts.append(
Point(measurement)
.tag("run", run_tag)
.field("value", float(val))
.time(ts.to_pydatetime(), WritePrecision.S)
)
self.write_api.write(bucket=self.bucket, org=self.org, record=pts)
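
A minimal usage sketch (placeholder credentials; the hourly index mirrors how main.py stores the PV forecast). All points of one call share the same "run" tag, so individual forecast runs stay distinguishable in Flux:

    import pandas as pd
    from data_base_influx import DataBaseInflux

    db = DataBaseInflux(url="http://localhost:8086", token="<TOKEN>", org="<ORG>", bucket="allmende_db")
    idx = pd.date_range("2025-10-07 21:00", periods=48, freq="1h", tz="UTC")
    forecast = pd.Series([0.0] * 48, index=idx)  # hypothetical PV forecast values [W]
    db.store_forecasts("pv_forecast", forecast)  # writes 48 points under one "run" tag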

View File

@@ -0,0 +1,349 @@
# load_forecaster.py
# -*- coding: utf-8 -*-
"""
LoadForecaster: builds a 36-hour forecast at 15-min resolution from InfluxDB data.
- Data source: InfluxDB (Flux query provided by user)
- Target: House load = M_AC_real - I_AC_real
- Frequency: 15 minutes (changeable via init)
- Model: Keras (LSTM by default, pluggable)
- Persistence: Saves model (H5) and scaler (joblib)
Usage (example):
from load_forecaster import LoadForecaster
import tensorflow as tf
lf = LoadForecaster(
url="http://localhost:8086",
token="<YOUR_TOKEN>",
org="<YOUR_ORG>",
bucket="allmende_db",
agg_every="15m",
input_hours=72,
output_hours=36,
model_path="model/load_forecaster.h5",
scaler_path="model/scaler.joblib",
)
# Train or retrain
lf.train_and_save(train_days=90, epochs=60)
# Load model and forecast
model = lf.load_model()
forecast_df = lf.get_15min_forecast(model)
print(forecast_df.head())
"""
from __future__ import annotations
import os
import math
import json
import warnings
from dataclasses import dataclass
from typing import Optional, Tuple
import numpy as np
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.warnings import MissingPivotFunction
from sklearn.preprocessing import StandardScaler
from sklearn.exceptions import NotFittedError
import joblib
# TensorFlow / Keras
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
warnings.filterwarnings("ignore", category=MissingPivotFunction)
@dataclass
class InfluxParams:
url: str
token: str
org: str
bucket: str = "allmende_db"
class LoadForecaster:
def __init__(
self,
url: str,
token: str,
org: str,
bucket: str = "allmende_db",
agg_every: str = "15m",
input_hours: int = 72,
output_hours: int = 36,
model_path: str = "model/load_forecaster.h5",
scaler_path: str = "model/scaler.joblib",
feature_config: Optional[dict] = None,
) -> None:
self.influx = InfluxParams(url=url, token=token, org=org, bucket=bucket)
self.agg_every = agg_every
self.input_steps = int((input_hours * 60) / self._freq_minutes(agg_every))
self.output_steps = int((output_hours * 60) / self._freq_minutes(agg_every))
self.model_path = model_path
self.scaler_path = scaler_path
self.feature_config = feature_config or {"use_temp": True, "use_time_cyc": True}
self._scaler: Optional[StandardScaler] = None
# Ensure model dir exists
os.makedirs(os.path.dirname(model_path), exist_ok=True)
# ---------------------------- Public API ---------------------------- #
def get_15min_forecast(self, model: tf.keras.Model) -> pd.DataFrame:
"""Create a 36-hour forecast at 15-min resolution using the latest data.
Assumes a StandardScaler has been fitted during training and saved.
The method uses the most recent input window from InfluxDB.
"""
# Pull just enough history for one input window
history_hours = math.ceil(self.input_steps * self._freq_minutes(self.agg_every) / 60)
df = self._query_and_prepare(range_hours=history_hours)
if len(df) < self.input_steps:
raise RuntimeError(f"Not enough data: need {self.input_steps} steps, got {len(df)}")
# Build features for the latest window
feats = self._build_features(df)
X_window = feats[-self.input_steps :]
# Load scaler
scaler = self._load_or_get_scaler()
X_scaled = scaler.transform(X_window)
# Predict
pred_scaled = model.predict(X_scaled[np.newaxis, ...], verbose=0)[0]
# Inverse transform only the target column (index 0 is Load)
# Reconstruct a full array to inverse_transform
inv = np.zeros((self.output_steps, X_scaled.shape[1]))
inv[:, 0] = pred_scaled
inv_full = scaler.inverse_transform(inv)
y_pred = inv_full[:, 0]
# Build forecast index
last_ts = df.index[-1]
freq = pd.tseries.frequencies.to_offset(f"{self._freq_minutes(self.agg_every)}min")  # "15m" is Flux syntax; pandas wants "min"
idx = pd.date_range(last_ts + freq, periods=self.output_steps, freq=freq)
out = pd.DataFrame({"Forecast_Load": y_pred}, index=idx)
out.index.name = "timestamp"
return out
def train_and_save(
self,
train_days: int = 90,
epochs: int = 80,
batch_size: int = 128,
validation_split: float = 0.2,
learning_rate: float = 1e-3,
fine_tune: bool = False,
) -> tf.keras.Model:
"""Train (or fine-tune) a model from recent history and persist model + scaler."""
df = self._query_and_prepare(range_hours=24 * train_days)
feats = self._build_features(df)
# Fit the scaler per feature column on the 2D matrix, then window the scaled
# features: the target (column 0) then lives in scaled space, matching the
# inverse transform in get_15min_forecast(), and the same scaler can be
# applied to a single (input_steps, n_features) window at prediction time.
scaler = StandardScaler()
feats_scaled = scaler.fit_transform(feats)
self._scaler = scaler
joblib.dump(scaler, self.scaler_path)
# Prepare windows
X, y = self._make_windows(feats_scaled)
if len(X) < 10:
raise RuntimeError("Not enough windowed samples to train.")
# Build model (or load existing for fine-tune)
if fine_tune and os.path.exists(self.model_path):
model = load_model(self.model_path)
else:
model = self._build_default_model(input_dim=X.shape[1], output_dim=self.output_steps, lr=learning_rate)
# Train
es = EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)
model.fit(
X.reshape((-1, self.input_steps, X.shape[1] // self.input_steps)),
y,
epochs=epochs,
batch_size=batch_size,
validation_split=validation_split,
callbacks=[es],
verbose=1,
)
model.save(self.model_path)
return model
# A convenience wrapper to be called from an external script once per day
def retrain_daily(self, train_days: int = 90, epochs: int = 40, fine_tune: bool = True) -> None:
self.train_and_save(train_days=train_days, epochs=epochs, fine_tune=fine_tune)
def load_model(self) -> tf.keras.Model:
if not os.path.exists(self.model_path):
raise FileNotFoundError(f"Model not found at {self.model_path}")
return load_model(self.model_path)
# ------------------------- Internals: Data ------------------------- #
def _query_and_prepare(self, range_hours: int) -> pd.DataFrame:
"""Query InfluxDB for the last `range_hours` and construct the Load series.
Expected fields (exactly as in DB):
- "40206 - M_AC_Power"
- "40210 - M_AC_Power_SF"
- "40083 - I_AC_Power"
- "40084 - I_AC_Power_SF"
- "300 - Aussentemperatur"
"""
start_str = f"-{range_hours}h"
flux = f'''
from(bucket: "{self.influx.bucket}")
|> range(start: {start_str})
|> filter(fn: (r) => r["_measurement"] == "solaredge_meter" or r["_measurement"] == "solaredge_master" or r["_measurement"] == "hp_master")
|> filter(fn: (r) => r["_field"] == "40206 - M_AC_Power" or r["_field"] == "40210 - M_AC_Power_SF" or r["_field"] == "40083 - I_AC_Power" or r["_field"] == "40084 - I_AC_Power_SF" or r["_field"] == "300 - Aussentemperatur")
|> aggregateWindow(every: {self.agg_every}, fn: mean, createEmpty: false)
|> yield(name: "mean")
'''
with InfluxDBClient(url=self.influx.url, token=self.influx.token, org=self.influx.org) as client:
tables = client.query_api().query_data_frame(flux)
# Concatenate if list of frames is returned
if isinstance(tables, list):
df = pd.concat(tables, ignore_index=True)
else:
df = tables
# Keep relevant columns and pivot
df = df[["_time", "_field", "_value"]]
df = df.pivot(index="_time", columns="_field", values="_value").reset_index()
df = df.rename(
columns={
"_time": "timestamp",
"40206 - M_AC_Power": "M_AC",
"40210 - M_AC_Power_SF": "M_SF",
"40083 - I_AC_Power": "I_AC",
"40084 - I_AC_Power_SF": "I_SF",
"300 - Aussentemperatur": "Temp",
}
)
df = df.sort_values("timestamp").set_index("timestamp")
# Forward-fill reasonable gaps (e.g., scaler factors and temp)
df[["M_SF", "I_SF", "Temp"]] = df[["M_SF", "I_SF", "Temp"]].ffill()
# Apply scaling: real = value * 10^sf
df["I_AC_real"] = df["I_AC"] * np.power(10.0, df["I_SF"]).astype(float)
df["M_AC_real"] = df["M_AC"] * np.power(10.0, df["M_SF"]).astype(float)
# Compute load
df["Load"] = df["M_AC_real"] - df["I_AC_real"]
# Ensure a regular 15-min grid ("15m" is Flux syntax; pandas wants "min")
df = df.asfreq(f"{self._freq_minutes(self.agg_every)}min")
df[["Load", "Temp"]] = df[["Load", "Temp"]].interpolate(limit_direction="both")
return df[["Load", "Temp"]]
def _build_features(self, df: pd.DataFrame) -> np.ndarray:
"""Create feature matrix: [Load, Temp?, sin/cos day, sin/cos dow]."""
feats = [df["Load"].values.reshape(-1, 1)]
if self.feature_config.get("use_temp", True):
feats.append(df["Temp"].values.reshape(-1, 1))
if self.feature_config.get("use_time_cyc", True):
idx = df.index
minute_of_day = (idx.hour * 60 + idx.minute).values.astype(float)
sod = 2 * np.pi * minute_of_day / (24 * 60)
dow = 2 * np.pi * idx.dayofweek.values.astype(float) / 7.0
feats.append(np.sin(sod).reshape(-1, 1))
feats.append(np.cos(sod).reshape(-1, 1))
feats.append(np.sin(dow).reshape(-1, 1))
feats.append(np.cos(dow).reshape(-1, 1))
X = np.hstack(feats) # shape: (T, n_features)
# The scaler is fitted on this 2D matrix; windowing flattens it and the model reshapes to 3D later
return X
def _make_windows(self, X_2d: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Create sliding windows: returns (X_flat, y) where X_flat stacks the windowed features.
For Keras we later reshape X_flat -> (N, input_steps, n_features).
"""
n = X_2d.shape[0]
n_features = X_2d.shape[1]
X_list, y_list = [], []
for i in range(n - self.input_steps - self.output_steps + 1):  # +1 so the last complete window is included
xw = X_2d[i : i + self.input_steps, :]
yw = X_2d[i + self.input_steps : i + self.input_steps + self.output_steps, 0] # target: Load
X_list.append(xw.reshape(-1)) # flatten
y_list.append(yw)
X_flat = np.stack(X_list)
y = np.stack(y_list)
return X_flat, y
# ----------------------- Internals: Modeling ----------------------- #
def _build_default_model(self, input_dim: int, output_dim: int, lr: float = 1e-3) -> tf.keras.Model:
n_features = input_dim // self.input_steps
model = Sequential([
LSTM(96, input_shape=(self.input_steps, n_features), return_sequences=False),
Dropout(0.1),
Dense(output_dim)
])
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr), loss="mse")
return model
def _load_or_get_scaler(self) -> StandardScaler:
if self._scaler is not None:
return self._scaler
if not os.path.exists(self.scaler_path):
raise NotFittedError("Scaler not found. Train the model first to create scaler.")
self._scaler = joblib.load(self.scaler_path)
return self._scaler
@staticmethod
def _freq_minutes(spec: str) -> int:
# supports formats like "15m", "1h"
if spec.endswith("m"):
return int(spec[:-1])
if spec.endswith("h"):
return int(spec[:-1]) * 60
raise ValueError(f"Unsupported frequency spec: {spec}")
# ----------------------------- retrain_daily.py -----------------------------
# A tiny script you can run once per day (e.g., via cron/systemd) to retrain the model.
# It delegates the work to LoadForecaster.retrain_daily().
if __name__ == "__main__":
# Read credentials/config from env vars or fill here
URL = os.getenv("INFLUX_URL", "http://localhost:8086")
TOKEN = os.getenv("INFLUX_TOKEN", "<YOUR_TOKEN>")
ORG = os.getenv("INFLUX_ORG", "<YOUR_ORG>")
BUCKET = os.getenv("INFLUX_BUCKET", "allmende_db")
lf = LoadForecaster(
url=URL,
token=TOKEN,
org=ORG,
bucket=BUCKET,
agg_every="15m",
input_hours=72,
output_hours=36,
model_path=os.getenv("FORECASTER_MODEL", "model/load_forecaster.h5"),
scaler_path=os.getenv("FORECASTER_SCALER", "model/scaler.joblib"),
)
# One call per day is enough; decrease epochs for faster daily updates
lf.retrain_daily(train_days=int(os.getenv("TRAIN_DAYS", "120")), epochs=int(os.getenv("EPOCHS", "30")), fine_tune=True)
# Optionally, produce a fresh forecast right after training
try:
model = lf.load_model()
fc = lf.get_15min_forecast(model)
# Save latest forecast to CSV for dashboards/consumers
out_path = os.getenv("FORECAST_OUT", "model/latest_forecast_15min.csv")
os.makedirs(os.path.dirname(out_path), exist_ok=True)
fc.to_csv(out_path)
print(f"Saved forecast: {out_path}")
except Exception as e:
print(f"Forecast generation failed: {e}")

View File

@@ -0,0 +1,61 @@
#!/usr/bin/env python3
import time
import datetime as dt
import requests
from zoneinfo import ZoneInfo
from matplotlib import pyplot as plt
import pandas as pd
TZ = "Europe/Berlin"
DAYS = 2
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
class WeatherForecaster:
def __init__(self, latitude, longitude):
self.lat = latitude
self.lon = longitude
def get_hourly_forecast(self, start_hour, days):
start_hour_local = start_hour
end_hour_local = start_hour_local + dt.timedelta(days=days)
params = {
"latitude": self.lat,
"longitude": self.lon,
"hourly": ["temperature_2m", "shortwave_radiation", "wind_speed_10m"],
"timezone": TZ,
"start_hour": start_hour_local.strftime("%Y-%m-%dT%H:%M"),
"end_hour": end_hour_local.strftime("%Y-%m-%dT%H:%M")
}
h = requests.get(OPEN_METEO_URL, params=params).json()["hourly"]
time_stamps = h["time"]
time_stamps = [
dt.datetime.fromisoformat(t).replace(tzinfo=ZoneInfo(TZ))
for t in time_stamps
]
weather = pd.DataFrame(index=time_stamps)
weather["ghi"] = h["shortwave_radiation"]
weather["temp_air"] = h["temperature_2m"]
weather["wind_speed"] = h["wind_speed_10m"]
return weather
if __name__ == '__main__':
weather_forecast = WeatherForecaster(latitude=48.041, longitude=7.862)
while True:
now = dt.datetime.now()
secs = 60 - now.second  # seconds until the next full minute; use (60 - now.minute) * 60 - now.second for the full hour
time.sleep(secs)
now_local = dt.datetime.now()
start_hour_local = (now_local + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
weather = weather_forecast.get_hourly_forecast(start_hour_local, DAYS)  # returns a single DataFrame, not a tuple
plt.plot(weather.index, weather["temp_air"])
plt.show()
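
For a one-shot query without the scheduling loop, something like this should work (coordinates as in main.py):

    import datetime as dt

    wf = WeatherForecaster(latitude=48.041, longitude=7.862)
    start = (dt.datetime.now() + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
    weather = wf.get_hourly_forecast(start, 2)
    print(weather.head())  # tz-aware hourly index; columns: ghi, temp_air, wind_speed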

main.py
View File

@@ -1,12 +1,16 @@
import time
from datetime import datetime
from data_base_influx import DataBaseInflux
from forecaster.weather_forecaster import WeatherForecaster
from heat_pump import HeatPump
from pv_inverter import PvInverter
from simulators.pv_plant_simulator import PvWattsSubarrayConfig, PvWattsPlant
from solaredge_meter import SolaredgeMeter
from shelly_pro_3m import ShellyPro3m
from energysystem import EnergySystem
from sg_ready_controller import SgReadyController
from pvlib.location import Location
import datetime as dt
# For the dev system, run in a terminal: ssh -N -L 127.0.0.1:8111:10.0.0.10:502 pi@192.168.1.146
# For the production system, change the IP address in HeatPump to '10.0.0.10' and port to 502
@@ -22,19 +26,57 @@ db = DataBaseInflux(
bucket="allmende_db"
)
hp = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112')
meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112')
es.add_components(hp, shelly, wr, meter)
es.add_components(hp_master, hp_slave, shelly, wr, meter)
controller = SgReadyController(es)
# FORECASTING
latitude = 48.041
longitude = 7.862
TZ = "Europe/Berlin"
HORIZON_DAYS = 2
weather_forecaster = WeatherForecaster(latitude=latitude, longitude=longitude)
site = Location(latitude=latitude, longitude=longitude, altitude=35, tz=TZ, name="Gundelfingen")
p_module = 435
upper_roof_north = PvWattsSubarrayConfig(name="north", pdc0_w=(29+29+21)*p_module, tilt_deg=10, azimuth_deg=20, dc_loss=0.02, ac_loss=0.01)
upper_roof_south = PvWattsSubarrayConfig(name="south", pdc0_w=(29+21+20)*p_module, tilt_deg=10, azimuth_deg=200, dc_loss=0.02, ac_loss=0.01)
upper_roof_east = PvWattsSubarrayConfig(name="east", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=110, dc_loss=0.02, ac_loss=0.01)
upper_roof_west = PvWattsSubarrayConfig(name="west", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=290, dc_loss=0.02, ac_loss=0.01)
cfgs = [upper_roof_north, upper_roof_south, upper_roof_east, upper_roof_west]
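# Plausibility note (assuming the sums are module counts): north = 79 x 435 W = 34.4 kWp DC, south = 70 x 435 W = 30.5 kWp DC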
pv_plant = PvWattsPlant(site, cfgs)
now = datetime.now()
next_forecast_at = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
while True:
now = datetime.now()
if now.second % interval_seconds == 0 and now.microsecond < 100_000:
state = es.get_state_and_store_to_database(db)
controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state)
mode = controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state)
if mode == 'mode1':
mode_as_binary = 0
elif mode == 'mode2':
mode_as_binary = 1
else:
mode_as_binary = None  # controller returned None: mode unchanged (0..10 kW band)
if mode_as_binary is not None:
db.store_data('sg_ready', {'mode': mode_as_binary})
if now >= next_forecast_at:
# Forecast start: from the next full hour
start_hour_local = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
weather = weather_forecaster.get_hourly_forecast(start_hour_local, HORIZON_DAYS)
total = pv_plant.get_power(weather)
db.store_forecasts('pv_forecast', total)
# Schedule the next run (always on the full hour).
# If delays made us miss one or more hours, catch up:
while next_forecast_at <= now:
next_forecast_at = (next_forecast_at + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
time.sleep(0.1)

View File

@@ -1 +0,0 @@
,nils,nils-ThinkPad-P52,25.09.2025 17:32,file:///home/nils/.config/libreoffice/4;

View File

@@ -2,3 +2,4 @@ pymodbus~=3.8.6
pandas
openpyxl
sshtunnel
pvlib

View File

@@ -9,11 +9,15 @@ class SgReadyController():
meter_values = state[meter_name]
power_to_grid = meter_values['40206 - M_AC_Power'] * 10 ** meter_values['40210 - M_AC_Power_SF']
mode = None
if power_to_grid > 10000:
self.switch_sg_ready_mode(hp.ip, hp.port, 'mode2')
mode = 'mode2'
self.switch_sg_ready_mode(hp.ip, hp.port, mode)
elif power_to_grid < 0:
self.switch_sg_ready_mode(hp.ip, hp.port, 'mode1')
mode = 'mode1'
self.switch_sg_ready_mode(hp.ip, hp.port, mode)
return mode
def switch_sg_ready_mode(self, ip, port, mode):
"""

View File

@@ -0,0 +1,210 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, Dict, List, Literal, Tuple, Union
import numpy as np
import pandas as pd
import pvlib
import matplotlib.pyplot as plt
from pvlib.location import Location
from pvlib.pvsystem import PVSystem
from pvlib.modelchain import ModelChain
SeriesOrArray = Union[pd.Series, np.ndarray]
# ----------------------------- Configuration -----------------------------
@dataclass
class PvWattsSubarrayConfig:
name: str
pdc0_w: float # DC power at STC [W]
tilt_deg: float # tilt (0 = horizontal)
azimuth_deg: float # azimuth (180 = south)
gamma_pdc: float = -0.004 # temperature coefficient [1/K]
eta_inv_nom: float = 0.96 # nominal inverter efficiency
albedo: float = 0.2 # ground reflectance
# Lumped losses (PVWatts losses, as fractions)
dc_loss: float = 0.0
ac_loss: float = 0.0
soiling: float = 0.0
# Model
transposition_model: Literal["perez","haydavies","isotropic","klucher","reindl"] = "perez"
# ------------------------------ Subarray ---------------------------------
class PvWattsSubarray:
"""
One subarray driven by pvlib.ModelChain (PVWatts).
Automatically derives DNI/DHI from GHI (ERBS method)
and uses a SAPM temperature model.
"""
def __init__(self, cfg: PvWattsSubarrayConfig, location: Location):
self.cfg = cfg
self.location = location
self._mc: Optional[ModelChain] = None
# ---------------------------------------------------------------------
def _create_modelchain(self) -> ModelChain:
"""Erzeuge eine pvlib.ModelChain-Instanz mit PVWatts-Parametern."""
temp_params = pvlib.temperature.TEMPERATURE_MODEL_PARAMETERS["sapm"]["open_rack_glass_polymer"]
system = PVSystem(
surface_tilt=self.cfg.tilt_deg,
surface_azimuth=self.cfg.azimuth_deg,
module_parameters={"pdc0": self.cfg.pdc0_w, "gamma_pdc": self.cfg.gamma_pdc},
inverter_parameters={"pdc0": self.cfg.pdc0_w, "eta_inv_nom": self.cfg.eta_inv_nom},
albedo=self.cfg.albedo,
temperature_model_parameters=temp_params,
module_type="glass_polymer",
racking_model="open_rack",
# ModelChain's PVWatts losses step reads system.losses_parameters (percent
# values under fixed PVWatts names), so map the fractional config losses here
# and zero the PVWatts defaults; a dict attached to the ModelChain is ignored.
losses_parameters={
"soiling": 100.0 * self.cfg.soiling,
"wiring": 100.0 * self.cfg.dc_loss,
"availability": 100.0 * self.cfg.ac_loss,
"shading": 0.0, "snow": 0.0, "mismatch": 0.0,
"connections": 0.0, "lse": 0.0, "nameplate_rating": 0.0, "age": 0.0,
},
)
mc = ModelChain(
system, self.location,
transposition_model=self.cfg.transposition_model,
solar_position_method="nrel_numpy",
airmass_model="kastenyoung1989",
dc_model="pvwatts",
ac_model="pvwatts",
aoi_model="physical",
spectral_model="no_loss",  # cannot be inferred from PVWatts module parameters
losses_model="pvwatts",
temperature_model="sapm",
)
self._mc = mc
return mc
# ---------------------------------------------------------------------
def calc_dni_and_dhi(self, weather: pd.DataFrame) -> pd.DataFrame:
"""
Derives DNI & DHI from GHI via the ERBS method.
Returns a new DataFrame with 'ghi', 'dni', 'dhi'.
"""
if "ghi" not in weather:
raise ValueError("Weather data needs at least 'ghi'.")
# Determine the solar position
sp = self.location.get_solarposition(weather.index)
erbs = pvlib.irradiance.erbs(weather["ghi"], sp["zenith"], weather.index)
out = weather.copy()
out["dni"] = erbs["dni"].clip(lower=0)
out["dhi"] = erbs["dhi"].clip(lower=0)
return out
# ---------------------------------------------------------------------
def _prepare_weather(self, weather: pd.DataFrame) -> pd.DataFrame:
"""Sichert vollständige Spalten (ghi, dni, dhi, temp_air, wind_speed)."""
if "ghi" not in weather or "temp_air" not in weather:
raise ValueError("weather benötigt Spalten: 'ghi' und 'temp_air'.")
w = weather.copy()
# Check/normalize the time zone
if w.index.tz is None:
w.index = w.index.tz_localize(self.location.tz)
else:
if str(w.index.tz) != str(self.location.tz):
w = w.tz_convert(self.location.tz)
# Wind default
if "wind_speed" not in w:
w["wind_speed"] = 1.0
# Fill in DNI/DHI (always via ERBS)
if "dni" not in w or "dhi" not in w:
w = self.calc_dni_and_dhi(w)
return w
# ---------------------------------------------------------------------
def get_power(self, weather: pd.DataFrame) -> pd.Series:
"""
Computes AC power from weather data.
"""
w = self._prepare_weather(weather)
mc = self._create_modelchain()
mc.run_model(weather=w)
return mc.results.ac.rename(self.cfg.name)
# ------------------------------- Plant ----------------------------------
class PvWattsPlant:
"""
A PV plant with multiple subarrays that share a common weather DataFrame.
"""
def __init__(self, site: Location, subarray_cfgs: List[PvWattsSubarrayConfig]):
self.site = site
self.subs: Dict[str, PvWattsSubarray] = {c.name: PvWattsSubarray(c, site) for c in subarray_cfgs}
def get_power(
self,
weather: pd.DataFrame,
*,
return_breakdown: bool = False
) -> pd.Series | Tuple[pd.Series, Dict[str, pd.Series]]:
"""Berechne Gesamtleistung und optional Einzel-Subarrays."""
parts: Dict[str, pd.Series] = {name: sub.get_power(weather) for name, sub in self.subs.items()}
# build a common index
idx = list(parts.values())[0].index
for s in parts.values():
idx = idx.intersection(s.index)
parts = {k: v.reindex(idx).fillna(0.0) for k, v in parts.items()}
total = sum(parts.values())
total.name = "total_ac"
if return_breakdown:
return total, parts
return total
# --------------------------- Example usage -----------------------------
if __name__ == "__main__":
# Site
site = Location(latitude=52.52, longitude=13.405, altitude=35, tz="Europe/Berlin", name="Berlin")
# Time axis: one day at 15-minute resolution
times = pd.date_range("2025-06-21 00:00", "2025-06-21 23:45", freq="15min", tz=site.tz)
# Dummy weather
ghi = 1000 * np.clip(np.sin(np.linspace(0, np.pi, len(times)))**1.2, 0, None)
temp_air = 16 + 8 * np.clip(np.sin(np.linspace(-np.pi/2, np.pi/2, len(times))), 0, None)
wind = np.full(len(times), 1.0)
weather = pd.DataFrame(index=times)
weather["ghi"] = ghi
weather["temp_air"] = temp_air
weather["wind_speed"] = wind
# Two subarrays
cfgs = [
PvWattsSubarrayConfig(name="Sued_30", pdc0_w=6000, tilt_deg=30, azimuth_deg=180, dc_loss=0.02, ac_loss=0.01),
PvWattsSubarrayConfig(name="West_20", pdc0_w=4000, tilt_deg=20, azimuth_deg=270, soiling=0.02),
]
plant = PvWattsPlant(site, cfgs)
# Simulation
total, parts = plant.get_power(weather, return_breakdown=True)
# Plot
plt.figure(figsize=(10, 6))
plt.plot(total.index, total / 1000, label="Total power (AC)", linewidth=2, color="black")
for name, s in parts.items():
plt.plot(s.index, s / 1000, label=name)
plt.title("PV power (PVWatts, ERBS method for DNI/DHI)")
plt.ylabel("Power [kW]")
plt.xlabel("Time")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.5)
plt.tight_layout()
plt.show()