22 Commits

Author SHA1 Message Date
Nils Reiners
13f27e12e8 first version of load forecaster implemented - not yet running 2025-10-23 13:21:28 +02:00
Nils Reiners
ba6ff9f6c3 stündliche Speicherung des Forecasts angepasst 2025-10-07 22:34:16 +02:00
Nils Reiners
9ccb1e042b stündliche Speicherung des Forecasts angepasst 2025-10-07 22:33:02 +02:00
Nils Reiners
a5bcfca39a stündliche Speicherung des Forecasts angepasst 2025-10-07 22:29:49 +02:00
Nils Reiners
a1f9e29134 pv forecaster added 2025-10-07 20:52:28 +02:00
Nils Reiners
98302b9af5 heat pump slave added 2025-09-28 20:21:54 +02:00
Nils Reiners
f3de1f9280 mode as binary 2025-09-25 21:45:09 +02:00
Nils Reiners
ecd0180483 debug 2025-09-25 21:30:42 +02:00
Nils Reiners
1784b7c283 storing sg ready mode to db 2025-09-25 21:24:45 +02:00
Nils Reiners
b066658eb0 controller implemented and tested 2025-09-25 21:16:51 +02:00
Nils Reiners
0bcf8a2d8c inverter and meter seems to run 2025-09-18 14:14:53 +02:00
Nils Reiners
397935f51a minor changes 2025-09-16 22:55:13 +02:00
Nils Reiners
8eda3bc954 reading out registers corrected 2025-09-16 22:46:42 +02:00
Nils Reiners
b9cba11be7 cleaned up 2025-09-16 12:57:37 +02:00
Nils Reiners
5319a299be inverter was included 2025-09-16 12:52:27 +02:00
Nils Reiners
2186c4d7db wechselrichter zum tesent eingebunden 2025-09-14 10:52:50 +02:00
Nils Reiners
7df61fd6c1 shelly upgedatet 2025-05-26 21:31:28 +02:00
Nils Reiners
0734f7a810 shelly hinzugefügt 2025-05-26 21:08:16 +02:00
Nils Reiners
65a75e061b läuft 2025-04-26 22:31:14 +01:00
Nils Reiners
974ec43f10 influx data base added 2025-04-26 23:13:22 +02:00
Nils Reiners
f0d390cd59 Merge branch 'feature_wp_klasse' 2025-04-18 18:59:48 +02:00
Nils Reiners
a7e67cc8f1 daten übertragen 2025-04-18 12:46:15 +01:00
31 changed files with 1210 additions and 5160559 deletions

38
README
View File

@@ -11,10 +11,42 @@ Was needs to be done on the Raspberry pi before the tool can run.
- pip install -r requirements.txt
How to run the script:
3) How to run the script for testing:
- nohup python main.py > terminal_log 2>&1 &
nohup python main.py > terminal_log 2>&1 &
For reading out the terminal_log while script is runing:
- tail -f terminal_log
tail -f terminal_log
4) Implement and run the ems as systemd service:
create:
/etc/systemd/system/allmende_ems.service
insert:
[Unit]
Description=Allmende EMS Python Script
After=network.target
[Service]
WorkingDirectory=/home/pi/projects/allmende_ems
ExecStart=/home/pi/allmende_ems/bin/python3.11 /home/pi/projects/allmende_ems/main.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
manage the service with the following commands:
Once:
sudo systemctl daemon-reload
sudo systemctl start allmende_ems.service
sudo systemctl enable allmende_ems.service
While running:
sudo systemctl status allmende_ems.service
sudo systemctl restart allmende_ems.service
sudo systemctl stop allmende_ems.service
journalctl -u allmende_ems.service

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,46 +0,0 @@
import csv
import os
import tempfile
import shutil
class DataBaseCsv:
def __init__(self, filename: str):
self.filename = filename
def store_data(self, data: dict):
new_fields = list(data.keys())
# If file does not exist or is empty → create new file with header
if not os.path.exists(self.filename) or os.path.getsize(self.filename) == 0:
with open(self.filename, mode='w', newline='') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=new_fields)
writer.writeheader()
writer.writerow(data)
return
# If file exists → read existing header and data
with open(self.filename, mode='r', newline='') as csv_file:
reader = csv.DictReader(csv_file)
existing_fields = reader.fieldnames
existing_data = list(reader)
# Merge old and new fields (keep original order, add new ones)
all_fields = existing_fields.copy()
for field in new_fields:
if field not in all_fields:
all_fields.append(field)
# Write to a temporary file with updated header
with tempfile.NamedTemporaryFile(mode='w', delete=False, newline='', encoding='utf-8') as tmp_file:
writer = csv.DictWriter(tmp_file, fieldnames=all_fields)
writer.writeheader()
# Write old rows with updated field list
for row in existing_data:
writer.writerow({field: row.get(field, '') for field in all_fields})
# Write new data row
writer.writerow({field: data.get(field, '') for field in all_fields})
# Replace original file with updated temporary file
shutil.move(tmp_file.name, self.filename)

48
data_base_influx.py Normal file
View File

@@ -0,0 +1,48 @@
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
import datetime as dt
import pandas as pd
class DataBaseInflux:
def __init__(self, url: str, token: str, org: str, bucket: str):
self.url = url
self.token = token
self.org = org
self.bucket = bucket
self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
self.write_api = self.client.write_api()
def store_data(self, device_name: str, data: dict):
measurement = device_name # Fest auf "messungen" gesetzt
point = Point(measurement)
# Alle Key/Value-Paare als Fields speichern
for key, value in data.items():
point = point.field(key, value)
# Zeitstempel automatisch auf jetzt setzen
point = point.time(datetime.utcnow(), WritePrecision.NS)
# Punkt in InfluxDB schreiben
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
def store_forecasts(self, forecast_name: str, data: pd.Series):
measurement = forecast_name
run_tag = dt.datetime.now(dt.timezone.utc).replace(second=0, microsecond=0).isoformat(timespec="minutes")
pts = []
series = pd.to_numeric(data, errors="coerce").dropna()
for ts, val in series.items():
pts.append(
Point(measurement)
.tag("run", run_tag)
.field("value", float(val))
.time(ts.to_pydatetime(), WritePrecision.S)
)
self.write_api.write(bucket=self.bucket, org=self.org, record=pts)

25
energysystem.py Normal file
View File

@@ -0,0 +1,25 @@
class EnergySystem():
def __init__(self):
self.components = []
def add_components(self, *args):
for comp in args:
self.components.append(comp)
def get_state_and_store_to_database(self, db):
state = {}
for comp in self.components:
component_state = comp.get_state()
state[comp.device_name] = component_state
db.store_data(comp.device_name, component_state)
return state
def get_component_by_name(self, name):
for comp in self.components:
if comp.device_name == name:
return comp

View File

@@ -0,0 +1,349 @@
# load_forecaster.py
# -*- coding: utf-8 -*-
"""
LoadForecaster: builds a 36-hour forecast at 15-min resolution from InfluxDB data.
- Data source: InfluxDB (Flux query provided by user)
- Target: House load = M_AC_real - I_AC_real
- Frequency: 15 minutes (changeable via init)
- Model: Keras (LSTM by default, pluggable)
- Persistence: Saves model (H5) and scaler (joblib)
Usage (example):
from load_forecaster import LoadForecaster
import tensorflow as tf
lf = LoadForecaster(
url="http://localhost:8086",
token="<YOUR_TOKEN>",
org="<YOUR_ORG>",
bucket="allmende_db",
agg_every="15m",
input_hours=72,
output_hours=36,
model_path="model/load_forecaster.h5",
scaler_path="model/scaler.joblib",
)
# Train or retrain
lf.train_and_save(train_days=90, epochs=60)
# Load model and forecast
model = lf.load_model()
forecast_df = lf.get_15min_forecast(model)
print(forecast_df.head())
"""
from __future__ import annotations
import os
import math
import json
import warnings
from dataclasses import dataclass
from typing import Optional, Tuple
import numpy as np
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.warnings import MissingPivotFunction
from sklearn.preprocessing import StandardScaler
from sklearn.exceptions import NotFittedError
import joblib
# TensorFlow / Keras
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
warnings.filterwarnings("ignore", category=MissingPivotFunction)
@dataclass
class InfluxParams:
url: str
token: str
org: str
bucket: str = "allmende_db"
class LoadForecaster:
def __init__(
self,
url: str,
token: str,
org: str,
bucket: str = "allmende_db",
agg_every: str = "15m",
input_hours: int = 72,
output_hours: int = 36,
model_path: str = "model/load_forecaster.h5",
scaler_path: str = "model/scaler.joblib",
feature_config: Optional[dict] = None,
) -> None:
self.influx = InfluxParams(url=url, token=token, org=org, bucket=bucket)
self.agg_every = agg_every
self.input_steps = int((input_hours * 60) / self._freq_minutes(agg_every))
self.output_steps = int((output_hours * 60) / self._freq_minutes(agg_every))
self.model_path = model_path
self.scaler_path = scaler_path
self.feature_config = feature_config or {"use_temp": True, "use_time_cyc": True}
self._scaler: Optional[StandardScaler] = None
# Ensure model dir exists
os.makedirs(os.path.dirname(model_path), exist_ok=True)
# ---------------------------- Public API ---------------------------- #
def get_15min_forecast(self, model: tf.keras.Model) -> pd.DataFrame:
"""Create a 36-hour forecast at 15-min resolution using the latest data.
Assumes a StandardScaler has been fitted during training and saved.
The method uses the most recent input window from InfluxDB.
"""
# Pull just enough history for one input window
history_hours = math.ceil(self.input_steps * self._freq_minutes(self.agg_every) / 60)
df = self._query_and_prepare(range_hours=history_hours)
if len(df) < self.input_steps:
raise RuntimeError(f"Not enough data: need {self.input_steps} steps, got {len(df)}")
# Build features for the latest window
feats = self._build_features(df)
X_window = feats[-self.input_steps :]
# Load scaler
scaler = self._load_or_get_scaler()
X_scaled = scaler.transform(X_window)
# Predict
pred_scaled = model.predict(X_scaled[np.newaxis, ...], verbose=0)[0]
# Inverse transform only the target column (index 0 is Load)
# Reconstruct a full array to inverse_transform
inv = np.zeros((self.output_steps, X_scaled.shape[1]))
inv[:, 0] = pred_scaled
inv_full = scaler.inverse_transform(inv)
y_pred = inv_full[:, 0]
# Build forecast index
last_ts = df.index[-1]
freq = pd.tseries.frequencies.to_offset(self.agg_every)
idx = pd.date_range(last_ts + freq, periods=self.output_steps, freq=freq)
out = pd.DataFrame({"Forecast_Load": y_pred}, index=idx)
out.index.name = "timestamp"
return out
def train_and_save(
self,
train_days: int = 90,
epochs: int = 80,
batch_size: int = 128,
validation_split: float = 0.2,
learning_rate: float = 1e-3,
fine_tune: bool = False,
) -> tf.keras.Model:
"""Train (or fine-tune) a model from recent history and persist model + scaler."""
df = self._query_and_prepare(range_hours=24 * train_days)
feats = self._build_features(df)
# Prepare windows
X, y = self._make_windows(feats)
if len(X) < 10:
raise RuntimeError("Not enough windowed samples to train.")
# Fit scaler on full X
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
self._scaler = scaler
joblib.dump(scaler, self.scaler_path)
# Build model (or load existing for fine-tune)
if fine_tune and os.path.exists(self.model_path):
model = load_model(self.model_path)
else:
model = self._build_default_model(input_dim=X.shape[1], output_dim=self.output_steps, lr=learning_rate)
# Train
es = EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)
model.fit(
X_scaled.reshape((-1, self.input_steps, X.shape[1] // self.input_steps)),
y,
epochs=epochs,
batch_size=batch_size,
validation_split=validation_split,
callbacks=[es],
verbose=1,
)
model.save(self.model_path)
return model
# A convenience wrapper to be called from an external script once per day
def retrain_daily(self, train_days: int = 90, epochs: int = 40, fine_tune: bool = True) -> None:
self.train_and_save(train_days=train_days, epochs=epochs, fine_tune=fine_tune)
def load_model(self) -> tf.keras.Model:
if not os.path.exists(self.model_path):
raise FileNotFoundError(f"Model not found at {self.model_path}")
return load_model(self.model_path)
# ------------------------- Internals: Data ------------------------- #
def _query_and_prepare(self, range_hours: int) -> pd.DataFrame:
"""Query InfluxDB for the last `range_hours` and construct the Load series.
Expected fields (exactly as in DB):
- "40206 - M_AC_Power"
- "40210 - M_AC_Power_SF"
- "40083 - I_AC_Power"
- "40084 - I_AC_Power_SF"
- "300 - Aussentemperatur"
"""
start_str = f"-{range_hours}h"
flux = f'''
from(bucket: "{self.influx.bucket}")
|> range(start: {start_str})
|> filter(fn: (r) => r["_measurement"] == "solaredge_meter" or r["_measurement"] == "solaredge_master" or r["_measurement"] == "hp_master")
|> filter(fn: (r) => r["_field"] == "40206 - M_AC_Power" or r["_field"] == "40210 - M_AC_Power_SF" or r["_field"] == "40083 - I_AC_Power" or r["_field"] == "40084 - I_AC_Power_SF" or r["_field"] == "300 - Aussentemperatur")
|> aggregateWindow(every: {self.agg_every}, fn: mean, createEmpty: false)
|> yield(name: "mean")
'''
with InfluxDBClient(url=self.influx.url, token=self.influx.token, org=self.influx.org) as client:
tables = client.query_api().query_data_frame(flux)
# Concatenate if list of frames is returned
if isinstance(tables, list):
df = pd.concat(tables, ignore_index=True)
else:
df = tables
# Keep relevant columns and pivot
df = df[["_time", "_field", "_value"]]
df = df.pivot(index="_time", columns="_field", values="_value").reset_index()
df = df.rename(
columns={
"_time": "timestamp",
"40206 - M_AC_Power": "M_AC",
"40210 - M_AC_Power_SF": "M_SF",
"40083 - I_AC_Power": "I_AC",
"40084 - I_AC_Power_SF": "I_SF",
"300 - Aussentemperatur": "Temp",
}
)
df = df.sort_values("timestamp").set_index("timestamp")
# Forward-fill reasonable gaps (e.g., scaler factors and temp)
df[["M_SF", "I_SF", "Temp"]] = df[["M_SF", "I_SF", "Temp"]].ffill()
# Apply scaling: real = value * 10^sf
df["I_AC_real"] = df["I_AC"] * np.power(10.0, df["I_SF"]).astype(float)
df["M_AC_real"] = df["M_AC"] * np.power(10.0, df["M_SF"]).astype(float)
# Compute load
df["Load"] = df["M_AC_real"] - df["I_AC_real"]
# Ensure regular 15-min grid
df = df.asfreq(self.agg_every)
df[["Load", "Temp"]] = df[["Load", "Temp"]].interpolate(limit_direction="both")
return df[["Load", "Temp"]]
def _build_features(self, df: pd.DataFrame) -> np.ndarray:
"""Create feature matrix: [Load, Temp?, sin/cos day, sin/cos dow]."""
feats = [df["Load"].values.reshape(-1, 1)]
if self.feature_config.get("use_temp", True):
feats.append(df["Temp"].values.reshape(-1, 1))
if self.feature_config.get("use_time_cyc", True):
idx = df.index
minute_of_day = (idx.hour * 60 + idx.minute).values.astype(float)
sod = 2 * np.pi * minute_of_day / (24 * 60)
dow = 2 * np.pi * idx.dayofweek.values.astype(float) / 7.0
feats.append(np.sin(sod).reshape(-1, 1))
feats.append(np.cos(sod).reshape(-1, 1))
feats.append(np.sin(dow).reshape(-1, 1))
feats.append(np.cos(dow).reshape(-1, 1))
X = np.hstack(feats) # shape: (T, n_features)
# Flatten windows to 2D for scaler fitting, but model expects 3D; we reshape later
return X
def _make_windows(self, X_2d: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""Create sliding windows: returns (X_flat, y) where X_flat stacks the windowed features.
For Keras we later reshape X_flat -> (N, input_steps, n_features).
"""
n = X_2d.shape[0]
n_features = X_2d.shape[1]
X_list, y_list = [], []
for i in range(n - self.input_steps - self.output_steps):
xw = X_2d[i : i + self.input_steps, :]
yw = X_2d[i + self.input_steps : i + self.input_steps + self.output_steps, 0] # target: Load
X_list.append(xw.reshape(-1)) # flatten
y_list.append(yw)
X_flat = np.stack(X_list)
y = np.stack(y_list)
return X_flat, y
# ----------------------- Internals: Modeling ----------------------- #
def _build_default_model(self, input_dim: int, output_dim: int, lr: float = 1e-3) -> tf.keras.Model:
n_features = input_dim // self.input_steps
model = Sequential([
LSTM(96, input_shape=(self.input_steps, n_features), return_sequences=False),
Dropout(0.1),
Dense(output_dim)
])
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=lr), loss="mse")
return model
def _load_or_get_scaler(self) -> StandardScaler:
if self._scaler is not None:
return self._scaler
if not os.path.exists(self.scaler_path):
raise NotFittedError("Scaler not found. Train the model first to create scaler.")
self._scaler = joblib.load(self.scaler_path)
return self._scaler
@staticmethod
def _freq_minutes(spec: str) -> int:
# supports formats like "15m", "1h"
if spec.endswith("m"):
return int(spec[:-1])
if spec.endswith("h"):
return int(spec[:-1]) * 60
raise ValueError(f"Unsupported frequency spec: {spec}")
# ----------------------------- retrain_daily.py -----------------------------
# A tiny script you can run once per day (e.g., via cron/systemd) to retrain the model.
# It delegates the work to LoadForecaster.retrain_daily().
if __name__ == "__main__":
# Read credentials/config from env vars or fill here
URL = os.getenv("INFLUX_URL", "http://localhost:8086")
TOKEN = os.getenv("INFLUX_TOKEN", "<YOUR_TOKEN>")
ORG = os.getenv("INFLUX_ORG", "<YOUR_ORG>")
BUCKET = os.getenv("INFLUX_BUCKET", "allmende_db")
lf = LoadForecaster(
url=URL,
token=TOKEN,
org=ORG,
bucket=BUCKET,
agg_every="15m",
input_hours=72,
output_hours=36,
model_path=os.getenv("FORECASTER_MODEL", "model/load_forecaster.h5"),
scaler_path=os.getenv("FORECASTER_SCALER", "model/scaler.joblib"),
)
# One call per day is enough; decrease epochs for faster daily updates
lf.retrain_daily(train_days=int(os.getenv("TRAIN_DAYS", "120")), epochs=int(os.getenv("EPOCHS", "30")), fine_tune=True)
# Optionally, produce a fresh forecast right after training
try:
model = lf.load_model()
fc = lf.get_15min_forecast(model)
# Save latest forecast to CSV for dashboards/consumers
out_path = os.getenv("FORECAST_OUT", "model/latest_forecast_15min.csv")
os.makedirs(os.path.dirname(out_path), exist_ok=True)
fc.to_csv(out_path)
print(f"Saved forecast: {out_path}")
except Exception as e:
print(f"Forecast generation failed: {e}")

View File

@@ -0,0 +1,61 @@
#!/usr/bin/env python3
import time
import datetime as dt
import requests
from zoneinfo import ZoneInfo
from matplotlib import pyplot as plt
import pandas as pd
TZ = "Europe/Berlin"
DAYS = 2
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
class WeatherForecaster:
def __init__(self, latitude, longitude):
self.lat = latitude
self.lon = longitude
def get_hourly_forecast(self, start_hour, days):
start_hour_local = start_hour
end_hour_local = start_hour_local + dt.timedelta(days=days)
params = {
"latitude": self.lat,
"longitude": self.lon,
"hourly": ["temperature_2m", "shortwave_radiation", "wind_speed_10m"],
"timezone": TZ,
"start_hour": start_hour_local.strftime("%Y-%m-%dT%H:%M"),
"end_hour": end_hour_local.strftime("%Y-%m-%dT%H:%M")
}
h = requests.get(OPEN_METEO_URL, params=params).json()["hourly"]
time_stamps = h["time"]
time_stamps = [
dt.datetime.fromisoformat(t).replace(tzinfo=ZoneInfo(TZ))
for t in time_stamps
]
weather = pd.DataFrame(index=time_stamps)
weather["ghi"] = h["shortwave_radiation"]
weather["temp_air"] = h["temperature_2m"]
weather["wind_speed"] = h["wind_speed_10m"]
return weather
if __name__=='__main__':
weather_forecast = WeatherForecaster(latitude=48.041, longitude=7.862)
while True:
now = dt.datetime.now()
secs = 60 - now.second #(60 - now.minute) * 60 - now.second # Sekunden bis volle Stunde
time.sleep(secs)
now_local = dt.datetime.now()
start_hour_local = (now_local + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
time_stamps, temps, ghi, wind_speed = weather_forecast.get_hourly_forecast(start_hour_local, DAYS)
plt.plot(time_stamps, temps)
plt.show()

View File

@@ -3,15 +3,17 @@ import pandas as pd
import time
class HeatPump:
def __init__(self, ip_address: str):
def __init__(self, device_name: str, ip_address: str, port: int=502):
self.device_name = device_name
self.ip = ip_address
self.port = port
self.client = None
self.connect_to_modbus()
self.registers = None
self.get_registers()
def connect_to_modbus(self):
port = 502
port = self.port
self.client = ModbusTcpClient(self.ip, port=port)
try:
if not self.client.connect():
@@ -25,7 +27,7 @@ class HeatPump:
def get_registers(self):
# Excel-Datei mit den Input-Registerinformationen
excel_path = "data/ModBus TCPIP 1.17(1).xlsx"
excel_path = "modbus_registers/heat_pump_registers.xlsx"
xls = pd.ExcelFile(excel_path)
df_input_registers = xls.parse('04 Input Register')
@@ -42,7 +44,7 @@ class HeatPump:
for _, row in df_clean.iterrows()
}
def get_data(self):
def get_state(self):
data = {}
data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S')
for address, info in self.registers.items():

77
main.py
View File

@@ -1,17 +1,82 @@
import time
from datetime import datetime
from data_base_csv import DataBaseCsv
from data_base_influx import DataBaseInflux
from forecaster.weather_forecaster import WeatherForecaster
from heat_pump import HeatPump
from pv_inverter import PvInverter
from simulators.pv_plant_simulator import PvWattsSubarrayConfig, PvWattsPlant
from solaredge_meter import SolaredgeMeter
from shelly_pro_3m import ShellyPro3m
from energysystem import EnergySystem
from sg_ready_controller import SgReadyController
from pvlib.location import Location
import datetime as dt
interval = 10 # z.B. alle 10 Sekunden
# For dev-System run in terminal: ssh -N -L 127.0.0.1:8111:10.0.0.10:502 pi@192.168.1.146
# For productive-System change IP-adress in heatpump to '10.0.0.10' and port to 502
db = DataBaseCsv('modbus_log.csv')
hp = HeatPump(ip_address='10.0.0.10')
interval_seconds = 10
es = EnergySystem()
db = DataBaseInflux(
url="http://192.168.1.146:8086",
token="Cw_naEZyvJ3isiAh1P4Eq3TsjcHmzzDFS7SlbKDsS6ZWL04fMEYixWqtNxGThDdG27S9aW5g7FP9eiq5z1rsGA==",
org="allmende",
bucket="allmende_db"
)
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112')
meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112')
es.add_components(hp_master, hp_slave, shelly, wr, meter)
controller = SgReadyController(es)
# FORECASTING
latitude = 48.041
longitude = 7.862
TZ = "Europe/Berlin"
HORIZON_DAYS = 2
weather_forecaster = WeatherForecaster(latitude=latitude, longitude=longitude)
site = Location(latitude=latitude, longitude=longitude, altitude=35, tz=TZ, name="Gundelfingen")
p_module = 435
upper_roof_north = PvWattsSubarrayConfig(name="north", pdc0_w=(29+29+21)*p_module, tilt_deg=10, azimuth_deg=20, dc_loss=0.02, ac_loss=0.01)
upper_roof_south = PvWattsSubarrayConfig(name="south", pdc0_w=(29+21+20)*p_module, tilt_deg=10, azimuth_deg=200, dc_loss=0.02, ac_loss=0.01)
upper_roof_east = PvWattsSubarrayConfig(name="east", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=110, dc_loss=0.02, ac_loss=0.01)
upper_roof_west = PvWattsSubarrayConfig(name="west", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=290, dc_loss=0.02, ac_loss=0.01)
cfgs = [upper_roof_north, upper_roof_south, upper_roof_east, upper_roof_west]
pv_plant = PvWattsPlant(site, cfgs)
now = datetime.now()
next_forecast_at = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
while True:
now = datetime.now()
if now.second % interval == 0 and now.microsecond < 100_000:
db.store_data(hp.get_data())
if now.second % interval_seconds == 0 and now.microsecond < 100_000:
state = es.get_state_and_store_to_database(db)
mode = controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state)
if mode == 'mode1':
mode_as_binary = 0
else:
mode_as_binary = 1
db.store_data('sg_ready', {'mode': mode_as_binary})
if now >= next_forecast_at:
# Start der Prognose: ab der kommenden vollen Stunde
start_hour_local = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
weather = weather_forecaster.get_hourly_forecast(start_hour_local, HORIZON_DAYS)
total = pv_plant.get_power(weather)
db.store_forecasts('pv_forecast', total)
# Nächste geplante Ausführung definieren (immer volle Stunde)
# Falls wir durch Delay mehrere Stunden verpasst haben, hole auf:
while next_forecast_at <= now:
next_forecast_at = (next_forecast_at + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
time.sleep(0.1)

File diff suppressed because one or more lines are too long

Binary file not shown.

Binary file not shown.

139
pv_inverter.py Normal file
View File

@@ -0,0 +1,139 @@
import time
import struct
import pandas as pd
from typing import Dict, Any, List, Tuple, Optional
from pymodbus.client import ModbusTcpClient
EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"
# Obergrenze: bis EXKLUSIVE 40206 (d.h. max. 40205)
MAX_ADDR_EXCLUSIVE = 40121
class PvInverter:
def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1):
self.device_name = device_name
self.ip = ip_address
self.port = port
self.unit = unit
self.client: Optional[ModbusTcpClient] = None
self.registers: Dict[int, Dict[str, Any]] = {} # addr -> {"desc":..., "type":...}
self.connect_to_modbus()
self.load_registers(EXCEL_PATH)
# ---------- Verbindung ----------
def connect_to_modbus(self):
self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3)
if not self.client.connect():
print("❌ Verbindung zu Wechselrichter fehlgeschlagen.")
raise SystemExit(1)
print("✅ Verbindung zu Wechselrichter hergestellt.")
def close(self):
if self.client:
self.client.close()
self.client = None
# ---------- Register-Liste ----------
def load_registers(self, excel_path: str):
xls = pd.ExcelFile(excel_path)
df = xls.parse()
# Passe Spaltennamen hier an, falls nötig:
cols = ["MB Adresse", "Beschreibung", "Variabel Typ"]
df = df[cols].dropna()
df["MB Adresse"] = df["MB Adresse"].astype(int)
# 1) Vorab-Filter: nur Adressen < 40206 übernehmen
df = df[df["MB Adresse"] < MAX_ADDR_EXCLUSIVE]
self.registers = {
int(row["MB Adresse"]): {
"desc": str(row["Beschreibung"]).strip(),
"type": str(row["Variabel Typ"]).strip()
}
for _, row in df.iterrows()
}
# ---------- Low-Level Lesen ----------
def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
fn = getattr(self.client, fn_name)
# pymodbus 3.8.x hat 'slave='; Fallbacks schaden nicht
for kwargs in (dict(address=address, count=count, slave=self.unit),
dict(address=address, count=count)):
try:
res = fn(**kwargs)
if res is None or (hasattr(res, "isError") and res.isError()):
continue
return res.registers
except TypeError:
continue
return None
def _read_any(self, address: int, count: int) -> Optional[List[int]]:
regs = self._try_read("read_holding_registers", address, count)
if regs is None:
regs = self._try_read("read_input_registers", address, count)
return regs
# ---------- Decoding ----------
@staticmethod
def _to_i16(u16: int) -> int:
return struct.unpack(">h", struct.pack(">H", u16))[0]
@staticmethod
def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float:
b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi)
return struct.unpack(">f", b)[0]
# Hilfsfunktion: wie viele 16-Bit-Register braucht dieser Typ?
@staticmethod
def _word_count_for_type(rtype: str) -> int:
rt = (rtype or "").lower()
# Passe hier an deine Excel-Typen an:
if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt:
return 2
# Default: 1 Wort (z.B. int16/uint16)
return 1
def read_one(self, address_excel: int, rtype: str) -> Optional[float]:
"""
Liest einen Wert nach Typ ('INT' oder 'REAL' etc.).
Es werden ausschließlich Register < 40206 gelesen.
"""
addr = int(address_excel)
words = self._word_count_for_type(rtype)
# 2) Harte Grenze prüfen: höchstes angefasstes Register muss < 40206 sein
if addr + words - 1 >= MAX_ADDR_EXCLUSIVE:
# Überspringen, da der Lesevorgang die Grenze >= 40206 berühren würde
return None
if words == 2:
regs = self._read_any(addr, 2)
if not regs or len(regs) < 2:
return None
# Deine bisherige Logik interpretiert 2 Worte als Float32:
return self._to_f32_from_two(regs[0], regs[1])
else:
regs = self._read_any(addr, 1)
if not regs:
return None
return float(self._to_i16(regs[0]))
def get_state(self) -> Dict[str, Any]:
"""
Liest ALLE Register aus self.registers und gibt dict zurück.
Achtet darauf, dass keine Adresse (inkl. Mehrwort) >= 40206 gelesen wird.
"""
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
for address, meta in sorted(self.registers.items()):
words = self._word_count_for_type(meta["type"])
# 3) Nochmals Schutz auf Ebene der Iteration:
if address + words - 1 >= MAX_ADDR_EXCLUSIVE:
continue
val = self.read_one(address, meta["type"])
if val is None:
continue
key = f"{address} - {meta['desc']}"
data[key] = val
return data

View File

@@ -1,3 +1,5 @@
pymodbus~=3.8.6
pandas
openpyxl
sshtunnel
pvlib

65
sg_ready_controller.py Normal file
View File

@@ -0,0 +1,65 @@
from pymodbus.client import ModbusTcpClient
class SgReadyController():
def __init__(self, es):
self.es = es
def perform_action(self, heat_pump_name, meter_name, state):
hp = self.es.get_component_by_name(heat_pump_name)
meter_values = state[meter_name]
power_to_grid = meter_values['40206 - M_AC_Power'] * 10 ** meter_values['40210 - M_AC_Power_SF']
mode = None
if power_to_grid > 10000:
mode = 'mode2'
self.switch_sg_ready_mode(hp.ip, hp.port, mode)
elif power_to_grid < 0:
mode = 'mode1'
self.switch_sg_ready_mode(hp.ip, hp.port, mode)
return mode
def switch_sg_ready_mode(self, ip, port, mode):
"""
Register 300: 1=BUS 0= Hardware Kontakte
Register 301 & 302:
0-0= Kein Offset
0-1 Boiler und Heizung Offset
1-1 Boiler Offset + E-Einsatz Sollwert Erhöht
1-0 SG EVU Sperre
:param ip:
:param mode:
'mode1' = [True, False, False] => SG Ready deactivated
'mode2' = [True, False, True] => SG ready activated for heatpump only
'mode3' = [True, True, True] => SG ready activated for heatpump and heat rod
:return:
"""
client = ModbusTcpClient(ip, port=port)
if not client.connect():
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
return
mode_code = None
if mode == 'mode1':
mode_code = [True, False, False]
elif mode == 'mode2':
mode_code = [True, False, True]
elif mode == 'mode3':
mode_code = [True, True, True]
else:
print('Uncorrect or no string for mode!')
try:
response_300 = client.write_coil(300, mode_code[0])
response_301 = client.write_coil(301, mode_code[1])
response_302 = client.write_coil(302, mode_code[2])
# Optional: Rückmeldungen prüfen
for addr, resp in zip([300, 301, 302], [response_300, response_301, response_302]):
if resp.isError():
print(f"Fehler beim Schreiben von Coil {addr}: {resp}")
else:
print(f"Coil {addr} erfolgreich geschrieben.")
finally:
client.close()

64
shelly_pro_3m.py Normal file
View File

@@ -0,0 +1,64 @@
import struct
from pymodbus.client import ModbusTcpClient
import pandas as pd
import time
class ShellyPro3m:
def __init__(self, device_name: str, ip_address: str, port: int=502):
self.device_name = device_name
self.ip = ip_address
self.port = port
self.client = None
self.connect_to_modbus()
self.registers = None
self.get_registers()
def connect_to_modbus(self):
port = self.port
self.client = ModbusTcpClient(self.ip, port=port)
try:
if not self.client.connect():
print("Verbindung zum Shelly-Logger fehlgeschlagen.")
exit(1)
print("Verbindung zum Shelly-Logger erfolgreich.")
except KeyboardInterrupt:
print("Beendet durch Benutzer (Ctrl+C).")
finally:
self.client.close()
def get_registers(self):
# Excel-Datei mit den Input-Registerinformationen
excel_path = "modbus_registers/shelly_pro_3m_registers.xlsx"
xls = pd.ExcelFile(excel_path)
df_input_registers = xls.parse()
# Relevante Spalten bereinigen
df_clean = df_input_registers[['MB Adresse', 'Beschreibung', 'Variabel Typ']].dropna()
df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int)
# Dictionary aus Excel erzeugen
self.registers = {
row['MB Adresse']: {
'desc': row['Beschreibung'],
'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT'
}
for _, row in df_clean.iterrows()
}
def get_state(self):
data = {}
data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S')
for address, info in self.registers.items():
reg_type = info['type']
result = self.client.read_input_registers(address, count=2 if reg_type == 'REAL' else 1)
if result.isError():
print(f"Fehler beim Lesen von Adresse {address}: {result}")
continue
packed = struct.pack(">HH", result.registers[1], result.registers[0])
value = round(struct.unpack(">f", packed)[0], 2)
print(f"Adresse {address} - {info['desc']}: {value}")
data[f"{address} - {info['desc']}"] = value
return data

View File

@@ -0,0 +1,210 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, Dict, List, Literal, Tuple, Union
import numpy as np
import pandas as pd
import pvlib
import matplotlib.pyplot as plt
from pvlib.location import Location
from pvlib.pvsystem import PVSystem
from pvlib.modelchain import ModelChain
SeriesOrArray = Union[pd.Series, np.ndarray]
# ----------------------------- Konfiguration -----------------------------
@dataclass
class PvWattsSubarrayConfig:
name: str
pdc0_w: float # STC-DC-Leistung [W]
tilt_deg: float # Neigung (0=horizontal)
azimuth_deg: float # Azimut (180=Süd)
gamma_pdc: float = -0.004 # Tempkoeff. [1/K]
eta_inv_nom: float = 0.96 # WR-Wirkungsgrad (nominal)
albedo: float = 0.2 # Bodenreflexion
# Pauschale Verluste (PVWatts-Losses)
dc_loss: float = 0.0
ac_loss: float = 0.0
soiling: float = 0.0
# Modell
transposition_model: Literal["perez","haydavies","isotropic","klucher","reindl"] = "perez"
# ------------------------------ Subarray ---------------------------------
class PvWattsSubarray:
"""
Ein Subarray mit pvlib.ModelChain (PVWatts).
Berechnet automatisch DNI/DHI aus GHI (ERBS-Methode)
und nutzt ein SAPM-Temperaturmodell.
"""
def __init__(self, cfg: PvWattsSubarrayConfig, location: Location):
self.cfg = cfg
self.location = location
self._mc: Optional[ModelChain] = None
# ---------------------------------------------------------------------
def _create_modelchain(self) -> ModelChain:
"""Erzeuge eine pvlib.ModelChain-Instanz mit PVWatts-Parametern."""
temp_params = pvlib.temperature.TEMPERATURE_MODEL_PARAMETERS["sapm"]["open_rack_glass_polymer"]
system = PVSystem(
surface_tilt=self.cfg.tilt_deg,
surface_azimuth=self.cfg.azimuth_deg,
module_parameters={"pdc0": self.cfg.pdc0_w, "gamma_pdc": self.cfg.gamma_pdc},
inverter_parameters={"pdc0": self.cfg.pdc0_w, "eta_inv_nom": self.cfg.eta_inv_nom},
albedo=self.cfg.albedo,
temperature_model_parameters=temp_params,
module_type="glass_polymer",
racking_model="open_rack",
)
mc = ModelChain(
system, self.location,
transposition_model=self.cfg.transposition_model,
solar_position_method="nrel_numpy",
airmass_model="kastenyoung1989",
dc_model="pvwatts",
ac_model="pvwatts",
aoi_model="physical",
spectral_model=None,
losses_model="pvwatts",
temperature_model="sapm",
)
mc.losses_parameters = {
"dc_loss": float(self.cfg.dc_loss),
"ac_loss": float(self.cfg.ac_loss),
"soiling": float(self.cfg.soiling),
}
self._mc = mc
return mc
# ---------------------------------------------------------------------
def calc_dni_and_dhi(self, weather: pd.DataFrame) -> pd.DataFrame:
"""
Berechnet DNI & DHI aus GHI über die ERBS-Methode.
Gibt ein neues DataFrame mit 'ghi', 'dni', 'dhi' zurück.
"""
if "ghi" not in weather:
raise ValueError("Wetterdaten benötigen mindestens 'ghi'.")
# Sonnenstand bestimmen
sp = self.location.get_solarposition(weather.index)
erbs = pvlib.irradiance.erbs(weather["ghi"], sp["zenith"], weather.index)
out = weather.copy()
out["dni"] = erbs["dni"].clip(lower=0)
out["dhi"] = erbs["dhi"].clip(lower=0)
return out
# ---------------------------------------------------------------------
def _prepare_weather(self, weather: pd.DataFrame) -> pd.DataFrame:
"""Sichert vollständige Spalten (ghi, dni, dhi, temp_air, wind_speed)."""
if "ghi" not in weather or "temp_air" not in weather:
raise ValueError("weather benötigt Spalten: 'ghi' und 'temp_air'.")
w = weather.copy()
# Zeitzone prüfen
if w.index.tz is None:
w.index = w.index.tz_localize(self.location.tz)
else:
if str(w.index.tz) != str(self.location.tz):
w = w.tz_convert(self.location.tz)
# Wind default
if "wind_speed" not in w:
w["wind_speed"] = 1.0
# DNI/DHI ergänzen (immer mit ERBS)
if "dni" not in w or "dhi" not in w:
w = self.calc_dni_and_dhi(w)
return w
# ---------------------------------------------------------------------
def get_power(self, weather: pd.DataFrame) -> pd.Series:
"""
Berechnet AC-Leistung aus Wetterdaten.
"""
w = self._prepare_weather(weather)
mc = self._create_modelchain()
mc.run_model(weather=w)
return mc.results.ac.rename(self.cfg.name)
# ------------------------------- Anlage ----------------------------------
class PvWattsPlant:
"""
Eine PV-Anlage mit mehreren Subarrays, die ein gemeinsames Wetter-DataFrame nutzt.
"""
def __init__(self, site: Location, subarray_cfgs: List[PvWattsSubarrayConfig]):
self.site = site
self.subs: Dict[str, PvWattsSubarray] = {c.name: PvWattsSubarray(c, site) for c in subarray_cfgs}
def get_power(
self,
weather: pd.DataFrame,
*,
return_breakdown: bool = False
) -> pd.Series | Tuple[pd.Series, Dict[str, pd.Series]]:
"""Berechne Gesamtleistung und optional Einzel-Subarrays."""
parts: Dict[str, pd.Series] = {name: sub.get_power(weather) for name, sub in self.subs.items()}
# gemeinsamen Index bilden
idx = list(parts.values())[0].index
for s in parts.values():
idx = idx.intersection(s.index)
parts = {k: v.reindex(idx).fillna(0.0) for k, v in parts.items()}
total = sum(parts.values())
total.name = "total_ac"
if return_breakdown:
return total, parts
return total
# --------------------------- Beispielnutzung -----------------------------
if __name__ == "__main__":
# Standort
site = Location(latitude=52.52, longitude=13.405, altitude=35, tz="Europe/Berlin", name="Berlin")
# Zeitachse: 1 Tag, 15-minütig
times = pd.date_range("2025-06-21 00:00", "2025-06-21 23:45", freq="15min", tz=site.tz)
# Dummy-Wetter
ghi = 1000 * np.clip(np.sin(np.linspace(0, np.pi, len(times)))**1.2, 0, None)
temp_air = 16 + 8 * np.clip(np.sin(np.linspace(-np.pi/2, np.pi/2, len(times))), 0, None)
wind = np.full(len(times), 1.0)
weather = pd.DataFrame(index=times)
weather["ghi"] = ghi
weather["temp_air"] = temp_air
weather["wind_speed"] = wind
# Zwei Subarrays
cfgs = [
PvWattsSubarrayConfig(name="Sued_30", pdc0_w=6000, tilt_deg=30, azimuth_deg=180, dc_loss=0.02, ac_loss=0.01),
PvWattsSubarrayConfig(name="West_20", pdc0_w=4000, tilt_deg=20, azimuth_deg=270, soiling=0.02),
]
plant = PvWattsPlant(site, cfgs)
# Simulation
total, parts = plant.get_power(weather, return_breakdown=True)
# Plot
plt.figure(figsize=(10, 6))
plt.plot(total.index, total / 1000, label="Gesamtleistung (AC)", linewidth=2, color="black")
for name, s in parts.items():
plt.plot(s.index, s / 1000, label=name)
plt.title("PV-Leistung (PVWatts, ERBS-Methode für DNI/DHI)")
plt.ylabel("Leistung [kW]")
plt.xlabel("Zeit")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.5)
plt.tight_layout()
plt.show()

134
solaredge_meter.py Normal file
View File

@@ -0,0 +1,134 @@
import time
import struct
import pandas as pd
from typing import Dict, Any, List, Tuple, Optional
from pymodbus.client import ModbusTcpClient
EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"
# Obergrenze: bis EXKLUSIVE 40206 (d.h. max. 40205)
MIN_ADDR_INCLUSIVE = 40121
ADDRESS_SHIFT = 50
class SolaredgeMeter:
def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1):
self.device_name = device_name
self.ip = ip_address
self.port = port
self.unit = unit
self.client: Optional[ModbusTcpClient] = None
self.registers: Dict[int, Dict[str, Any]] = {} # addr -> {"desc":..., "type":...}
self.connect_to_modbus()
self.load_registers(EXCEL_PATH)
# ---------- Verbindung ----------
def connect_to_modbus(self):
self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3)
if not self.client.connect():
print("❌ Verbindung zu Zähler fehlgeschlagen.")
raise SystemExit(1)
print("✅ Verbindung zu Zähler hergestellt.")
def close(self):
if self.client:
self.client.close()
self.client = None
# ---------- Register-Liste ----------
def load_registers(self, excel_path: str):
xls = pd.ExcelFile(excel_path)
df = xls.parse()
# Passe Spaltennamen hier an, falls nötig:
cols = ["MB Adresse", "Beschreibung", "Variabel Typ"]
df = df[cols].dropna()
df["MB Adresse"] = df["MB Adresse"].astype(int)
# 1) Vorab-Filter: nur Adressen < 40206 übernehmen
df = df[df["MB Adresse"] >= MIN_ADDR_INCLUSIVE]
self.registers = {
int(row["MB Adresse"]): {
"desc": str(row["Beschreibung"]).strip(),
"type": str(row["Variabel Typ"]).strip()
}
for _, row in df.iterrows()
}
# ---------- Low-Level Lesen ----------
def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
fn = getattr(self.client, fn_name)
# pymodbus 3.8.x hat 'slave='; Fallbacks schaden nicht
shifted_addr = address + ADDRESS_SHIFT
for kwargs in (dict(address=shifted_addr, count=count, slave=self.unit),
dict(address=shifted_addr, count=count)):
try:
res = fn(**kwargs)
if res is None or (hasattr(res, "isError") and res.isError()):
continue
return res.registers
except TypeError:
continue
return None
def _read_any(self, address: int, count: int) -> Optional[List[int]]:
regs = self._try_read("read_holding_registers", address, count)
if regs is None:
regs = self._try_read("read_input_registers", address, count)
return regs
# ---------- Decoding ----------
@staticmethod
def _to_i16(u16: int) -> int:
return struct.unpack(">h", struct.pack(">H", u16))[0]
@staticmethod
def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float:
b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi)
return struct.unpack(">f", b)[0]
# Hilfsfunktion: wie viele 16-Bit-Register braucht dieser Typ?
@staticmethod
def _word_count_for_type(rtype: str) -> int:
rt = (rtype or "").lower()
# Passe hier an deine Excel-Typen an:
if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt:
return 2
# Default: 1 Wort (z.B. int16/uint16)
return 1
def read_one(self, address_excel: int, rtype: str) -> Optional[float]:
"""
Liest einen Wert nach Typ ('INT' oder 'REAL' etc.).
Es werden ausschließlich Register < 40206 gelesen.
"""
addr = int(address_excel)
words = self._word_count_for_type(rtype)
if words == 2:
regs = self._read_any(addr, 2)
if not regs or len(regs) < 2:
return None
# Deine bisherige Logik interpretiert 2 Worte als Float32:
return self._to_f32_from_two(regs[0], regs[1])
else:
regs = self._read_any(addr, 1)
if not regs:
return None
return float(self._to_i16(regs[0]))
def get_state(self) -> Dict[str, Any]:
"""
Liest ALLE Register aus self.registers und gibt dict zurück.
Achtet darauf, dass keine Adresse (inkl. Mehrwort) >= 40206 gelesen wird.
"""
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
for address, meta in sorted(self.registers.items()):
words = self._word_count_for_type(meta["type"])
val = self.read_one(address, meta["type"])
if val is None:
continue
key = f"{address} - {meta['desc']}"
data[key] = val
return data

5108945
terminal_log

File diff suppressed because it is too large Load Diff