2 Commits

Author        SHA1        Message        Date
Nils Reiners  1348329a24  current state  2025-04-26 21:19:21 +01:00
Nils Reiners  e2c3d208de  update data    2025-04-19 08:22:30 +01:00
35 changed files with 5160606 additions and 1286 deletions

README

@@ -11,42 +11,10 @@ What needs to be done on the Raspberry Pi before the tool can run.
 - pip install -r requirements.txt
-3) How to run the script for testing:
-nohup python main.py > terminal_log 2>&1 &
+How to run the script:
+- nohup python main.py > terminal_log 2>&1 &
 For reading out the terminal_log while the script is running:
-tail -f terminal_log
+- tail -f terminal_log
-4) Implement and run the EMS as a systemd service:
-create:
-/etc/systemd/system/allmende_ems.service
-insert:
-[Unit]
-Description=Allmende EMS Python Script
-After=network.target
-[Service]
-WorkingDirectory=/home/pi/projects/allmende_ems
-ExecStart=/home/pi/allmende_ems/bin/python3.11 /home/pi/projects/allmende_ems/main.py
-Restart=always
-RestartSec=5
-StandardOutput=journal
-StandardError=journal
-[Install]
-WantedBy=multi-user.target
-Manage the service with the following commands:
-Once:
-sudo systemctl daemon-reload
-sudo systemctl start allmende_ems.service
-sudo systemctl enable allmende_ems.service
-While running:
-sudo systemctl status allmende_ems.service
-sudo systemctl restart allmende_ems.service
-sudo systemctl stop allmende_ems.service
-journalctl -u allmende_ems.service


@@ -1,7 +0,0 @@
from heat_pump import HeatPump

hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502, excel_path="../modbus_registers/heat_pump_registers.xlsx")
state = hp_master.get_state()
print(state)


@@ -1,49 +0,0 @@
from pymodbus.client import ModbusTcpClient


def switch_sg_ready_mode(ip, port, mode):
    """
    Register 300: 1 = bus control, 0 = hardware contacts
    Registers 301 & 302:
        0-0 = no offset
        0-1 = boiler and heating offset
        1-1 = boiler offset + electric heating element setpoint raised
        1-0 = SG utility (EVU) lock
    :param ip:
    :param mode:
        'mode1' = [True, False, False] => SG Ready deactivated
        'mode2' = [True, False, True]  => SG Ready activated for the heat pump only
        'mode3' = [True, True, True]   => SG Ready activated for the heat pump and the heat rod
    :return:
    """
    client = ModbusTcpClient(ip, port=port)
    if not client.connect():
        print("Connection to the heat pump failed.")
        return
    if mode == 'mode1':
        mode_code = [True, False, False]
    elif mode == 'mode2':
        mode_code = [True, False, True]
    elif mode == 'mode3':
        mode_code = [True, True, True]
    else:
        # Bail out early: otherwise mode_code would be None and the writes below would crash
        print('Incorrect or missing mode string!')
        client.close()
        return
    try:
        response_300 = client.write_coil(300, mode_code[0])
        response_301 = client.write_coil(301, mode_code[1])
        response_302 = client.write_coil(302, mode_code[2])
        # Optional: check the write responses
        for addr, resp in zip([300, 301, 302], [response_300, response_301, response_302]):
            if resp.isError():
                print(f"Error writing coil {addr}: {resp}")
            else:
                print(f"Coil {addr} written successfully.")
    finally:
        client.close()


if __name__ == '__main__':
    switch_sg_ready_mode(ip='10.0.0.10', port=502, mode='mode2')

data_base_csv.py Normal file

@@ -0,0 +1,46 @@
import csv
import os
import tempfile
import shutil


class DataBaseCsv:
    def __init__(self, filename: str):
        self.filename = filename

    def store_data(self, data: dict):
        new_fields = list(data.keys())
        # If file does not exist or is empty → create new file with header
        if not os.path.exists(self.filename) or os.path.getsize(self.filename) == 0:
            with open(self.filename, mode='w', newline='') as csv_file:
                writer = csv.DictWriter(csv_file, fieldnames=new_fields)
                writer.writeheader()
                writer.writerow(data)
            return
        # If file exists → read existing header and data
        with open(self.filename, mode='r', newline='') as csv_file:
            reader = csv.DictReader(csv_file)
            existing_fields = reader.fieldnames
            existing_data = list(reader)
        # Merge old and new fields (keep original order, add new ones)
        all_fields = existing_fields.copy()
        for field in new_fields:
            if field not in all_fields:
                all_fields.append(field)
        # Write to a temporary file with updated header
        with tempfile.NamedTemporaryFile(mode='w', delete=False, newline='', encoding='utf-8') as tmp_file:
            writer = csv.DictWriter(tmp_file, fieldnames=all_fields)
            writer.writeheader()
            # Write old rows with updated field list
            for row in existing_data:
                writer.writerow({field: row.get(field, '') for field in all_fields})
            # Write new data row
            writer.writerow({field: data.get(field, '') for field in all_fields})
        # Replace original file with updated temporary file
        shutil.move(tmp_file.name, self.filename)
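
For reference, a minimal usage sketch (the file name and field names are made up for illustration); the second call introduces a new column, which triggers the header merge and full rewrite described above:

from data_base_csv import DataBaseCsv

db = DataBaseCsv('example_log.csv')
db.store_data({'Zeit': '2025-04-26 12:00:00', 'power': 1500})
# This row adds a new field -> the file is rewritten with the merged header
db.store_data({'Zeit': '2025-04-26 12:00:10', 'power': 1480, 'temp': 21.5})

Rewriting the whole file on a schema change keeps the CSV rectangular, at the cost of an O(n) rewrite; that is fine for a log that is appended to every few seconds.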

data_base_influx.py

@@ -1,48 +0,0 @@
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
import datetime as dt
import pandas as pd


class DataBaseInflux:
    def __init__(self, url: str, token: str, org: str, bucket: str):
        self.url = url
        self.token = token
        self.org = org
        self.bucket = bucket
        self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
        self.write_api = self.client.write_api()

    def store_data(self, device_name: str, data: dict):
        measurement = device_name  # one measurement per device
        point = Point(measurement)
        # Store all key/value pairs as fields
        for key, value in data.items():
            point = point.field(key, value)
        # Set the timestamp automatically to now
        point = point.time(datetime.utcnow(), WritePrecision.NS)
        # Write the point to InfluxDB
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)

    def store_forecasts(self, forecast_name: str, data: pd.Series):
        measurement = forecast_name
        run_tag = dt.datetime.now(dt.timezone.utc).replace(second=0, microsecond=0).isoformat(timespec="minutes")
        pts = []
        series = pd.to_numeric(data, errors="coerce").dropna()
        for ts, val in series.items():
            pts.append(
                Point(measurement)
                .tag("run", run_tag)
                .field("value", float(val))
                .time(ts.to_pydatetime(), WritePrecision.S)
            )
        self.write_api.write(bucket=self.bucket, org=self.org, record=pts)
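
A minimal usage sketch, assuming a reachable InfluxDB 2.x instance; the URL, token, org, and bucket names here are placeholders, and the field names are illustrative:

import pandas as pd

db = DataBaseInflux(url="http://localhost:8086", token="my-token", org="my-org", bucket="my-bucket")
db.store_data("hp_master", {"power": 1500.0, "flow_temp": 35.2})

# Forecast series: hourly values indexed by timestamp; each run is tagged so
# overlapping forecast horizons can be told apart later
idx = pd.to_datetime(["2025-04-26 12:00", "2025-04-26 13:00", "2025-04-26 14:00"]).tz_localize("Europe/Berlin")
db.store_forecasts("pv_forecast", pd.Series([100.0, 250.0, 300.0], index=idx))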


@@ -1,213 +0,0 @@
import os, re, math, time
from datetime import datetime, timezone, timedelta
import pandas as pd
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException

# -----------------------
# CONFIG
# -----------------------
INFLUX_URL = "http://192.168.1.146:8086"
INFLUX_ORG = "allmende"
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "Cw_naEZyvJ3isiAh1P4Eq3TsjcHmzzDFS7SlbKDsS6ZWL04fMEYixWqtNxGThDdG27S9aW5g7FP9eiq5z1rsGA==")
SOURCE_BUCKET = "allmende_db"
TARGET_BUCKET = "allmende_db_v2"
MEASUREMENTS = [
    "hp_master", "hp_slave", "pv_forecast", "sg_ready",
    "solaredge_master", "solaredge_meter", "solaredge_slave", "wohnung_2_6"
]
START_DT = datetime(2025, 6, 1, tzinfo=timezone.utc)
STOP_DT = datetime.now(timezone.utc)
WINDOW = timedelta(days=1)
EXCEL_PATH = "../modbus_registers/heat_pump_registers.xlsx"
EXCEL_SHEET = "Register_Map"
BATCH_SIZE = 1000
MAX_RETRIES = 8

# -----------------------
# Helpers
# -----------------------
def normalize(s) -> str:
    s = "" if s is None else str(s).strip()
    return re.sub(r"\s+", " ", s)

def is_invalid_sentinel(v: float) -> bool:
    return v in (-999.9, -999.0, 30000.0, 32767.0, 65535.0)

def ensure_bucket(client: InfluxDBClient, name: str):
    bapi = client.buckets_api()
    if bapi.find_bucket_by_name(name):
        return
    bapi.create_bucket(bucket_name=name, org=INFLUX_ORG, retention_rules=None)

def build_field_type_map_from_excel(path: str) -> dict[str, str]:
    df = pd.read_excel(path, sheet_name=EXCEL_SHEET)
    df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
    df["Address"] = df["Address"].astype(int)
    df["Description"] = df["Description"].fillna("").astype(str)
    df["Tag_Name"] = df["Tag_Name"].fillna("").astype(str)
    df["Data_Type"] = df["Data_Type"].fillna("").astype(str)
    m: dict[str, str] = {}
    for _, r in df.iterrows():
        addr = int(r["Address"])
        desc = normalize(r["Description"])
        tag = normalize(r["Tag_Name"])
        dtp = normalize(r["Data_Type"]).upper()
        if tag:
            m[tag] = dtp
        old_key = normalize(f"{addr} - {desc}".strip(" -"))
        if old_key:
            m[old_key] = dtp
    return m

def coerce_value_to_dtype(v, dtype: str):
    if v is None:
        return None
    dtp = (dtype or "").upper()
    if isinstance(v, (int, float)):
        fv = float(v)
        if math.isnan(fv) or math.isinf(fv):
            return None
    if dtp in ("BOOL", "BOOLEAN"):
        if isinstance(v, bool): return v
        if isinstance(v, (int, float)): return bool(int(v))
        return None
    if dtp.startswith("INT") or dtp.startswith("UINT"):
        if isinstance(v, bool): return int(v)
        if isinstance(v, (int, float)): return int(float(v))
        return None
    if dtp.startswith("FLOAT") or dtp in ("DOUBLE",):
        if isinstance(v, bool): return float(int(v))
        if isinstance(v, (int, float)): return float(v)
        return None
    return None

def write_with_retry(write_api, batch):
    delay = 1.0
    last_msg = ""
    for _ in range(MAX_RETRIES):
        try:
            write_api.write(bucket=TARGET_BUCKET, org=INFLUX_ORG, record=batch)
            return
        except ApiException as e:
            last_msg = getattr(e, "body", "") or str(e)
            status = getattr(e, "status", None)
            if "timeout" in last_msg.lower() or status in (429, 500, 502, 503, 504):
                time.sleep(delay)
                delay = min(delay * 2, 30)
                continue
            raise
    raise RuntimeError(f"Write failed after {MAX_RETRIES} retries: {last_msg}")

def window_already_migrated(query_api, measurement: str, start: datetime, stop: datetime) -> bool:
    # Check: is there at least one point in the target bucket within this window?
    flux = f'''
    from(bucket: "{TARGET_BUCKET}")
      |> range(start: time(v: "{start.isoformat()}"), stop: time(v: "{stop.isoformat()}"))
      |> filter(fn: (r) => r._measurement == "{measurement}")
      |> limit(n: 1)
    '''
    tables = query_api.query(flux, org=INFLUX_ORG)
    for t in tables:
        if t.records:
            return True
    return False

def migrate_window(query_api, write_api, measurement: str,
                   start: datetime, stop: datetime,
                   type_map: dict[str, str],
                   do_type_cast: bool) -> int:
    flux = f'''
    from(bucket: "{SOURCE_BUCKET}")
      |> range(start: time(v: "{start.isoformat()}"), stop: time(v: "{stop.isoformat()}"))
      |> filter(fn: (r) => r._measurement == "{measurement}")
      |> keep(columns: ["_time","_measurement","_field","_value"])
    '''
    tables = query_api.query(flux, org=INFLUX_ORG)
    batch, written = [], 0
    for table in tables:
        for rec in table.records:
            t = rec.get_time()
            field = normalize(rec.get_field())
            value = rec.get_value()
            if value is None:
                continue
            if do_type_cast:
                dtp = type_map.get(field)
                if dtp:
                    cv = coerce_value_to_dtype(value, dtp)
                    if cv is None:
                        continue
                    if isinstance(cv, (int, float)) and is_invalid_sentinel(float(cv)):
                        continue
                    value = cv
                # no mapping -> write unchanged
            batch.append(Point(measurement).field(field, value).time(t, WritePrecision.NS))
            if len(batch) >= BATCH_SIZE:
                write_with_retry(write_api, batch)
                written += len(batch)
                batch = []
    if batch:
        write_with_retry(write_api, batch)
        written += len(batch)
    return written

# -----------------------
# Main
# -----------------------
def main():
    if not INFLUX_TOKEN:
        raise RuntimeError("INFLUX_TOKEN missing (set the INFLUX_TOKEN environment variable).")
    with InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG, timeout=900_000) as client:
        ensure_bucket(client, TARGET_BUCKET)
        type_map = build_field_type_map_from_excel(EXCEL_PATH)
        query_api = client.query_api()
        write_api = client.write_api(write_options=SYNCHRONOUS)
        for meas in MEASUREMENTS:
            do_cast = meas in ("hp_master", "hp_slave")
            cur, total = START_DT, 0
            print(f"\n== {meas} (cast={'ON' if do_cast else 'OFF'}) ==")
            while cur < STOP_DT:
                nxt = min(cur + WINDOW, STOP_DT)
                if window_already_migrated(query_api, meas, cur, nxt):
                    print(f"{cur.isoformat()} -> {nxt.isoformat()} : SKIP (already migrated)")
                    cur = nxt
                    continue
                n = migrate_window(query_api, write_api, meas, cur, nxt, type_map, do_cast)
                total += n
                print(f"{cur.isoformat()} -> {nxt.isoformat()} : {n} (total {total})")
                cur = nxt
            print(f"== Done {meas}: {total} points ==")

if __name__ == "__main__":
    main()
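
As a quick sanity check of the helpers above (a sketch, assuming they are imported from this script):

assert coerce_value_to_dtype(12.7, "INT16") == 12               # INT* truncates toward zero
assert coerce_value_to_dtype(1, "BOOL") is True
assert coerce_value_to_dtype(float("nan"), "FLOAT32") is None   # NaN/Inf are dropped
assert is_invalid_sentinel(32767.0)                             # sentinel readings are skipped
assert normalize("  Flow   Temp ") == "Flow Temp"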

energysystem.py

@@ -1,25 +0,0 @@
class EnergySystem:
    def __init__(self):
        self.components = []

    def add_components(self, *args):
        for comp in args:
            self.components.append(comp)

    def get_state_and_store_to_database(self, db):
        state = {}
        for comp in self.components:
            component_state = comp.get_state()
            state[comp.device_name] = component_state
            db.store_data(comp.device_name, component_state)
        return state

    def get_component_by_name(self, name):
        for comp in self.components:
            if comp.device_name == name:
                return comp
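
A minimal sketch of how the composite is used; the dummy component and database are illustrative. Any object with a device_name and a get_state() fits the component interface, and the database object is expected to expose store_data(device_name, data), as DataBaseInflux above does:

class DummyDb:
    def store_data(self, device_name, data):
        print(device_name, data)

class DummyMeter:
    device_name = 'dummy_meter'

    def get_state(self):
        return {'power': 42.0}

es = EnergySystem()
es.add_components(DummyMeter())
state = es.get_state_and_store_to_database(DummyDb())
assert es.get_component_by_name('dummy_meter') is not None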

forecaster/weather_forecaster.py

@@ -1,61 +0,0 @@
#!/usr/bin/env python3
import time
import datetime as dt
import requests
from zoneinfo import ZoneInfo
from matplotlib import pyplot as plt
import pandas as pd

TZ = "Europe/Berlin"
DAYS = 2
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"


class WeatherForecaster:
    def __init__(self, latitude, longitude):
        self.lat = latitude
        self.lon = longitude

    def get_hourly_forecast(self, start_hour, days):
        start_hour_local = start_hour
        end_hour_local = start_hour_local + dt.timedelta(days=days)
        params = {
            "latitude": self.lat,
            "longitude": self.lon,
            "hourly": ["temperature_2m", "shortwave_radiation", "wind_speed_10m"],
            "timezone": TZ,
            "start_hour": start_hour_local.strftime("%Y-%m-%dT%H:%M"),
            "end_hour": end_hour_local.strftime("%Y-%m-%dT%H:%M")
        }
        h = requests.get(OPEN_METEO_URL, params=params).json()["hourly"]
        time_stamps = [
            dt.datetime.fromisoformat(t).replace(tzinfo=ZoneInfo(TZ))
            for t in h["time"]
        ]
        weather = pd.DataFrame(index=time_stamps)
        weather["ghi"] = h["shortwave_radiation"]
        weather["temp_air"] = h["temperature_2m"]
        weather["wind_speed"] = h["wind_speed_10m"]
        return weather


if __name__ == '__main__':
    weather_forecast = WeatherForecaster(latitude=48.041, longitude=7.862)
    while True:
        now = dt.datetime.now()
        secs = 60 - now.second  # seconds until the next full minute ((60 - now.minute) * 60 - now.second would wait for the full hour)
        time.sleep(secs)
        now_local = dt.datetime.now()
        start_hour_local = (now_local + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
        # get_hourly_forecast returns a DataFrame, not separate arrays
        weather = weather_forecast.get_hourly_forecast(start_hour_local, DAYS)
        plt.plot(weather.index, weather["temp_air"])
        plt.show()
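
For a one-shot forecast without the polling loop (a sketch using the coordinates from the __main__ block; requires network access to Open-Meteo):

import datetime as dt

forecaster = WeatherForecaster(latitude=48.041, longitude=7.862)
start = (dt.datetime.now() + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
weather = forecaster.get_hourly_forecast(start_hour=start, days=2)
print(weather.head())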

heat_pump.py

@@ -1,173 +1,62 @@
 from pymodbus.client import ModbusTcpClient
 import pandas as pd
 import time
-import struct
-import math
 
 class HeatPump:
-    def __init__(self, device_name: str, ip_address: str, port: int = 502,
-                 excel_path: str = "modbus_registers/heat_pump_registers.xlsx",
-                 sheet_name: str = "Register_Map"):
-        self.device_name = device_name
-        self.ip = ip_address
-        self.port = port
-        self.client = ModbusTcpClient(self.ip, port=self.port)
-        self.excel_path = excel_path
-        self.sheet_name = sheet_name
-        self.registers = self.get_registers()
-
-    # -------------
-    # Connection
-    # -------------
-    def connect(self) -> bool:
-        ok = self.client.connect()
-        if not ok:
-            print("Connection to the heat pump failed.")
-        return ok
-
-    def close(self):
-        try:
-            self.client.close()
-        except Exception:
-            pass
-
-    # -------------
-    # Excel parsing
-    # -------------
-    def get_registers(self) -> dict:
-        df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
-        df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
-        df["Address"] = df["Address"].astype(int)
-        df["Length"] = df["Length"].astype(int)
-        df["Data_Type"] = df["Data_Type"].astype(str).str.upper()
-        df["Byteorder"] = df["Byteorder"].astype(str).str.upper()
-        df["Scaling"] = df.get("Scaling", 1.0)
-        df["Scaling"] = df["Scaling"].fillna(1.0).astype(float)
-        df["Offset"] = df.get("Offset", 0.0)
-        df["Offset"] = df["Offset"].fillna(0.0).astype(float)
-        regs = {}
-        for _, row in df.iterrows():
-            regs[int(row["Address"])] = {
-                "length": int(row["Length"]),
-                "data_type": row["Data_Type"],
-                "byteorder": row["Byteorder"],
-                "scaling": float(row["Scaling"]),
-                "offset": float(row["Offset"]),
-                "tag": str(row.get("Tag_Name", "")).strip(),
-                "desc": "" if pd.isna(row.get("Description")) else str(row.get("Description")).strip(),
-            }
-        return regs
-
-    # -------------
-    # Byteorder handling
-    # -------------
-    @staticmethod
-    def _registers_to_bytes(registers: list[int], byteorder_code: str) -> bytes:
-        """
-        registers: list of uint16 (0..65535), as delivered by pymodbus.
-        byteorder_code: AB, ABCD, CDAB, BADC, DCBA (as in the template)
-        Returns: the bytes in the order required by struct.unpack.
-        """
-        code = (byteorder_code or "ABCD").upper()
-        # Per register: 16 bit => two bytes (MSB, LSB)
-        words = [struct.pack(">H", r & 0xFFFF) for r in registers]  # big endian per word
-        if len(words) == 1:
-            w = words[0]  # b'\xAA\xBB'
-            if code in ("AB", "ABCD", "CDAB"):
-                return w
-            if code == "BADC":  # byte swap
-                return w[::-1]
-            if code == "DCBA":  # byte swap (identical to BADC for 16 bit)
-                return w[::-1]
-            return w
-        # 32 bit (2 words) or 64 bit (4 words): map word/byte swaps
-        # words[0] = high word bytes, words[1] = low word bytes (read in Modbus order)
-        if code == "ABCD":
-            ordered = words
-        elif code == "CDAB":
-            # word swap
-            ordered = words[1:] + words[:1]
-        elif code == "BADC":
-            # byte swap within each word
-            ordered = [w[::-1] for w in words]
-        elif code == "DCBA":
-            # word + byte swap
-            ordered = [w[::-1] for w in (words[1:] + words[:1])]
-        else:
-            ordered = words
-        return b"".join(ordered)
-
-    @staticmethod
-    def _decode_by_type(raw_bytes: bytes, data_type: str):
-        dt = (data_type or "").upper()
-        # struct: > = big endian, < = little endian
-        # raw_bytes already arrives in the right order, so ">" is used consistently.
-        if dt == "UINT16":
-            return struct.unpack(">H", raw_bytes[:2])[0]
-        if dt == "INT16":
-            return struct.unpack(">h", raw_bytes[:2])[0]
-        if dt == "UINT32":
-            return struct.unpack(">I", raw_bytes[:4])[0]
-        if dt == "INT32":
-            return struct.unpack(">i", raw_bytes[:4])[0]
-        if dt == "FLOAT32":
-            return struct.unpack(">f", raw_bytes[:4])[0]
-        if dt == "FLOAT64":
-            return struct.unpack(">d", raw_bytes[:8])[0]
-        raise ValueError(f"Unknown Data_Type: {dt}")
-
-    def _decode_value(self, registers: list[int], meta: dict):
-        raw = self._registers_to_bytes(registers, meta["byteorder"])
-        val = self._decode_by_type(raw, meta["data_type"])
-        return (val * meta["scaling"]) + meta["offset"]
-
-    # -------------
-    # Reading
-    # -------------
-    def get_state(self) -> dict:
-        data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
-        if not self.connect():
-            data["error"] = "connect_failed"
-            return data
-        try:
-            for address, meta in self.registers.items():
-                count = int(meta["length"])
-                result = self.client.read_input_registers(address, count=count)
-                if result.isError():
-                    print(f"Error reading address {address}: {result}")
-                    continue
-                try:
-                    value = self._decode_value(result.registers, meta)
-                except Exception as e:
-                    print(f"Decode error at address {address} ({meta.get('tag','')}): {e}")
-                    continue
-                # Optional filter
-                # if self._is_invalid_sentinel(value):
-                #     continue
-                value = float(value)
-                desc = meta.get("desc") or ""
-                field_name = f"{address} - {desc}".strip(" -")
-                data[field_name] = float(value)
-                print(f"Address {address} - {desc}: {value}")
-        finally:
-            self.close()
-        return data
+    def __init__(self, ip_address: str):
+        self.ip = ip_address
+        self.client = None
+        self.connect_to_modbus()
+        self.registers = None
+        self.get_registers()
+
+    def connect_to_modbus(self):
+        port = 502
+        self.client = ModbusTcpClient(self.ip, port=port)
+        try:
+            if not self.client.connect():
+                print("Connection to the heat pump failed.")
+                exit(1)
+            print("Connection to the heat pump succeeded.")
+        except KeyboardInterrupt:
+            print("Stopped by user (Ctrl+C).")
+        finally:
+            self.client.close()
+
+    def get_registers(self):
+        # Excel file with the input register information
+        excel_path = "data/ModBus TCPIP 1.17(1).xlsx"
+        xls = pd.ExcelFile(excel_path)
+        df_input_registers = xls.parse('04 Input Register')
+        # Clean up the relevant columns
+        df_clean = df_input_registers[['MB Adresse', 'Variable', 'Beschreibung', 'Variabel Typ']].dropna()
+        df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int)
+        # Build the dictionary from the Excel sheet
+        self.registers = {
+            row['MB Adresse']: {
+                'desc': row['Beschreibung'],
+                'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT'
+            }
+            for _, row in df_clean.iterrows()
+        }
+
+    def get_data(self):
+        data = {}
+        data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S')
+        for address, info in self.registers.items():
+            reg_type = info['type']
+            result = self.client.read_input_registers(address, count=2 if reg_type == 'REAL' else 1)
+            if result.isError():
+                print(f"Error reading address {address}: {result}")
+                continue
+            if reg_type == 'REAL':
+                value = result.registers[0] / 10.0
+            else:
+                value = result.registers[0]
+            print(f"Address {address} - {info['desc']}: {value}")
+            data[f"{address} - {info['desc']}"] = value
+        return data
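
The simplified class is exercised the same way the deleted test script exercised the old API (a sketch; the IP is the one used in main.py):

from heat_pump import HeatPump

hp = HeatPump(ip_address='10.0.0.10')
data = hp.get_data()
print(data)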

main.py

@@ -1,82 +1,17 @@
 import time
 from datetime import datetime
-from data_base_influx import DataBaseInflux
-from forecaster.weather_forecaster import WeatherForecaster
+from data_base_csv import DataBaseCsv
 from heat_pump import HeatPump
-from pv_inverter import PvInverter
-from simulators.pv_plant_simulator import PvWattsSubarrayConfig, PvWattsPlant
-from solaredge_meter import SolaredgeMeter
-from shelly_pro_3m import ShellyPro3m
-from energysystem import EnergySystem
-from sg_ready_controller import SgReadyController
-from pvlib.location import Location
-import datetime as dt
 
-# For the dev system, run in a terminal: ssh -N -L 127.0.0.1:8111:10.0.0.10:502 pi@192.168.1.146
-# For the productive system, change the IP address in HeatPump to '10.0.0.10' and the port to 502
-interval_seconds = 10
-es = EnergySystem()
-db = DataBaseInflux(
-    url="http://192.168.1.146:8086",
-    token="Cw_naEZyvJ3isiAh1P4Eq3TsjcHmzzDFS7SlbKDsS6ZWL04fMEYixWqtNxGThDdG27S9aW5g7FP9eiq5z1rsGA==",
-    org="allmende",
-    bucket="allmende_db_v3"
-)
-hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
-hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
-shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
-wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112')
-meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112')
-es.add_components(hp_master, hp_slave, shelly, wr, meter)
-controller = SgReadyController(es)
-
-# FORECASTING
-latitude = 48.041
-longitude = 7.862
-TZ = "Europe/Berlin"
-HORIZON_DAYS = 2
-weather_forecaster = WeatherForecaster(latitude=latitude, longitude=longitude)
-site = Location(latitude=latitude, longitude=longitude, altitude=35, tz=TZ, name="Gundelfingen")
-p_module = 435
-upper_roof_north = PvWattsSubarrayConfig(name="north", pdc0_w=(29+29+21)*p_module, tilt_deg=10, azimuth_deg=20, dc_loss=0.02, ac_loss=0.01)
-upper_roof_south = PvWattsSubarrayConfig(name="south", pdc0_w=(29+21+20)*p_module, tilt_deg=10, azimuth_deg=200, dc_loss=0.02, ac_loss=0.01)
-upper_roof_east = PvWattsSubarrayConfig(name="east", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=110, dc_loss=0.02, ac_loss=0.01)
-upper_roof_west = PvWattsSubarrayConfig(name="west", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=290, dc_loss=0.02, ac_loss=0.01)
-cfgs = [upper_roof_north, upper_roof_south, upper_roof_east, upper_roof_west]
-pv_plant = PvWattsPlant(site, cfgs)
-now = datetime.now()
-next_forecast_at = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
+interval = 10  # e.g. every 10 seconds
+db = DataBaseCsv('modbus_log.csv')
+hp = HeatPump(ip_address='10.0.0.10')
 
 while True:
     now = datetime.now()
-    if now.second % interval_seconds == 0 and now.microsecond < 100_000:
-        state = es.get_state_and_store_to_database(db)
-        mode = controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state)
-        if mode == 'mode1':
-            mode_as_binary = 0
-        else:
-            mode_as_binary = 1
-        db.store_data('sg_ready', {'mode': mode_as_binary})
-        if now >= next_forecast_at:
-            # Start the forecast from the next full hour
-            start_hour_local = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
-            weather = weather_forecaster.get_hourly_forecast(start_hour_local, HORIZON_DAYS)
-            total = pv_plant.get_power(weather)
-            db.store_forecasts('pv_forecast', total)
-            # Define the next scheduled run (always on the full hour).
-            # If several hours were missed due to delays, catch up:
-            while next_forecast_at <= now:
-                next_forecast_at = (next_forecast_at + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
+    if now.second % interval == 0 and now.microsecond < 100_000:
+        db.store_data(hp.get_data())
     time.sleep(0.1)
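
The loop polls at 10 Hz and fires once within the first 100 ms of every tenth second. An equivalent, lower-churn variant (a sketch, not what the commit does) sleeps straight to the next aligned boundary:

import time

interval = 10
while True:
    now = time.time()
    time.sleep(interval - now % interval)  # sleep until the next 10-second boundary
    # ... read and store data here ...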

modbus_log.csv Normal file

File diff suppressed because one or more lines are too long

pv_inverter.py

@@ -1,139 +0,0 @@
import time
import struct
import pandas as pd
from typing import Dict, Any, List, Tuple, Optional
from pymodbus.client import ModbusTcpClient

EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"
# Upper bound: read only addresses below 40121 (the meter registers start there, see SolaredgeMeter)
MAX_ADDR_EXCLUSIVE = 40121


class PvInverter:
    def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1):
        self.device_name = device_name
        self.ip = ip_address
        self.port = port
        self.unit = unit
        self.client: Optional[ModbusTcpClient] = None
        self.registers: Dict[int, Dict[str, Any]] = {}  # addr -> {"desc": ..., "type": ...}
        self.connect_to_modbus()
        self.load_registers(EXCEL_PATH)

    # ---------- Connection ----------
    def connect_to_modbus(self):
        self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3)
        if not self.client.connect():
            print("❌ Connection to the inverter failed.")
            raise SystemExit(1)
        print("✅ Connection to the inverter established.")

    def close(self):
        if self.client:
            self.client.close()
            self.client = None

    # ---------- Register list ----------
    def load_registers(self, excel_path: str):
        xls = pd.ExcelFile(excel_path)
        df = xls.parse()
        # Adjust the column names here if necessary:
        cols = ["MB Adresse", "Beschreibung", "Variabel Typ"]
        df = df[cols].dropna()
        df["MB Adresse"] = df["MB Adresse"].astype(int)
        # 1) Pre-filter: keep only addresses below the limit
        df = df[df["MB Adresse"] < MAX_ADDR_EXCLUSIVE]
        self.registers = {
            int(row["MB Adresse"]): {
                "desc": str(row["Beschreibung"]).strip(),
                "type": str(row["Variabel Typ"]).strip()
            }
            for _, row in df.iterrows()
        }

    # ---------- Low-level reading ----------
    def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
        fn = getattr(self.client, fn_name)
        # pymodbus 3.8.x takes 'slave='; the fallback without it does no harm
        for kwargs in (dict(address=address, count=count, slave=self.unit),
                       dict(address=address, count=count)):
            try:
                res = fn(**kwargs)
                if res is None or (hasattr(res, "isError") and res.isError()):
                    continue
                return res.registers
            except TypeError:
                continue
        return None

    def _read_any(self, address: int, count: int) -> Optional[List[int]]:
        regs = self._try_read("read_holding_registers", address, count)
        if regs is None:
            regs = self._try_read("read_input_registers", address, count)
        return regs

    # ---------- Decoding ----------
    @staticmethod
    def _to_i16(u16: int) -> int:
        return struct.unpack(">h", struct.pack(">H", u16))[0]

    @staticmethod
    def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float:
        b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi)
        return struct.unpack(">f", b)[0]

    # Helper: how many 16-bit registers does this type occupy?
    @staticmethod
    def _word_count_for_type(rtype: str) -> int:
        rt = (rtype or "").lower()
        # Adjust to the types used in your Excel sheet:
        if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt:
            return 2
        # Default: 1 word (e.g. int16/uint16)
        return 1

    def read_one(self, address_excel: int, rtype: str) -> Optional[float]:
        """
        Reads one value according to its type ('INT', 'REAL', etc.).
        Only registers below MAX_ADDR_EXCLUSIVE are read.
        """
        addr = int(address_excel)
        words = self._word_count_for_type(rtype)
        # 2) Enforce the hard limit: the highest register touched must stay below the bound
        if addr + words - 1 >= MAX_ADDR_EXCLUSIVE:
            # Skip, since the read would touch a register at or beyond the limit
            return None
        if words == 2:
            regs = self._read_any(addr, 2)
            if not regs or len(regs) < 2:
                return None
            # Two words are interpreted as a float32:
            return self._to_f32_from_two(regs[0], regs[1])
        else:
            regs = self._read_any(addr, 1)
            if not regs:
                return None
            return float(self._to_i16(regs[0]))

    def get_state(self) -> Dict[str, Any]:
        """
        Reads ALL registers from self.registers and returns them as a dict.
        Ensures that no address (including multi-word reads) at or beyond the limit is read.
        """
        data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
        for address, meta in sorted(self.registers.items()):
            words = self._word_count_for_type(meta["type"])
            # 3) Guard once more at iteration level:
            if address + words - 1 >= MAX_ADDR_EXCLUSIVE:
                continue
            val = self.read_one(address, meta["type"])
            if val is None:
                continue
            key = f"{address} - {meta['desc']}"
            data[key] = val
        return data
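
To illustrate the decoding helpers: two 16-bit registers are concatenated (most significant word first by default) and reinterpreted as an IEEE-754 float32. A quick check, assuming pv_inverter.py is importable (0x3F80, 0x0000 encodes 1.0):

assert PvInverter._to_f32_from_two(0x3F80, 0x0000) == 1.0
assert PvInverter._to_i16(0xFFFF) == -1
assert PvInverter._word_count_for_type("REAL") == 2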

requirements.txt

@@ -1,5 +1,3 @@
 pymodbus~=3.8.6
 pandas
 openpyxl
-sshtunnel
-pvlib

sg_ready_controller.py

@@ -1,65 +0,0 @@
from pymodbus.client import ModbusTcpClient


class SgReadyController:
    def __init__(self, es):
        self.es = es

    def perform_action(self, heat_pump_name, meter_name, state):
        hp = self.es.get_component_by_name(heat_pump_name)
        meter_values = state[meter_name]
        # SunSpec: signed power value scaled by a base-10 scale factor
        power_to_grid = meter_values['40206 - M_AC_Power'] * 10 ** meter_values['40210 - M_AC_Power_SF']
        mode = None
        if power_to_grid > 10000:
            mode = 'mode2'
            self.switch_sg_ready_mode(hp.ip, hp.port, mode)
        elif power_to_grid < 0:
            mode = 'mode1'
            self.switch_sg_ready_mode(hp.ip, hp.port, mode)
        return mode

    def switch_sg_ready_mode(self, ip, port, mode):
        """
        Register 300: 1 = bus control, 0 = hardware contacts
        Registers 301 & 302:
            0-0 = no offset
            0-1 = boiler and heating offset
            1-1 = boiler offset + electric heating element setpoint raised
            1-0 = SG utility (EVU) lock
        :param ip:
        :param mode:
            'mode1' = [True, False, False] => SG Ready deactivated
            'mode2' = [True, False, True]  => SG Ready activated for the heat pump only
            'mode3' = [True, True, True]   => SG Ready activated for the heat pump and the heat rod
        :return:
        """
        client = ModbusTcpClient(ip, port=port)
        if not client.connect():
            print("Connection to the heat pump failed.")
            return
        if mode == 'mode1':
            mode_code = [True, False, False]
        elif mode == 'mode2':
            mode_code = [True, False, True]
        elif mode == 'mode3':
            mode_code = [True, True, True]
        else:
            # Bail out early: otherwise mode_code would be None and the writes below would crash
            print('Incorrect or missing mode string!')
            client.close()
            return
        try:
            response_300 = client.write_coil(300, mode_code[0])
            response_301 = client.write_coil(301, mode_code[1])
            response_302 = client.write_coil(302, mode_code[2])
            # Optional: check the write responses
            for addr, resp in zip([300, 301, 302], [response_300, response_301, response_302]):
                if resp.isError():
                    print(f"Error writing coil {addr}: {resp}")
                else:
                    print(f"Coil {addr} written successfully.")
        finally:
            client.close()
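
perform_action reconstructs the signed meter power from the SunSpec value/scale-factor pair: watts = raw value x 10^SF. A worked example with illustrative register values:

raw_power = 1234        # '40206 - M_AC_Power' (illustrative)
scale_factor = 1        # '40210 - M_AC_Power_SF' (illustrative)
power_to_grid = raw_power * 10 ** scale_factor  # 12340 W
# > 10000 W feed-in  -> 'mode2' (enable the SG Ready boost)
# < 0 W (importing)  -> 'mode1' (deactivate)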

shelly_pro_3m.py

@@ -1,64 +0,0 @@
import struct
from pymodbus.client import ModbusTcpClient
import pandas as pd
import time


class ShellyPro3m:
    def __init__(self, device_name: str, ip_address: str, port: int = 502):
        self.device_name = device_name
        self.ip = ip_address
        self.port = port
        self.client = None
        self.connect_to_modbus()
        self.registers = None
        self.get_registers()

    def connect_to_modbus(self):
        port = self.port
        self.client = ModbusTcpClient(self.ip, port=port)
        try:
            if not self.client.connect():
                print("Connection to the Shelly logger failed.")
                exit(1)
            print("Connection to the Shelly logger succeeded.")
        except KeyboardInterrupt:
            print("Stopped by user (Ctrl+C).")
        finally:
            self.client.close()

    def get_registers(self):
        # Excel file with the input register information
        excel_path = "modbus_registers/shelly_pro_3m_registers.xlsx"
        xls = pd.ExcelFile(excel_path)
        df_input_registers = xls.parse()
        # Clean up the relevant columns
        df_clean = df_input_registers[['MB Adresse', 'Beschreibung', 'Variabel Typ']].dropna()
        df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int)
        # Build the dictionary from the Excel sheet
        self.registers = {
            row['MB Adresse']: {
                'desc': row['Beschreibung'],
                'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT'
            }
            for _, row in df_clean.iterrows()
        }

    def get_state(self):
        data = {}
        data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S')
        for address, info in self.registers.items():
            reg_type = info['type']
            result = self.client.read_input_registers(address, count=2 if reg_type == 'REAL' else 1)
            if result.isError():
                print(f"Error reading address {address}: {result}")
                continue
            # The two registers arrive low word first: swap them, then unpack as a big-endian float32
            packed = struct.pack(">HH", result.registers[1], result.registers[0])
            value = round(struct.unpack(">f", packed)[0], 2)
            print(f"Address {address} - {info['desc']}: {value}")
            data[f"{address} - {info['desc']}"] = value
        return data

simulators/pv_plant_simulator.py

@@ -1,210 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, Dict, List, Literal, Tuple, Union
import numpy as np
import pandas as pd
import pvlib
import matplotlib.pyplot as plt
from pvlib.location import Location
from pvlib.pvsystem import PVSystem
from pvlib.modelchain import ModelChain

SeriesOrArray = Union[pd.Series, np.ndarray]

# ----------------------------- Configuration -----------------------------
@dataclass
class PvWattsSubarrayConfig:
    name: str
    pdc0_w: float                 # STC DC power [W]
    tilt_deg: float               # tilt (0 = horizontal)
    azimuth_deg: float            # azimuth (180 = south)
    gamma_pdc: float = -0.004     # temperature coefficient [1/K]
    eta_inv_nom: float = 0.96     # inverter efficiency (nominal)
    albedo: float = 0.2           # ground reflectance
    # Flat losses (PVWatts losses)
    dc_loss: float = 0.0
    ac_loss: float = 0.0
    soiling: float = 0.0
    # Model
    transposition_model: Literal["perez", "haydavies", "isotropic", "klucher", "reindl"] = "perez"

# ------------------------------ Subarray ---------------------------------
class PvWattsSubarray:
    """
    One subarray modelled with pvlib.ModelChain (PVWatts).
    Automatically derives DNI/DHI from GHI (Erbs method)
    and uses a SAPM temperature model.
    """
    def __init__(self, cfg: PvWattsSubarrayConfig, location: Location):
        self.cfg = cfg
        self.location = location
        self._mc: Optional[ModelChain] = None

    # ---------------------------------------------------------------------
    def _create_modelchain(self) -> ModelChain:
        """Create a pvlib.ModelChain instance with PVWatts parameters."""
        temp_params = pvlib.temperature.TEMPERATURE_MODEL_PARAMETERS["sapm"]["open_rack_glass_polymer"]
        system = PVSystem(
            surface_tilt=self.cfg.tilt_deg,
            surface_azimuth=self.cfg.azimuth_deg,
            module_parameters={"pdc0": self.cfg.pdc0_w, "gamma_pdc": self.cfg.gamma_pdc},
            inverter_parameters={"pdc0": self.cfg.pdc0_w, "eta_inv_nom": self.cfg.eta_inv_nom},
            albedo=self.cfg.albedo,
            temperature_model_parameters=temp_params,
            module_type="glass_polymer",
            racking_model="open_rack",
        )
        mc = ModelChain(
            system, self.location,
            transposition_model=self.cfg.transposition_model,
            solar_position_method="nrel_numpy",
            airmass_model="kastenyoung1989",
            dc_model="pvwatts",
            ac_model="pvwatts",
            aoi_model="physical",
            spectral_model=None,
            losses_model="pvwatts",
            temperature_model="sapm",
        )
        mc.losses_parameters = {
            "dc_loss": float(self.cfg.dc_loss),
            "ac_loss": float(self.cfg.ac_loss),
            "soiling": float(self.cfg.soiling),
        }
        self._mc = mc
        return mc

    # ---------------------------------------------------------------------
    def calc_dni_and_dhi(self, weather: pd.DataFrame) -> pd.DataFrame:
        """
        Derives DNI & DHI from GHI via the Erbs method.
        Returns a new DataFrame with 'ghi', 'dni', 'dhi'.
        """
        if "ghi" not in weather:
            raise ValueError("Weather data needs at least 'ghi'.")
        # Determine the solar position
        sp = self.location.get_solarposition(weather.index)
        erbs = pvlib.irradiance.erbs(weather["ghi"], sp["zenith"], weather.index)
        out = weather.copy()
        out["dni"] = erbs["dni"].clip(lower=0)
        out["dhi"] = erbs["dhi"].clip(lower=0)
        return out

    # ---------------------------------------------------------------------
    def _prepare_weather(self, weather: pd.DataFrame) -> pd.DataFrame:
        """Ensures complete columns (ghi, dni, dhi, temp_air, wind_speed)."""
        if "ghi" not in weather or "temp_air" not in weather:
            raise ValueError("weather needs the columns 'ghi' and 'temp_air'.")
        w = weather.copy()
        # Check the time zone
        if w.index.tz is None:
            w.index = w.index.tz_localize(self.location.tz)
        else:
            if str(w.index.tz) != str(self.location.tz):
                w = w.tz_convert(self.location.tz)
        # Wind default
        if "wind_speed" not in w:
            w["wind_speed"] = 1.0
        # Fill in DNI/DHI (always via Erbs)
        if "dni" not in w or "dhi" not in w:
            w = self.calc_dni_and_dhi(w)
        return w

    # ---------------------------------------------------------------------
    def get_power(self, weather: pd.DataFrame) -> pd.Series:
        """Computes AC power from weather data."""
        w = self._prepare_weather(weather)
        mc = self._create_modelchain()
        mc.run_model(weather=w)
        return mc.results.ac.rename(self.cfg.name)

# ------------------------------- Plant ----------------------------------
class PvWattsPlant:
    """
    A PV plant consisting of several subarrays that share one weather DataFrame.
    """
    def __init__(self, site: Location, subarray_cfgs: List[PvWattsSubarrayConfig]):
        self.site = site
        self.subs: Dict[str, PvWattsSubarray] = {c.name: PvWattsSubarray(c, site) for c in subarray_cfgs}

    def get_power(
        self,
        weather: pd.DataFrame,
        *,
        return_breakdown: bool = False
    ) -> pd.Series | Tuple[pd.Series, Dict[str, pd.Series]]:
        """Compute the total power and optionally the per-subarray breakdown."""
        parts: Dict[str, pd.Series] = {name: sub.get_power(weather) for name, sub in self.subs.items()}
        # Build the common index
        idx = list(parts.values())[0].index
        for s in parts.values():
            idx = idx.intersection(s.index)
        parts = {k: v.reindex(idx).fillna(0.0) for k, v in parts.items()}
        total = sum(parts.values())
        total.name = "total_ac"
        if return_breakdown:
            return total, parts
        return total

# --------------------------- Example usage -----------------------------
if __name__ == "__main__":
    # Site
    site = Location(latitude=52.52, longitude=13.405, altitude=35, tz="Europe/Berlin", name="Berlin")
    # Time axis: one day at 15-minute resolution
    times = pd.date_range("2025-06-21 00:00", "2025-06-21 23:45", freq="15min", tz=site.tz)
    # Dummy weather
    ghi = 1000 * np.clip(np.sin(np.linspace(0, np.pi, len(times)))**1.2, 0, None)
    temp_air = 16 + 8 * np.clip(np.sin(np.linspace(-np.pi/2, np.pi/2, len(times))), 0, None)
    wind = np.full(len(times), 1.0)
    weather = pd.DataFrame(index=times)
    weather["ghi"] = ghi
    weather["temp_air"] = temp_air
    weather["wind_speed"] = wind
    # Two subarrays
    cfgs = [
        PvWattsSubarrayConfig(name="Sued_30", pdc0_w=6000, tilt_deg=30, azimuth_deg=180, dc_loss=0.02, ac_loss=0.01),
        PvWattsSubarrayConfig(name="West_20", pdc0_w=4000, tilt_deg=20, azimuth_deg=270, soiling=0.02),
    ]
    plant = PvWattsPlant(site, cfgs)
    # Simulation
    total, parts = plant.get_power(weather, return_breakdown=True)
    # Plot
    plt.figure(figsize=(10, 6))
    plt.plot(total.index, total / 1000, label="Total power (AC)", linewidth=2, color="black")
    for name, s in parts.items():
        plt.plot(s.index, s / 1000, label=name)
    plt.title("PV power (PVWatts, Erbs method for DNI/DHI)")
    plt.ylabel("Power [kW]")
    plt.xlabel("Time")
    plt.legend()
    plt.grid(True, linestyle="--", alpha=0.5)
    plt.tight_layout()
    plt.show()

solaredge_meter.py

@@ -1,134 +0,0 @@
import time
import struct
import pandas as pd
from typing import Dict, Any, List, Tuple, Optional
from pymodbus.client import ModbusTcpClient

EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"
# Lower bound: only meter addresses (>= 40121) are taken from the Excel sheet
MIN_ADDR_INCLUSIVE = 40121
# Offset added to every Excel address before reading
ADDRESS_SHIFT = 50


class SolaredgeMeter:
    def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1):
        self.device_name = device_name
        self.ip = ip_address
        self.port = port
        self.unit = unit
        self.client: Optional[ModbusTcpClient] = None
        self.registers: Dict[int, Dict[str, Any]] = {}  # addr -> {"desc": ..., "type": ...}
        self.connect_to_modbus()
        self.load_registers(EXCEL_PATH)

    # ---------- Connection ----------
    def connect_to_modbus(self):
        self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3)
        if not self.client.connect():
            print("❌ Connection to the meter failed.")
            raise SystemExit(1)
        print("✅ Connection to the meter established.")

    def close(self):
        if self.client:
            self.client.close()
            self.client = None

    # ---------- Register list ----------
    def load_registers(self, excel_path: str):
        xls = pd.ExcelFile(excel_path)
        df = xls.parse()
        # Adjust the column names here if necessary:
        cols = ["MB Adresse", "Beschreibung", "Variabel Typ"]
        df = df[cols].dropna()
        df["MB Adresse"] = df["MB Adresse"].astype(int)
        # 1) Pre-filter: keep only addresses at or above the meter block
        df = df[df["MB Adresse"] >= MIN_ADDR_INCLUSIVE]
        self.registers = {
            int(row["MB Adresse"]): {
                "desc": str(row["Beschreibung"]).strip(),
                "type": str(row["Variabel Typ"]).strip()
            }
            for _, row in df.iterrows()
        }

    # ---------- Low-level reading ----------
    def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
        fn = getattr(self.client, fn_name)
        # pymodbus 3.8.x takes 'slave='; the fallback without it does no harm
        shifted_addr = address + ADDRESS_SHIFT
        for kwargs in (dict(address=shifted_addr, count=count, slave=self.unit),
                       dict(address=shifted_addr, count=count)):
            try:
                res = fn(**kwargs)
                if res is None or (hasattr(res, "isError") and res.isError()):
                    continue
                return res.registers
            except TypeError:
                continue
        return None

    def _read_any(self, address: int, count: int) -> Optional[List[int]]:
        regs = self._try_read("read_holding_registers", address, count)
        if regs is None:
            regs = self._try_read("read_input_registers", address, count)
        return regs

    # ---------- Decoding ----------
    @staticmethod
    def _to_i16(u16: int) -> int:
        return struct.unpack(">h", struct.pack(">H", u16))[0]

    @staticmethod
    def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float:
        b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi)
        return struct.unpack(">f", b)[0]

    # Helper: how many 16-bit registers does this type occupy?
    @staticmethod
    def _word_count_for_type(rtype: str) -> int:
        rt = (rtype or "").lower()
        # Adjust to the types used in your Excel sheet:
        if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt:
            return 2
        # Default: 1 word (e.g. int16/uint16)
        return 1

    def read_one(self, address_excel: int, rtype: str) -> Optional[float]:
        """
        Reads one value according to its type ('INT', 'REAL', etc.).
        """
        addr = int(address_excel)
        words = self._word_count_for_type(rtype)
        if words == 2:
            regs = self._read_any(addr, 2)
            if not regs or len(regs) < 2:
                return None
            # Two words are interpreted as a float32:
            return self._to_f32_from_two(regs[0], regs[1])
        else:
            regs = self._read_any(addr, 1)
            if not regs:
                return None
            return float(self._to_i16(regs[0]))

    def get_state(self) -> Dict[str, Any]:
        """
        Reads ALL registers from self.registers and returns them as a dict.
        """
        data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
        for address, meta in sorted(self.registers.items()):
            words = self._word_count_for_type(meta["type"])
            val = self.read_one(address, meta["type"])
            if val is None:
                continue
            key = f"{address} - {meta['desc']}"
            data[key] = val
        return data

terminal_log Normal file

File diff suppressed because it is too large