1 Commit

Author: Nils Reiners
SHA1: 13f27e12e8
Message: first version of load forecaster implemented - not yet running
Date: 2025-10-23 13:21:28 +02:00
3 changed files with 503 additions and 171 deletions

load_forecaster.py (new file)

@@ -0,0 +1,349 @@
# load_forecaster.py
# -*- coding: utf-8 -*-
"""
LoadForecaster: builds a 36-hour forecast at 15-min resolution from InfluxDB data.
- Data source: InfluxDB (Flux query provided by user)
- Target: House load = M_AC_real - I_AC_real
- Frequency: 15 minutes (changeable via init)
- Model: Keras (LSTM by default, pluggable)
- Persistence: Saves model (H5) and scaler (joblib)
Usage (example):
from load_forecaster import LoadForecaster
import tensorflow as tf
lf = LoadForecaster(
url="http://localhost:8086",
token="<YOUR_TOKEN>",
org="<YOUR_ORG>",
bucket="allmende_db",
agg_every="15m",
input_hours=72,
output_hours=36,
model_path="model/load_forecaster.h5",
scaler_path="model/scaler.joblib",
)
# Train or retrain
lf.train_and_save(train_days=90, epochs=60)
# Load model and forecast
model = lf.load_model()
forecast_df = lf.get_15min_forecast(model)
print(forecast_df.head())
"""
from __future__ import annotations
import os
import math
import json
import warnings
from dataclasses import dataclass
from typing import Optional, Tuple
import numpy as np
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.warnings import MissingPivotFunction
from sklearn.preprocessing import StandardScaler
from sklearn.exceptions import NotFittedError
import joblib
# TensorFlow / Keras
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
warnings.filterwarnings("ignore", category=MissingPivotFunction)
@dataclass
class InfluxParams:
url: str
token: str
org: str
bucket: str = "allmende_db"
class LoadForecaster:
def __init__(
self,
url: str,
token: str,
org: str,
bucket: str = "allmende_db",
agg_every: str = "15m",
input_hours: int = 72,
output_hours: int = 36,
model_path: str = "model/load_forecaster.h5",
scaler_path: str = "model/scaler.joblib",
feature_config: Optional[dict] = None,
) -> None:
self.influx = InfluxParams(url=url, token=token, org=org, bucket=bucket)
self.agg_every = agg_every
self.input_steps = int((input_hours * 60) / self._freq_minutes(agg_every))
self.output_steps = int((output_hours * 60) / self._freq_minutes(agg_every))
self.model_path = model_path
self.scaler_path = scaler_path
self.feature_config = feature_config or {"use_temp": True, "use_time_cyc": True}
self._scaler: Optional[StandardScaler] = None
# Ensure model/scaler directories exist ("or '.'" guards paths without a directory part)
os.makedirs(os.path.dirname(model_path) or ".", exist_ok=True)
os.makedirs(os.path.dirname(scaler_path) or ".", exist_ok=True)
# ---------------------------- Public API ---------------------------- #
def get_15min_forecast(self, model: tf.keras.Model) -> pd.DataFrame:
"""Create a 36-hour forecast at 15-min resolution using the latest data.
Assumes a StandardScaler has been fitted during training and saved.
The method uses the most recent input window from InfluxDB.
"""
# Pull just enough history for one input window
history_hours = math.ceil(self.input_steps * self._freq_minutes(self.agg_every) / 60)
df = self._query_and_prepare(range_hours=history_hours)
if len(df) < self.input_steps:
raise RuntimeError(f"Not enough data: need {self.input_steps} steps, got {len(df)}")
# Build features for the latest window
feats = self._build_features(df)
X_window = feats[-self.input_steps :]
# Load scaler (fitted on flattened windows during training, see train_and_save)
scaler = self._load_or_get_scaler()
n_features = X_window.shape[1]
X_scaled = scaler.transform(X_window.reshape(1, -1)).reshape(1, self.input_steps, n_features)
# Predict; targets were not scaled during training, so predictions are already in original units
y_pred = model.predict(X_scaled, verbose=0)[0]
# Build forecast index
last_ts = df.index[-1]
# agg_every ("15m") is a Flux duration string; use the explicit pandas alias "min" for the offset
freq = pd.tseries.frequencies.to_offset(f"{self._freq_minutes(self.agg_every)}min")
idx = pd.date_range(last_ts + freq, periods=self.output_steps, freq=freq)
out = pd.DataFrame({"Forecast_Load": y_pred}, index=idx)
out.index.name = "timestamp"
return out
def train_and_save(
self,
train_days: int = 90,
epochs: int = 80,
batch_size: int = 128,
validation_split: float = 0.2,
learning_rate: float = 1e-3,
fine_tune: bool = False,
) -> tf.keras.Model:
"""Train (or fine-tune) a model from recent history and persist model + scaler."""
df = self._query_and_prepare(range_hours=24 * train_days)
feats = self._build_features(df)
# Prepare windows
X, y = self._make_windows(feats)
if len(X) < 10:
raise RuntimeError("Not enough windowed samples to train.")
# Fit scaler on full X
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
self._scaler = scaler
joblib.dump(scaler, self.scaler_path)
# Build model (or load existing for fine-tune)
if fine_tune and os.path.exists(self.model_path):
model = load_model(self.model_path)
else:
model = self._build_default_model(input_dim=X.shape[1], output_dim=self.output_steps, lr=learning_rate)
# Train
es = EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)
model.fit(
X_scaled.reshape((-1, self.input_steps, X.shape[1] // self.input_steps)),
y,
epochs=epochs,
batch_size=batch_size,
validation_split=validation_split,
callbacks=[es],
verbose=1,
)
model.save(self.model_path)
return model
# A convenience wrapper to be called from an external script once per day
def retrain_daily(self, train_days: int = 90, epochs: int = 40, fine_tune: bool = True) -> None:
self.train_and_save(train_days=train_days, epochs=epochs, fine_tune=fine_tune)
def load_model(self) -> tf.keras.Model:
if not os.path.exists(self.model_path):
raise FileNotFoundError(f"Model not found at {self.model_path}")
return load_model(self.model_path)
# ------------------------- Internals: Data ------------------------- #
def _query_and_prepare(self, range_hours: int) -> pd.DataFrame:
"""Query InfluxDB for the last `range_hours` and construct the Load series.
Expected fields (exactly as in DB):
- "40206 - M_AC_Power"
- "40210 - M_AC_Power_SF"
- "40083 - I_AC_Power"
- "40084 - I_AC_Power_SF"
- "300 - Aussentemperatur"
"""
start_str = f"-{range_hours}h"
flux = f'''
from(bucket: "{self.influx.bucket}")
|> range(start: {start_str})
|> filter(fn: (r) => r["_measurement"] == "solaredge_meter" or r["_measurement"] == "solaredge_master" or r["_measurement"] == "hp_master")
|> filter(fn: (r) => r["_field"] == "40206 - M_AC_Power" or r["_field"] == "40210 - M_AC_Power_SF" or r["_field"] == "40083 - I_AC_Power" or r["_field"] == "40084 - I_AC_Power_SF" or r["_field"] == "300 - Aussentemperatur")
|> aggregateWindow(every: {self.agg_every}, fn: mean, createEmpty: false)
|> yield(name: "mean")
'''
with InfluxDBClient(url=self.influx.url, token=self.influx.token, org=self.influx.org) as client:
tables = client.query_api().query_data_frame(flux)
# Concatenate if list of frames is returned
if isinstance(tables, list):
df = pd.concat(tables, ignore_index=True)
else:
df = tables
# Keep relevant columns and pivot
df = df[["_time", "_field", "_value"]]
df = df.pivot(index="_time", columns="_field", values="_value").reset_index()
df = df.rename(
columns={
"_time": "timestamp",
"40206 - M_AC_Power": "M_AC",
"40210 - M_AC_Power_SF": "M_SF",
"40083 - I_AC_Power": "I_AC",
"40084 - I_AC_Power_SF": "I_SF",
"300 - Aussentemperatur": "Temp",
}
)
df = df.sort_values("timestamp").set_index("timestamp")
# Forward-fill reasonable gaps (e.g., scale factors and temperature)
df[["M_SF", "I_SF", "Temp"]] = df[["M_SF", "I_SF", "Temp"]].ffill()
# Apply scaling: real = value * 10^sf
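# Illustrative example (values assumed): a raw meter reading of 1234 with SF = -1 gives 1234 * 10**-1 = 123.4 in real units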
df["I_AC_real"] = df["I_AC"] * np.power(10.0, df["I_SF"]).astype(float)
df["M_AC_real"] = df["M_AC"] * np.power(10.0, df["M_SF"]).astype(float)
# Compute load
df["Load"] = df["M_AC_real"] - df["I_AC_real"]
# Ensure a regular grid at the aggregation frequency (explicit pandas alias "min" rather than the Flux-style "15m")
df = df.asfreq(f"{self._freq_minutes(self.agg_every)}min")
df[["Load", "Temp"]] = df[["Load", "Temp"]].interpolate(limit_direction="both")
return df[["Load", "Temp"]]
def _build_features(self, df: pd.DataFrame) -> np.ndarray:
"""Create feature matrix: [Load, Temp?, sin/cos day, sin/cos dow]."""
feats = [df["Load"].values.reshape(-1, 1)]
if self.feature_config.get("use_temp", True):
feats.append(df["Temp"].values.reshape(-1, 1))
if self.feature_config.get("use_time_cyc", True):
idx = df.index
minute_of_day = (idx.hour * 60 + idx.minute).values.astype(float)
sod = 2 * np.pi * minute_of_day / (24 * 60)
dow = 2 * np.pi * idx.dayofweek.values.astype(float) / 7.0
feats.append(np.sin(sod).reshape(-1, 1))
feats.append(np.cos(sod).reshape(-1, 1))
feats.append(np.sin(dow).reshape(-1, 1))
feats.append(np.cos(dow).reshape(-1, 1))
X = np.hstack(feats) # shape: (T, n_features)
# Windowing and flattening happen later in _make_windows; train_and_save reshapes the model input back to 3D
return X
def _make_windows(self, X_2d: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Create sliding windows: returns (X_flat, y) where X_flat stacks the windowed features.
For Keras we later reshape X_flat -> (N, input_steps, n_features).
"""
n = X_2d.shape[0]
n_features = X_2d.shape[1]
X_list, y_list = [], []
for i in range(n - self.input_steps - self.output_steps):
xw = X_2d[i : i + self.input_steps, :]
yw = X_2d[i + self.input_steps : i + self.input_steps + self.output_steps, 0] # target: Load
X_list.append(xw.reshape(-1)) # flatten
y_list.append(yw)
X_flat = np.stack(X_list)
y = np.stack(y_list)
return X_flat, y
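# Shape example using this module's defaults (15-min resolution, input_hours=72, output_hours=36):
# input_steps = 72*60/15 = 288 and output_steps = 36*60/15 = 144, so each X_flat row has
# 288 * n_features values and each y row has 144 values.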
# ----------------------- Internals: Modeling ----------------------- #
def _build_default_model(self, input_dim: int, output_dim: int, lr: float = 1e-3) -> tf.keras.Model:
n_features = input_dim // self.input_steps
model = Sequential([
LSTM(96, input_shape=(self.input_steps, n_features), return_sequences=False),
Dropout(0.1),
Dense(output_dim)
])
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr), loss="mse")
return model
def _load_or_get_scaler(self) -> StandardScaler:
if self._scaler is not None:
return self._scaler
if not os.path.exists(self.scaler_path):
raise NotFittedError("Scaler not found. Train the model first to create scaler.")
self._scaler = joblib.load(self.scaler_path)
return self._scaler
@staticmethod
def _freq_minutes(spec: str) -> int:
# supports formats like "15m", "1h"
if spec.endswith("m"):
return int(spec[:-1])
if spec.endswith("h"):
return int(spec[:-1]) * 60
raise ValueError(f"Unsupported frequency spec: {spec}")
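# e.g. _freq_minutes("15m") -> 15, _freq_minutes("1h") -> 60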
# ----------------------------- retrain_daily.py -----------------------------
# A tiny script you can run once per day (e.g., via cron/systemd) to retrain the model.
# It delegates the work to LoadForecaster.retrain_daily().
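# Illustrative scheduling sketch (paths and times are assumptions, not part of this module):
#   0 3 * * *  /usr/bin/python3 /opt/forecaster/load_forecaster.py >> /var/log/forecaster.log 2>&1
# with INFLUX_URL, INFLUX_TOKEN, INFLUX_ORG etc. exported in the cron environment.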
if __name__ == "__main__":
# Read credentials/config from env vars or fill here
URL = os.getenv("INFLUX_URL", "http://localhost:8086")
TOKEN = os.getenv("INFLUX_TOKEN", "<YOUR_TOKEN>")
ORG = os.getenv("INFLUX_ORG", "<YOUR_ORG>")
BUCKET = os.getenv("INFLUX_BUCKET", "allmende_db")
lf = LoadForecaster(
url=URL,
token=TOKEN,
org=ORG,
bucket=BUCKET,
agg_every="15m",
input_hours=72,
output_hours=36,
model_path=os.getenv("FORECASTER_MODEL", "model/load_forecaster.h5"),
scaler_path=os.getenv("FORECASTER_SCALER", "model/scaler.joblib"),
)
# One call per day is enough; decrease epochs for faster daily updates
lf.retrain_daily(train_days=int(os.getenv("TRAIN_DAYS", "120")), epochs=int(os.getenv("EPOCHS", "30")), fine_tune=True)
# Optionally, produce a fresh forecast right after training
try:
model = lf.load_model()
fc = lf.get_15min_forecast(model)
# Save latest forecast to CSV for dashboards/consumers
out_path = os.getenv("FORECAST_OUT", "model/latest_forecast_15min.csv")
os.makedirs(os.path.dirname(out_path), exist_ok=True)
fc.to_csv(out_path)
print(f"Saved forecast: {out_path}")
except Exception as e:
print(f"Forecast generation failed: {e}")

main.py

@@ -26,57 +26,56 @@ db = DataBaseInflux(
bucket="allmende_db"
)
# hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
# hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
# shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
wr_master = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112', unit=1)
wr_slave = PvInverter(device_name='solaredge_slave', ip_address='192.168.1.112', unit=3)
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112')
meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112')
es.add_components(wr_master, wr_slave)#hp_master, hp_slave, shelly, wr_master, wr_slave, meter)
# controller = SgReadyController(es)
#
# # FORECASTING
# latitude = 48.041
# longitude = 7.862
# TZ = "Europe/Berlin"
# HORIZON_DAYS = 2
# weather_forecaster = WeatherForecaster(latitude=latitude, longitude=longitude)
# site = Location(latitude=latitude, longitude=longitude, altitude=35, tz=TZ, name="Gundelfingen")
#
# p_module = 435
# upper_roof_north = PvWattsSubarrayConfig(name="north", pdc0_w=(29+29+21)*p_module, tilt_deg=10, azimuth_deg=20, dc_loss=0.02, ac_loss=0.01)
# upper_roof_south = PvWattsSubarrayConfig(name="south", pdc0_w=(29+21+20)*p_module, tilt_deg=10, azimuth_deg=200, dc_loss=0.02, ac_loss=0.01)
# upper_roof_east = PvWattsSubarrayConfig(name="east", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=110, dc_loss=0.02, ac_loss=0.01)
# upper_roof_west = PvWattsSubarrayConfig(name="west", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=290, dc_loss=0.02, ac_loss=0.01)
# cfgs = [upper_roof_north, upper_roof_south, upper_roof_east, upper_roof_west]
# pv_plant = PvWattsPlant(site, cfgs)
#
# now = datetime.now()
# next_forecast_at = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
es.add_components(hp_master, hp_slave, shelly, wr, meter)
controller = SgReadyController(es)
# FORECASTING
latitude = 48.041
longitude = 7.862
TZ = "Europe/Berlin"
HORIZON_DAYS = 2
weather_forecaster = WeatherForecaster(latitude=latitude, longitude=longitude)
site = Location(latitude=latitude, longitude=longitude, altitude=35, tz=TZ, name="Gundelfingen")
p_module = 435
upper_roof_north = PvWattsSubarrayConfig(name="north", pdc0_w=(29+29+21)*p_module, tilt_deg=10, azimuth_deg=20, dc_loss=0.02, ac_loss=0.01)
upper_roof_south = PvWattsSubarrayConfig(name="south", pdc0_w=(29+21+20)*p_module, tilt_deg=10, azimuth_deg=200, dc_loss=0.02, ac_loss=0.01)
upper_roof_east = PvWattsSubarrayConfig(name="east", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=110, dc_loss=0.02, ac_loss=0.01)
upper_roof_west = PvWattsSubarrayConfig(name="west", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=290, dc_loss=0.02, ac_loss=0.01)
cfgs = [upper_roof_north, upper_roof_south, upper_roof_east, upper_roof_west]
pv_plant = PvWattsPlant(site, cfgs)
now = datetime.now()
next_forecast_at = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
while True:
now = datetime.now()
if now.second % interval_seconds == 0 and now.microsecond < 100_000:
state = es.get_state_and_store_to_database(db)
# mode = controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state)
#
# if mode == 'mode1':
# mode_as_binary = 0
# else:
# mode_as_binary = 1
# db.store_data('sg_ready', {'mode': mode_as_binary})
mode = controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state)
# if now >= next_forecast_at:
# # Forecast start: from the next full hour
# start_hour_local = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
# weather = weather_forecaster.get_hourly_forecast(start_hour_local, HORIZON_DAYS)
# total = pv_plant.get_power(weather)
# db.store_forecasts('pv_forecast', total)
#
# # Define the next scheduled run (always on the full hour)
# # If several hours were missed due to delays, catch up:
# while next_forecast_at <= now:
# next_forecast_at = (next_forecast_at + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
if mode == 'mode1':
mode_as_binary = 0
else:
mode_as_binary = 1
db.store_data('sg_ready', {'mode': mode_as_binary})
if now >= next_forecast_at:
# Forecast start: from the next full hour
start_hour_local = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
weather = weather_forecaster.get_hourly_forecast(start_hour_local, HORIZON_DAYS)
total = pv_plant.get_power(weather)
db.store_forecasts('pv_forecast', total)
# Define the next scheduled run (always on the full hour)
# If several hours were missed due to delays, catch up:
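# e.g. (illustrative) a three-hour stall advances next_forecast_at three times in the loop below,
# so only a single forecast run is triggered for the current hour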
while next_forecast_at <= now:
next_forecast_at = (next_forecast_at + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
time.sleep(0.1)

pv_inverter.py

@@ -1,155 +1,139 @@
# pv_inverter.py
# -*- coding: utf-8 -*-
from typing import Optional, Dict, Any, List
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException
import struct
import time
import struct
import pandas as pd
from typing import Dict, Any, List, Tuple, Optional
from pymodbus.client import ModbusTcpClient
EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"
# Upper bound (exclusive): registers at or above this address are never read
MAX_ADDR_EXCLUSIVE = 40121
class PvInverter:
"""
Minimal reader for a SolarEdge inverter behind a Modbus TCP→RTU gateway.
Reads only the known registers (as in the working script).
Compatible with pymodbus 2.5.x and 3.x; no retry_on_empty.
"""
def __init__(
self,
device_name: str,
ip_address: str,
port: int = 502,
unit_id: int = 1,
timeout: float = 1.5,
silent_interval: float = 0.02,
):
def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1):
self.device_name = device_name
self.host = ip_address
self.ip = ip_address
self.port = port
self.unit = unit_id
self.timeout = timeout
self.silent_interval = silent_interval
self.unit = unit
self.client: Optional[ModbusTcpClient] = None
self._connect()
self.registers: Dict[int, Dict[str, Any]] = {} # addr -> {"desc":..., "type":...}
self.connect_to_modbus()
self.load_registers(EXCEL_PATH)
# ---------------- Connection ----------------
def _connect(self):
# retries=0: no internal retry attempts
self.client = ModbusTcpClient(self.host, port=self.port, timeout=self.timeout, retries=0)
# ---------- Connection ----------
def connect_to_modbus(self):
self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3)
if not self.client.connect():
raise ConnectionError(f"Connection to {self.device_name} ({self.host}:{self.port}) failed.")
print(f"✅ Connected to {self.device_name} ({self.host}:{self.port}, unit={self.unit})")
print("Verbindung zu Wechselrichter fehlgeschlagen.")
raise SystemExit(1)
print("✅ Verbindung zu Wechselrichter hergestellt.")
def close(self):
if self.client:
self.client.close()
self.client = None
# ---------------- Low-level reads ----------------
def _read_regs(self, addr: int, count: int) -> Optional[List[int]]:
"""Liest 'count' Holding-Register ab base-0 'addr' für die konfigurierte Unit-ID."""
# ---------- Register list ----------
def load_registers(self, excel_path: str):
xls = pd.ExcelFile(excel_path)
df = xls.parse()
# Adjust column names here if necessary:
cols = ["MB Adresse", "Beschreibung", "Variabel Typ"]
df = df[cols].dropna()
df["MB Adresse"] = df["MB Adresse"].astype(int)
# 1) Pre-filter: keep only addresses below MAX_ADDR_EXCLUSIVE
df = df[df["MB Adresse"] < MAX_ADDR_EXCLUSIVE]
self.registers = {
int(row["MB Adresse"]): {
"desc": str(row["Beschreibung"]).strip(),
"type": str(row["Variabel Typ"]).strip()
}
for _, row in df.iterrows()
}
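# Illustrative row (the actual sheet contents may differ): MB Adresse=40083, Beschreibung="I_AC_Power",
# Variabel Typ="INT" would later produce the key "40083 - I_AC_Power" in get_state().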
# ---------- Low-level reads ----------
def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
fn = getattr(self.client, fn_name)
# pymodbus 3.8.x has 'slave='; the fallbacks do no harm
for kwargs in (dict(address=address, count=count, slave=self.unit),
dict(address=address, count=count)):
try:
rr = self.client.read_holding_registers(address=addr, count=count, slave=self.unit)
except ModbusIOException:
time.sleep(self.silent_interval)
return None
except Exception:
time.sleep(self.silent_interval)
res = fn(**kwargs)
if res is None or (hasattr(res, "isError") and res.isError()):
continue
return res.registers
except TypeError:
continue
return None
time.sleep(self.silent_interval)
if not rr or rr.isError():
return None
return rr.registers
def _read_any(self, address: int, count: int) -> Optional[List[int]]:
regs = self._try_read("read_holding_registers", address, count)
if regs is None:
regs = self._try_read("read_input_registers", address, count)
return regs
# ---------- Decoding ----------
@staticmethod
def _to_int16(u16: int) -> int:
def _to_i16(u16: int) -> int:
return struct.unpack(">h", struct.pack(">H", u16))[0]
@staticmethod
def _apply_sf(raw: int, sf: int) -> float:
return raw * (10 ** sf)
def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float:
b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi)
return struct.unpack(">f", b)[0]
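# e.g. (illustrative) registers [0x447A, 0x0000] with msw_first=True decode to the float 1000.0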
# Helper: how many 16-bit registers does this type require?
@staticmethod
def _read_string_from_regs(regs: List[int]) -> Optional[str]:
b = b"".join(struct.pack(">H", r) for r in regs)
s = b.decode("ascii", errors="ignore").rstrip("\x00 ").strip()
return s or None
def _word_count_for_type(rtype: str) -> int:
rt = (rtype or "").lower()
# Adjust this to match the types used in your Excel sheet:
if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt:
return 2
# Default: 1 word (e.g. int16/uint16)
return 1
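# e.g. "INT16"/"UINT16" -> 1 word; "UINT32", "REAL", "FLOAT" or "STRING(32)" -> 2 words, per the substring checks above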
# ---------------- Helper functions ----------------
def _read_string(self, addr: int, words: int) -> Optional[str]:
regs = self._read_regs(addr, words)
if regs is None:
def read_one(self, address_excel: int, rtype: str) -> Optional[float]:
"""
Reads a single value according to its type ('INT', 'REAL', etc.).
Only registers below MAX_ADDR_EXCLUSIVE are ever read.
"""
addr = int(address_excel)
words = self._word_count_for_type(rtype)
# 2) Check the hard limit: the highest register touched must stay below MAX_ADDR_EXCLUSIVE
if addr + words - 1 >= MAX_ADDR_EXCLUSIVE:
# Skip, since this read would touch a register at or above the limit
return None
return self._read_string_from_regs(regs)
def _read_scaled(self, value_addr: int, sf_addr: int) -> Optional[float]:
regs = self._read_regs(value_addr, 1)
sf = self._read_regs(sf_addr, 1)
if regs is None or sf is None:
if words == 2:
regs = self._read_any(addr, 2)
if not regs or len(regs) < 2:
return None
raw = self._to_int16(regs[0])
sff = self._to_int16(sf[0])
return self._apply_sf(raw, sff)
def _read_u32_with_sf(self, value_addr: int, sf_addr: int) -> Optional[float]:
regs = self._read_regs(value_addr, 2)
sf = self._read_regs(sf_addr, 1)
if regs is None or sf is None:
return None
u32 = (regs[0] << 16) | regs[1]
sff = self._to_int16(sf[0])
return self._apply_sf(u32, sff)
# ---------------- Public API ----------------
def get_state(self) -> Dict[str, Any]:
"""Liest exakt die bekannten Register und gibt ein Dict zurück."""
state: Dict[str, Any] = {}
# --- Common Block ---
state["C_Manufacturer"] = self._read_string(40004, 16)
state["C_Model"] = self._read_string(40020, 16)
state["C_Version"] = self._read_string(40044, 8)
state["C_SerialNumber"] = self._read_string(40052, 16)
# --- Inverter Block ---
state["I_AC_Power_W"] = self._read_scaled(40083, 40084)
state["I_AC_Voltage_V"] = self._read_scaled(40079, 40082)
state["I_AC_Frequency_Hz"] = self._read_scaled(40085, 40086)
state["I_DC_Power_W"] = self._read_scaled(40100, 40101)
state["I_AC_Energy_Wh_total"] = self._read_u32_with_sf(40093, 40095)
status_regs = self._read_regs(40107, 2)
if status_regs:
state["I_Status"] = status_regs[0]
state["I_Status_Vendor"] = status_regs[1]
# The existing logic interprets the 2 words as a float32:
return self._to_f32_from_two(regs[0], regs[1])
else:
state["I_Status"] = None
state["I_Status_Vendor"] = None
regs = self._read_any(addr, 1)
if not regs:
return None
return float(self._to_i16(regs[0]))
return state
# ---------------- Example ----------------
if __name__ == "__main__":
MODBUS_IP = "192.168.1.112"
MODBUS_PORT = 502
master = PvInverter("solaredge_master", MODBUS_IP, port=MODBUS_PORT, unit_id=1)
slave = PvInverter("solaredge_slave", MODBUS_IP, port=MODBUS_PORT, unit_id=3)
try:
sm = master.get_state()
ss = slave.get_state()
print("\n=== MASTER ===")
for k, v in sm.items():
print(f"{k:22s}: {v}")
print("\n=== SLAVE ===")
for k, v in ss.items():
print(f"{k:22s}: {v}")
finally:
master.close()
slave.close()
def get_state(self) -> Dict[str, Any]:
"""
Reads ALL registers from self.registers and returns a dict.
Ensures that no address (including multi-word reads) at or above MAX_ADDR_EXCLUSIVE is read.
"""
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
for address, meta in sorted(self.registers.items()):
words = self._word_count_for_type(meta["type"])
# 3) Guard again at the iteration level:
if address + words - 1 >= MAX_ADDR_EXCLUSIVE:
continue
val = self.read_one(address, meta["type"])
if val is None:
continue
key = f"{address} - {meta['desc']}"
data[key] = val
return data
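# Minimal usage sketch for the new constructor signature (values taken from main.py in this commit):
# inv = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112', unit=1)
# state = inv.get_state()
# print(state)
# inv.close()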