31 Commits

Author SHA1 Message Date
Nils Reiners
f1c5c99621 minor changes 2026-01-08 10:26:01 +01:00
Nils Reiners
2e17412b4a pv forecaster will not be used in master any more 2026-01-08 10:24:07 +01:00
Nils Reiners
afbcd81310 transformation of data from old database to new was updated. 2026-01-08 10:18:24 +01:00
Nils Reiners
fd87257f37 data are written in new database as the datatypes of the original database does not fit any more. 2026-01-06 23:07:06 +01:00
Nils Reiners
876115cf6e daten werden nicht korrekt in datenbank abgelegt 2026-01-06 18:51:35 +01:00
Nils Reiners
f0e7c1338b path to hp excel was modified 2026-01-06 17:13:14 +01:00
Nils Reiners
8642a057f0 excel sheet for heat pump registers now in template form. tested with script that was also added in folder. sg-ready testing file was also added. 2026-01-06 17:01:50 +01:00
Nils Reiners
ce14d59d51 adresse für hp angepasst 2026-01-05 17:15:25 +01:00
Nils Reiners
4727364048 scheint zu laufen 2025-12-09 22:07:57 +01:00
Nils Reiners
666eb211a3 old version of pv_forecaster restored 2025-10-29 22:03:46 +01:00
Nils Reiners
ba6ff9f6c3 stündliche Speicherung des Forecasts angepasst 2025-10-07 22:34:16 +02:00
Nils Reiners
9ccb1e042b stündliche Speicherung des Forecasts angepasst 2025-10-07 22:33:02 +02:00
Nils Reiners
a5bcfca39a stündliche Speicherung des Forecasts angepasst 2025-10-07 22:29:49 +02:00
Nils Reiners
a1f9e29134 pv forecaster added 2025-10-07 20:52:28 +02:00
Nils Reiners
98302b9af5 heat pump slave added 2025-09-28 20:21:54 +02:00
Nils Reiners
f3de1f9280 mode as binary 2025-09-25 21:45:09 +02:00
Nils Reiners
ecd0180483 debug 2025-09-25 21:30:42 +02:00
Nils Reiners
1784b7c283 storing sg ready mode to db 2025-09-25 21:24:45 +02:00
Nils Reiners
b066658eb0 controller implemented and tested 2025-09-25 21:16:51 +02:00
Nils Reiners
0bcf8a2d8c inverter and meter seems to run 2025-09-18 14:14:53 +02:00
Nils Reiners
397935f51a minor changes 2025-09-16 22:55:13 +02:00
Nils Reiners
8eda3bc954 reading out registers corrected 2025-09-16 22:46:42 +02:00
Nils Reiners
b9cba11be7 cleaned up 2025-09-16 12:57:37 +02:00
Nils Reiners
5319a299be inverter was included 2025-09-16 12:52:27 +02:00
Nils Reiners
2186c4d7db wechselrichter zum tesent eingebunden 2025-09-14 10:52:50 +02:00
Nils Reiners
7df61fd6c1 shelly upgedatet 2025-05-26 21:31:28 +02:00
Nils Reiners
0734f7a810 shelly hinzugefügt 2025-05-26 21:08:16 +02:00
Nils Reiners
65a75e061b läuft 2025-04-26 22:31:14 +01:00
Nils Reiners
974ec43f10 influx data base added 2025-04-26 23:13:22 +02:00
Nils Reiners
f0d390cd59 Merge branch 'feature_wp_klasse' 2025-04-18 18:59:48 +02:00
Nils Reiners
a7e67cc8f1 daten übertragen 2025-04-18 12:46:15 +01:00
35 changed files with 1243 additions and 101167 deletions

38
README
View File

@@ -11,10 +11,42 @@ Was needs to be done on the Raspberry pi before the tool can run.
- pip install -r requirements.txt
3) How to run the script for testing:
nohup python main.py > terminal_log 2>&1 &
For reading out the terminal_log while the script is running:
tail -f terminal_log
4) Implement and run the ems as systemd service:
create:
/etc/systemd/system/allmende_ems.service
insert:
[Unit]
Description=Allmende EMS Python Script
After=network.target
[Service]
WorkingDirectory=/home/pi/projects/allmende_ems
ExecStart=/home/pi/allmende_ems/bin/python3.11 /home/pi/projects/allmende_ems/main.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
manage the service with the following commands:
Once:
sudo systemctl daemon-reload
sudo systemctl start allmende_ems.service
sudo systemctl enable allmende_ems.service
While running:
sudo systemctl status allmende_ems.service
sudo systemctl restart allmende_ems.service
sudo systemctl stop allmende_ems.service
journalctl -u allmende_ems.service

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,7 @@
from heat_pump import HeatPump

# Smoke test: connect to the master heat pump and dump its current
# register state to stdout.
hp = HeatPump(
    device_name='hp_master',
    ip_address='10.0.0.10',
    port=502,
    excel_path="../modbus_registers/heat_pump_registers.xlsx",
)
print(hp.get_state())

View File

@@ -0,0 +1,49 @@
from pymodbus.client import ModbusTcpClient
def switch_sg_ready_mode(ip, port, mode):
    """Switch the heat pump's SG-Ready operating mode via Modbus coils.

    Register 300: 1=BUS, 0=hardware contacts
    Registers 301 & 302:
        0-0 = no offset
        0-1 = boiler and heating offset
        1-1 = boiler offset + electric heater setpoint raised
        1-0 = SG utility lock (EVU Sperre)

    :param ip: IP address of the heat pump.
    :param port: Modbus TCP port (usually 502).
    :param mode:
        'mode1' = [True, False, False] => SG Ready deactivated
        'mode2' = [True, False, True]  => SG Ready activated for heat pump only
        'mode3' = [True, True, True]   => SG Ready activated for heat pump and heat rod
    :return: None. An unknown mode is reported and no coil is written.
    """
    # Bug fix: validate the mode BEFORE opening a connection. The original
    # fell through with mode_code=None and crashed on mode_code[0]
    # (TypeError) after having connected.
    mode_codes = {
        'mode1': [True, False, False],
        'mode2': [True, False, True],
        'mode3': [True, True, True],
    }
    mode_code = mode_codes.get(mode)
    if mode_code is None:
        print('Uncorrect or no string for mode!')
        return

    client = ModbusTcpClient(ip, port=port)
    if not client.connect():
        print("Verbindung zur Wärmepumpe fehlgeschlagen.")
        return
    try:
        # Write the three SG-Ready coils and report each result.
        for addr, bit in zip((300, 301, 302), mode_code):
            resp = client.write_coil(addr, bit)
            if resp.isError():
                print(f"Fehler beim Schreiben von Coil {addr}: {resp}")
            else:
                print(f"Coil {addr} erfolgreich geschrieben.")
    finally:
        client.close()
# Bug fix: the original compared the string literal '__name__' against
# '__main__', which is always False, so the manual test never ran.
if __name__ == '__main__':
    switch_sg_ready_mode(ip='10.0.0.10', port=502, mode='mode2')

View File

@@ -1,46 +0,0 @@
import csv
import os
import tempfile
import shutil
class DataBaseCsv:
    """Append-style CSV store that tolerates an evolving column set.

    Each ``store_data`` call appends one row. When the new row introduces
    columns the file has not seen yet, the whole file is rewritten with a
    widened header so that old and new rows share one schema.
    """

    def __init__(self, filename: str):
        self.filename = filename

    def store_data(self, data: dict):
        """Append *data* as one CSV row, widening the header if needed."""
        incoming_fields = list(data.keys())

        # Fresh or empty file: write header plus first row and stop.
        if not os.path.exists(self.filename) or os.path.getsize(self.filename) == 0:
            with open(self.filename, mode='w', newline='') as handle:
                writer = csv.DictWriter(handle, fieldnames=incoming_fields)
                writer.writeheader()
                writer.writerow(data)
            return

        # Existing file: load the current header and all rows.
        with open(self.filename, mode='r', newline='') as handle:
            reader = csv.DictReader(handle)
            known_fields = list(reader.fieldnames)
            old_rows = list(reader)

        # Union of columns, keeping the established order first.
        merged_fields = known_fields + [f for f in incoming_fields if f not in known_fields]

        # Rewrite everything into a temp file, then swap it into place.
        with tempfile.NamedTemporaryFile(mode='w', delete=False, newline='', encoding='utf-8') as tmp:
            writer = csv.DictWriter(tmp, fieldnames=merged_fields)
            writer.writeheader()
            for old_row in old_rows:
                writer.writerow({f: old_row.get(f, '') for f in merged_fields})
            writer.writerow({f: data.get(f, '') for f in merged_fields})

        shutil.move(tmp.name, self.filename)

48
data_base_influx.py Normal file
View File

@@ -0,0 +1,48 @@
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
import datetime as dt
import pandas as pd
class DataBaseInflux:
    """Thin wrapper around the InfluxDB 2.x client for storing device data."""

    def __init__(self, url: str, token: str, org: str, bucket: str):
        self.url = url
        self.token = token
        self.org = org
        self.bucket = bucket
        self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
        self.write_api = self.client.write_api()

    def store_data(self, device_name: str, data: dict):
        """Write all key/value pairs of *data* as fields of one point.

        The measurement name is the device name; the timestamp is "now".
        """
        point = Point(device_name)
        for key, value in data.items():
            point = point.field(key, value)
        # Bug fix: datetime.utcnow() is deprecated and returns a naive
        # datetime; use a timezone-aware UTC "now" instead.
        point = point.time(datetime.now(dt.timezone.utc), WritePrecision.NS)
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)

    def store_forecasts(self, forecast_name: str, data: pd.Series):
        """Write one forecast run; every point carries a 'run' tag with the
        run's start minute so successive runs can be told apart.

        :param data: series indexed by timestamps; non-numeric entries are dropped.
        """
        run_tag = dt.datetime.now(dt.timezone.utc).replace(second=0, microsecond=0).isoformat(timespec="minutes")
        series = pd.to_numeric(data, errors="coerce").dropna()
        pts = [
            Point(forecast_name)
            .tag("run", run_tag)
            .field("value", float(val))
            .time(ts.to_pydatetime(), WritePrecision.S)
            for ts, val in series.items()
        ]
        self.write_api.write(bucket=self.bucket, org=self.org, record=pts)

View File

@@ -0,0 +1,213 @@
import os, re, math, time
from datetime import datetime, timezone, timedelta
import pandas as pd
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException
# -----------------------
# CONFIG
# -----------------------
INFLUX_URL = "http://192.168.1.146:8086"
INFLUX_ORG = "allmende"
# NOTE(review): a live API token is hard-coded as the fallback value — a
# secret committed to source control. Rotate it and require the env var.
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "Cw_naEZyvJ3isiAh1P4Eq3TsjcHmzzDFS7SlbKDsS6ZWL04fMEYixWqtNxGThDdG27S9aW5g7FP9eiq5z1rsGA==")
SOURCE_BUCKET = "allmende_db"     # bucket to migrate from
TARGET_BUCKET = "allmende_db_v2"  # bucket to migrate into
# Measurements (device names) to copy over.
MEASUREMENTS = [
    "hp_master", "hp_slave", "pv_forecast", "sg_ready",
    "solaredge_master", "solaredge_meter", "solaredge_slave", "wohnung_2_6"
]
START_DT = datetime(2025, 6, 1, tzinfo=timezone.utc)  # migration start
STOP_DT = datetime.now(timezone.utc)                  # migrate up to "now"
WINDOW = timedelta(days=1)                            # per-query time window
EXCEL_PATH = "../modbus_registers/heat_pump_registers.xlsx"  # register/type map
EXCEL_SHEET = "Register_Map"
BATCH_SIZE = 1000   # points per InfluxDB write
MAX_RETRIES = 8     # retry attempts for transient write failures
# -----------------------
# Helpers
# -----------------------
def normalize(s) -> str:
    """Return *s* as a string with surrounding whitespace stripped and
    internal whitespace runs collapsed to single spaces ('' for None)."""
    text = str(s).strip() if s is not None else ""
    return re.sub(r"\s+", " ", text)
def is_invalid_sentinel(v: float) -> bool:
    """True if *v* is one of the device's 'no data' sentinel values."""
    sentinels = (-999.9, -999.0, 30000.0, 32767.0, 65535.0)
    return v in sentinels
def ensure_bucket(client: InfluxDBClient, name: str):
    """Create bucket *name* in INFLUX_ORG unless it already exists."""
    buckets_api = client.buckets_api()
    if not buckets_api.find_bucket_by_name(name):
        buckets_api.create_bucket(bucket_name=name, org=INFLUX_ORG, retention_rules=None)
def build_field_type_map_from_excel(path: str) -> dict[str, str]:
    """Build a mapping from field name to upper-cased Data_Type string.

    Only rows whose Register_Type is 'IR' (input register) are used. Each
    register is keyed twice: by its Tag_Name and by the legacy
    '<address> - <description>' field name, so both naming schemes resolve.
    """
    sheet = pd.read_excel(path, sheet_name=EXCEL_SHEET)
    sheet = sheet[sheet["Register_Type"].astype(str).str.upper() == "IR"].copy()
    sheet["Address"] = sheet["Address"].astype(int)
    for col in ("Description", "Tag_Name", "Data_Type"):
        sheet[col] = sheet[col].fillna("").astype(str)

    mapping: dict[str, str] = {}
    for _, row in sheet.iterrows():
        dtype = normalize(row["Data_Type"]).upper()
        tag = normalize(row["Tag_Name"])
        if tag:
            mapping[tag] = dtype
        legacy_key = normalize(f'{int(row["Address"])} - {normalize(row["Description"])}'.strip(" -"))
        if legacy_key:
            mapping[legacy_key] = dtype
    return mapping
def coerce_value_to_dtype(v, dtype: str):
    """Coerce *v* to the Python type implied by *dtype*.

    Returns None when the value cannot be represented (wrong input type,
    NaN/Inf, or unknown dtype); the caller then skips the point.
    """
    if v is None:
        return None
    kind = (dtype or "").upper()

    # Reject non-finite numerics up front (bool is an int subclass, so it
    # passes through this check harmlessly).
    if isinstance(v, (int, float)) and not math.isfinite(float(v)):
        return None

    if kind in ("BOOL", "BOOLEAN"):
        if isinstance(v, bool):
            return v
        return bool(int(v)) if isinstance(v, (int, float)) else None
    if kind.startswith(("INT", "UINT")):
        if isinstance(v, bool):
            return int(v)
        return int(float(v)) if isinstance(v, (int, float)) else None
    if kind.startswith("FLOAT") or kind == "DOUBLE":
        if isinstance(v, bool):
            return float(int(v))
        return float(v) if isinstance(v, (int, float)) else None
    return None
def write_with_retry(write_api, batch):
    """Write *batch* to TARGET_BUCKET, retrying transient failures.

    Retries with exponential backoff (capped at 30 s) on timeouts and on
    HTTP 429/5xx; any other ApiException is re-raised immediately. Raises
    RuntimeError when all MAX_RETRIES attempts fail.
    """
    backoff = 1.0
    last_msg = ""
    for _ in range(MAX_RETRIES):
        try:
            write_api.write(bucket=TARGET_BUCKET, org=INFLUX_ORG, record=batch)
            return
        except ApiException as exc:
            last_msg = getattr(exc, "body", "") or str(exc)
            status = getattr(exc, "status", None)
            transient = "timeout" in last_msg.lower() or status in (429, 500, 502, 503, 504)
            if not transient:
                raise
            time.sleep(backoff)
            backoff = min(backoff * 2, 30)
    raise RuntimeError(f"Write failed after {MAX_RETRIES} retries: {last_msg}")
def window_already_migrated(query_api, measurement: str, start: datetime, stop: datetime) -> bool:
    """Return True if the target bucket already holds at least one point of
    *measurement* inside [start, stop) — used to make re-runs idempotent."""
    flux = f'''
    from(bucket: "{TARGET_BUCKET}")
      |> range(start: time(v: "{start.isoformat()}"), stop: time(v: "{stop.isoformat()}"))
      |> filter(fn: (r) => r._measurement == "{measurement}")
      |> limit(n: 1)
    '''
    tables = query_api.query(flux, org=INFLUX_ORG)
    return any(t.records for t in tables)
def migrate_window(query_api, write_api, measurement: str,
                   start: datetime, stop: datetime,
                   type_map: dict[str, str],
                   do_type_cast: bool) -> int:
    """Copy all points of *measurement* in [start, stop) from SOURCE_BUCKET
    to TARGET_BUCKET, optionally casting field values via *type_map*.

    :param type_map: field name -> Data_Type string (from the Excel sheet).
    :param do_type_cast: when True, values with a mapped type are coerced;
        unrepresentable values and device sentinels are dropped.
    :return: number of points written.
    """
    flux = f'''
    from(bucket: "{SOURCE_BUCKET}")
      |> range(start: time(v: "{start.isoformat()}"), stop: time(v: "{stop.isoformat()}"))
      |> filter(fn: (r) => r._measurement == "{measurement}")
      |> keep(columns: ["_time","_measurement","_field","_value"])
    '''
    tables = query_api.query(flux, org=INFLUX_ORG)
    batch, written = [], 0
    for table in tables:
        for rec in table.records:
            t = rec.get_time()
            field = normalize(rec.get_field())
            value = rec.get_value()
            if value is None:
                continue
            if do_type_cast:
                dtp = type_map.get(field)
                if dtp:
                    cv = coerce_value_to_dtype(value, dtp)
                    if cv is None:
                        continue  # value not representable in the target type
                    if isinstance(cv, (int, float)) and is_invalid_sentinel(float(cv)):
                        continue  # drop device 'no data' sentinel values
                    value = cv
                # no mapping for this field -> write unchanged
            batch.append(Point(measurement).field(field, value).time(t, WritePrecision.NS))
            # Flush in batches to bound memory and request size.
            if len(batch) >= BATCH_SIZE:
                write_with_retry(write_api, batch)
                written += len(batch)
                batch = []
    if batch:
        write_with_retry(write_api, batch)
        written += len(batch)
    return written
# -----------------------
# Main
# -----------------------
def main():
    """Migrate all configured measurements window by window.

    Windows that already contain data in the target bucket are skipped, so
    the script can be re-run safely after an interruption.
    """
    if not INFLUX_TOKEN:
        raise RuntimeError("INFLUX_TOKEN fehlt (Env-Var INFLUX_TOKEN setzen).")
    with InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG, timeout=900_000) as client:
        ensure_bucket(client, TARGET_BUCKET)
        type_map = build_field_type_map_from_excel(EXCEL_PATH)
        query_api = client.query_api()
        write_api = client.write_api(write_options=SYNCHRONOUS)
        for meas in MEASUREMENTS:
            # Only the heat-pump measurements have an Excel type map to cast against.
            do_cast = meas in ("hp_master", "hp_slave")
            cur, total = START_DT, 0
            print(f"\n== {meas} (cast={'ON' if do_cast else 'OFF'}) ==")
            while cur < STOP_DT:
                nxt = min(cur + WINDOW, STOP_DT)
                if window_already_migrated(query_api, meas, cur, nxt):
                    print(f"{cur.isoformat()} -> {nxt.isoformat()} : SKIP (existiert schon)")
                    cur = nxt
                    continue
                n = migrate_window(query_api, write_api, meas, cur, nxt, type_map, do_cast)
                total += n
                print(f"{cur.isoformat()} -> {nxt.isoformat()} : {n} (gesamt {total})")
                cur = nxt
            print(f"== Fertig {meas}: {total} Punkte ==")
if __name__ == "__main__":
main()

25
energysystem.py Normal file
View File

@@ -0,0 +1,25 @@
class EnergySystem():
    """Registry of energy-system components (heat pumps, meters, inverters).

    Components must expose a ``device_name`` attribute and a ``get_state()``
    method returning a dict of measurements.
    """

    def __init__(self):
        self.components = []

    def add_components(self, *args):
        """Register any number of components."""
        self.components.extend(args)

    def get_state_and_store_to_database(self, db):
        """Poll every component, persist each snapshot via *db*, and return
        the combined state keyed by device name."""
        state = {}
        for component in self.components:
            snapshot = component.get_state()
            state[component.device_name] = snapshot
            db.store_data(component.device_name, snapshot)
        return state

    def get_component_by_name(self, name):
        """Return the registered component called *name*, or None."""
        return next((c for c in self.components if c.device_name == name), None)

View File

@@ -0,0 +1,61 @@
#!/usr/bin/env python3
import time
import datetime as dt
import requests
from zoneinfo import ZoneInfo
from matplotlib import pyplot as plt
import pandas as pd
TZ = "Europe/Berlin"  # local timezone used for API requests and timestamps
DAYS = 2              # forecast horizon in days
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"  # free weather API, no key required
class WeatherForecaster:
    """Fetch hourly weather forecasts (irradiance, temperature, wind) from Open-Meteo."""

    def __init__(self, latitude, longitude):
        self.lat = latitude
        self.lon = longitude

    def get_hourly_forecast(self, start_hour, days):
        """Return a DataFrame indexed by local timestamps.

        Columns: 'ghi' (shortwave radiation), 'temp_air' (2 m temperature)
        and 'wind_speed' (10 m wind), covering *days* days from *start_hour*.
        """
        end_hour = start_hour + dt.timedelta(days=days)
        params = {
            "latitude": self.lat,
            "longitude": self.lon,
            "hourly": ["temperature_2m", "shortwave_radiation", "wind_speed_10m"],
            "timezone": TZ,
            "start_hour": start_hour.strftime("%Y-%m-%dT%H:%M"),
            "end_hour": end_hour.strftime("%Y-%m-%dT%H:%M"),
        }
        hourly = requests.get(OPEN_METEO_URL, params=params).json()["hourly"]
        # API returns naive local ISO timestamps; attach the configured zone.
        index = [
            dt.datetime.fromisoformat(stamp).replace(tzinfo=ZoneInfo(TZ))
            for stamp in hourly["time"]
        ]
        frame = pd.DataFrame(index=index)
        frame["ghi"] = hourly["shortwave_radiation"]
        frame["temp_air"] = hourly["temperature_2m"]
        frame["wind_speed"] = hourly["wind_speed_10m"]
        return frame
if __name__ == '__main__':
    weather_forecast = WeatherForecaster(latitude=48.041, longitude=7.862)
    while True:
        # Sleep until the start of the next minute before fetching.
        now = dt.datetime.now()
        time.sleep(60 - now.second)
        now_local = dt.datetime.now()
        start_hour_local = (now_local + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
        # Bug fix: get_hourly_forecast returns a single DataFrame, not a
        # 4-tuple — the original unpacking raised ValueError at runtime.
        weather = weather_forecast.get_hourly_forecast(start_hour_local, DAYS)
        plt.plot(weather.index, weather["temp_air"])
        plt.show()

View File

@@ -1,62 +1,173 @@
from pymodbus.client import ModbusTcpClient from pymodbus.client import ModbusTcpClient
import pandas as pd import pandas as pd
import time import time
import struct
import math
class HeatPump: class HeatPump:
def __init__(self, ip_address: str): def __init__(self, device_name: str, ip_address: str, port: int = 502,
excel_path: str = "modbus_registers/heat_pump_registers.xlsx",
sheet_name: str = "Register_Map"):
self.device_name = device_name
self.ip = ip_address self.ip = ip_address
self.client = None self.port = port
self.connect_to_modbus() self.client = ModbusTcpClient(self.ip, port=self.port)
self.registers = None
self.get_registers()
def connect_to_modbus(self): self.excel_path = excel_path
port = 502 self.sheet_name = sheet_name
self.client = ModbusTcpClient(self.ip, port=port) self.registers = self.get_registers()
try:
if not self.client.connect(): # -------------
# Connection
# -------------
def connect(self) -> bool:
ok = self.client.connect()
if not ok:
print("Verbindung zur Wärmepumpe fehlgeschlagen.") print("Verbindung zur Wärmepumpe fehlgeschlagen.")
exit(1) return ok
print("Verbindung zur Wärmepumpe erfolgreich.")
except KeyboardInterrupt: def close(self):
print("Beendet durch Benutzer (Ctrl+C).") try:
finally:
self.client.close() self.client.close()
except Exception:
pass
def get_registers(self): # -------------
# Excel-Datei mit den Input-Registerinformationen # Excel parsing
excel_path = "data/ModBus TCPIP 1.17(1).xlsx" # -------------
xls = pd.ExcelFile(excel_path) def get_registers(self) -> dict:
df_input_registers = xls.parse('04 Input Register') df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
# Relevante Spalten bereinigen df["Address"] = df["Address"].astype(int)
df_clean = df_input_registers[['MB Adresse', 'Variable', 'Beschreibung', 'Variabel Typ']].dropna() df["Length"] = df["Length"].astype(int)
df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int) df["Data_Type"] = df["Data_Type"].astype(str).str.upper()
df["Byteorder"] = df["Byteorder"].astype(str).str.upper()
# Dictionary aus Excel erzeugen df["Scaling"] = df.get("Scaling", 1.0)
self.registers = { df["Scaling"] = df["Scaling"].fillna(1.0).astype(float)
row['MB Adresse']: {
'desc': row['Beschreibung'], df["Offset"] = df.get("Offset", 0.0)
'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT' df["Offset"] = df["Offset"].fillna(0.0).astype(float)
}
for _, row in df_clean.iterrows() regs = {}
for _, row in df.iterrows():
regs[int(row["Address"])] = {
"length": int(row["Length"]),
"data_type": row["Data_Type"],
"byteorder": row["Byteorder"],
"scaling": float(row["Scaling"]),
"offset": float(row["Offset"]),
"tag": str(row.get("Tag_Name", "")).strip(),
"desc": "" if pd.isna(row.get("Description")) else str(row.get("Description")).strip(),
} }
return regs
def get_data(self): # -------------
data = {} # Byteorder handling
data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S') # -------------
for address, info in self.registers.items(): @staticmethod
reg_type = info['type'] def _registers_to_bytes(registers: list[int], byteorder_code: str) -> bytes:
result = self.client.read_input_registers(address, count=2 if reg_type == 'REAL' else 1) """
registers: Liste von uint16 (0..65535), wie pymodbus sie liefert.
byteorder_code: AB, ABCD, CDAB, BADC, DCBA (gemäß Template)
Rückgabe: bytes in der Reihenfolge, wie sie für struct.unpack benötigt werden.
"""
code = (byteorder_code or "ABCD").upper()
# Pro Register: 16-bit => zwei Bytes (MSB, LSB)
words = [struct.pack(">H", r & 0xFFFF) for r in registers] # big endian pro Wort
if len(words) == 1:
w = words[0] # b'\xAA\xBB'
if code in ("AB", "ABCD", "CDAB"):
return w
if code == "BADC": # byte swap
return w[::-1]
if code == "DCBA": # byte swap (bei 16-bit identisch zu BADC)
return w[::-1]
return w
# 32-bit (2 words) oder 64-bit (4 words): Word/Byte swaps abbilden
# words[0] = high word bytes, words[1] = low word bytes (in Modbus-Reihenfolge gelesen)
if code == "ABCD":
ordered = words
elif code == "CDAB":
# word swap
ordered = words[1:] + words[:1]
elif code == "BADC":
# byte swap innerhalb jedes Words
ordered = [w[::-1] for w in words]
elif code == "DCBA":
# word + byte swap
ordered = [w[::-1] for w in (words[1:] + words[:1])]
else:
ordered = words
return b"".join(ordered)
@staticmethod
def _decode_by_type(raw_bytes: bytes, data_type: str):
dt = (data_type or "").upper()
# struct: > = big endian, < = little endian
# Wir liefern raw_bytes bereits in der richtigen Reihenfolge; daher nutzen wir ">" konsistent.
if dt == "UINT16":
return struct.unpack(">H", raw_bytes[:2])[0]
if dt == "INT16":
return struct.unpack(">h", raw_bytes[:2])[0]
if dt == "UINT32":
return struct.unpack(">I", raw_bytes[:4])[0]
if dt == "INT32":
return struct.unpack(">i", raw_bytes[:4])[0]
if dt == "FLOAT32":
return struct.unpack(">f", raw_bytes[:4])[0]
if dt == "FLOAT64":
return struct.unpack(">d", raw_bytes[:8])[0]
raise ValueError(f"Unbekannter Data_Type: {dt}")
def _decode_value(self, registers: list[int], meta: dict):
raw = self._registers_to_bytes(registers, meta["byteorder"])
val = self._decode_by_type(raw, meta["data_type"])
return (val * meta["scaling"]) + meta["offset"]
# -------------
# Reading
# -------------
def get_state(self) -> dict:
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
if not self.connect():
data["error"] = "connect_failed"
return data
try:
for address, meta in self.registers.items():
count = int(meta["length"])
result = self.client.read_input_registers(address, count=count)
if result.isError(): if result.isError():
print(f"Fehler beim Lesen von Adresse {address}: {result}") print(f"Fehler beim Lesen von Adresse {address}: {result}")
continue continue
if reg_type == 'REAL': try:
value = result.registers[0] / 10.0 value = self._decode_value(result.registers, meta)
else: except Exception as e:
value = result.registers[0] print(f"Decode-Fehler an Adresse {address} ({meta.get('tag','')}): {e}")
continue
# Optional filter
# if self._is_invalid_sentinel(value):
# continue
value = float(value)
desc = meta.get("desc") or ""
field_name = f"{address} - {desc}".strip(" -")
data[field_name] = float(value)
print(f"Adresse {address} - {desc}: {value}")
finally:
self.close()
print(f"Adresse {address} - {info['desc']}: {value}")
data[f"{address} - {info['desc']}"] = value
return data return data

40
main.py
View File

@@ -1,17 +1,45 @@
import time import time
from datetime import datetime from datetime import datetime
from data_base_csv import DataBaseCsv from data_base_influx import DataBaseInflux
from heat_pump import HeatPump from heat_pump import HeatPump
from pv_inverter import PvInverter
from solaredge_meter import SolaredgeMeter
from shelly_pro_3m import ShellyPro3m
from energysystem import EnergySystem
from sg_ready_controller import SgReadyController
interval = 10 # z.B. alle 10 Sekunden interval_seconds = 10
db = DataBaseCsv('modbus_log.csv') es = EnergySystem()
hp = HeatPump(ip_address='10.0.0.10')
db = DataBaseInflux(
url="http://192.168.1.146:8086",
token="Cw_naEZyvJ3isiAh1P4Eq3TsjcHmzzDFS7SlbKDsS6ZWL04fMEYixWqtNxGThDdG27S9aW5g7FP9eiq5z1rsGA==",
org="allmende",
bucket="allmende_db_v3"
)
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112')
meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112')
es.add_components(hp_master, hp_slave, shelly, wr, meter)
controller = SgReadyController(es)
now = datetime.now()
while True: while True:
now = datetime.now() now = datetime.now()
if now.second % interval == 0 and now.microsecond < 100_000: if now.second % interval_seconds == 0 and now.microsecond < 100_000:
db.store_data(hp.get_data()) state = es.get_state_and_store_to_database(db)
mode = controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state)
if mode == 'mode1':
mode_as_binary = 0
else:
mode_as_binary = 1
db.store_data('sg_ready', {'mode': mode_as_binary})
time.sleep(0.1) time.sleep(0.1)

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

139
pv_inverter.py Normal file
View File

@@ -0,0 +1,139 @@
import time
import struct
import pandas as pd
from typing import Dict, Any, List, Tuple, Optional
from pymodbus.client import ModbusTcpClient
EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"  # register map source
# Upper bound: only registers strictly below this address are read (max 40120).
# NOTE(review): the original comment said "bis EXKLUSIVE 40206" but the value
# is 40121 — the limit was apparently tightened later; confirm which is intended.
MAX_ADDR_EXCLUSIVE = 40121
class PvInverter:
    """Reads SunSpec-style registers from a PV inverter via Modbus TCP.

    The register list (address, description, type) is loaded from an Excel
    sheet; only addresses below MAX_ADDR_EXCLUSIVE are ever read.
    """

    def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1):
        self.device_name = device_name
        self.ip = ip_address
        self.port = port
        self.unit = unit  # Modbus unit/slave id
        self.client: Optional[ModbusTcpClient] = None
        self.registers: Dict[int, Dict[str, Any]] = {}  # addr -> {"desc":..., "type":...}
        self.connect_to_modbus()
        self.load_registers(EXCEL_PATH)

    # ---------- Connection ----------
    def connect_to_modbus(self):
        """Open the Modbus TCP connection; exits the process on failure."""
        self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3)
        if not self.client.connect():
            print("❌ Verbindung zu Wechselrichter fehlgeschlagen.")
            raise SystemExit(1)
        print("✅ Verbindung zu Wechselrichter hergestellt.")

    def close(self):
        """Close and discard the Modbus connection."""
        if self.client:
            self.client.close()
            self.client = None

    # ---------- Register list ----------
    def load_registers(self, excel_path: str):
        """Load the register map from Excel into ``self.registers``.

        Only rows with a complete address/description/type and an address
        below MAX_ADDR_EXCLUSIVE are kept.
        """
        xls = pd.ExcelFile(excel_path)
        df = xls.parse()
        # Adjust the column names here if the sheet layout changes:
        cols = ["MB Adresse", "Beschreibung", "Variabel Typ"]
        df = df[cols].dropna()
        df["MB Adresse"] = df["MB Adresse"].astype(int)
        # 1) Pre-filter: keep only addresses below the hard limit.
        df = df[df["MB Adresse"] < MAX_ADDR_EXCLUSIVE]
        self.registers = {
            int(row["MB Adresse"]): {
                "desc": str(row["Beschreibung"]).strip(),
                "type": str(row["Variabel Typ"]).strip()
            }
            for _, row in df.iterrows()
        }

    # ---------- Low-level reads ----------
    def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
        """Call the named read method, trying both with and without the
        ``slave=`` keyword (pymodbus 3.8.x accepts it; other versions differ).

        Returns the raw register word list, or None on any error.
        """
        fn = getattr(self.client, fn_name)
        for kwargs in (dict(address=address, count=count, slave=self.unit),
                       dict(address=address, count=count)):
            try:
                res = fn(**kwargs)
                if res is None or (hasattr(res, "isError") and res.isError()):
                    continue
                return res.registers
            except TypeError:
                # Signature mismatch for this pymodbus version — try the next.
                continue
        return None

    def _read_any(self, address: int, count: int) -> Optional[List[int]]:
        """Try holding registers first, then fall back to input registers."""
        regs = self._try_read("read_holding_registers", address, count)
        if regs is None:
            regs = self._try_read("read_input_registers", address, count)
        return regs

    # ---------- Decoding ----------
    @staticmethod
    def _to_i16(u16: int) -> int:
        # Reinterpret an unsigned 16-bit word as signed (two's complement).
        return struct.unpack(">h", struct.pack(">H", u16))[0]

    @staticmethod
    def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float:
        # Combine two 16-bit words into one big-endian float32.
        b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi)
        return struct.unpack(">f", b)[0]

    # Helper: how many 16-bit registers does this type occupy?
    @staticmethod
    def _word_count_for_type(rtype: str) -> int:
        rt = (rtype or "").lower()
        # Adjust to the type names used in the Excel sheet:
        if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt:
            return 2
        # Default: one word (e.g. int16/uint16)
        return 1

    def read_one(self, address_excel: int, rtype: str) -> Optional[float]:
        """
        Read and decode one value according to its type ('INT', 'REAL', ...).
        Only registers below MAX_ADDR_EXCLUSIVE are ever touched.

        NOTE(review): every two-word value is decoded as float32 — including
        types labelled 'uint32'/'string(32)' — confirm that is intended.
        """
        addr = int(address_excel)
        words = self._word_count_for_type(rtype)
        # 2) Hard limit: the highest register touched must stay below the bound.
        if addr + words - 1 >= MAX_ADDR_EXCLUSIVE:
            # Skip: this read would touch a register at or beyond the limit.
            return None
        if words == 2:
            regs = self._read_any(addr, 2)
            if not regs or len(regs) < 2:
                return None
            # Existing logic interprets two words as a float32:
            return self._to_f32_from_two(regs[0], regs[1])
        else:
            regs = self._read_any(addr, 1)
            if not regs:
                return None
            return float(self._to_i16(regs[0]))

    def get_state(self) -> Dict[str, Any]:
        """
        Read ALL registers from ``self.registers`` and return them as a dict
        keyed '<address> - <description>', plus a 'Zeit' timestamp.
        Ensures no address (including multi-word values) at or beyond
        MAX_ADDR_EXCLUSIVE is read.
        """
        data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
        for address, meta in sorted(self.registers.items()):
            words = self._word_count_for_type(meta["type"])
            # 3) Guard again at iteration level:
            if address + words - 1 >= MAX_ADDR_EXCLUSIVE:
                continue
            val = self.read_one(address, meta["type"])
            if val is None:
                continue
            key = f"{address} - {meta['desc']}"
            data[key] = val
        return data

View File

@@ -1,3 +1,5 @@
pymodbus~=3.8.6 pymodbus~=3.8.6
pandas pandas
openpyxl openpyxl
sshtunnel
pvlib

65
sg_ready_controller.py Normal file
View File

@@ -0,0 +1,65 @@
from pymodbus.client import ModbusTcpClient
class SgReadyController():
    """Switches the heat pump's SG-Ready mode based on grid feed-in power."""

    def __init__(self, es):
        # es: EnergySystem used to look up components by name.
        self.es = es

    def perform_action(self, heat_pump_name, meter_name, state):
        """Pick an SG-Ready mode from the meter's AC power and apply it.

        :param heat_pump_name: device name of the heat pump component.
        :param meter_name: device name of the grid meter in *state*.
        :param state: combined system state as returned by
            ``EnergySystem.get_state_and_store_to_database``.
        :return: the applied mode string, or None when the power stayed in
            the dead band (0 .. 10 kW) and nothing was switched.
        """
        hp = self.es.get_component_by_name(heat_pump_name)
        meter_values = state[meter_name]
        # Scale the raw SunSpec power value by its scale-factor register.
        power_to_grid = meter_values['40206 - M_AC_Power'] * 10 ** meter_values['40210 - M_AC_Power_SF']
        mode = None
        if power_to_grid > 10000:
            mode = 'mode2'
            self.switch_sg_ready_mode(hp.ip, hp.port, mode)
        elif power_to_grid < 0:
            mode = 'mode1'
            self.switch_sg_ready_mode(hp.ip, hp.port, mode)
        return mode

    def switch_sg_ready_mode(self, ip, port, mode):
        """Write the SG-Ready coils on the heat pump.

        Register 300: 1=BUS, 0=hardware contacts
        Registers 301 & 302:
            0-0 = no offset
            0-1 = boiler and heating offset
            1-1 = boiler offset + electric heater setpoint raised
            1-0 = SG utility lock (EVU Sperre)

        :param mode:
            'mode1' = [True, False, False] => SG Ready deactivated
            'mode2' = [True, False, True]  => SG Ready activated for heat pump only
            'mode3' = [True, True, True]   => SG Ready activated for heat pump and heat rod
        :return: None. An unknown mode is reported and no coil is written.
        """
        # Bug fix: validate the mode BEFORE connecting. The original fell
        # through with mode_code=None and crashed on mode_code[0] (TypeError).
        mode_codes = {
            'mode1': [True, False, False],
            'mode2': [True, False, True],
            'mode3': [True, True, True],
        }
        mode_code = mode_codes.get(mode)
        if mode_code is None:
            print('Uncorrect or no string for mode!')
            return

        client = ModbusTcpClient(ip, port=port)
        if not client.connect():
            print("Verbindung zur Wärmepumpe fehlgeschlagen.")
            return
        try:
            # Write the three SG-Ready coils and report each result.
            for addr, bit in zip((300, 301, 302), mode_code):
                resp = client.write_coil(addr, bit)
                if resp.isError():
                    print(f"Fehler beim Schreiben von Coil {addr}: {resp}")
                else:
                    print(f"Coil {addr} erfolgreich geschrieben.")
        finally:
            client.close()

64
shelly_pro_3m.py Normal file
View File

@@ -0,0 +1,64 @@
import struct
from pymodbus.client import ModbusTcpClient
import pandas as pd
import time
class ShellyPro3m:
    """Reads power values from a Shelly Pro 3EM energy meter via Modbus TCP.

    Register addresses, descriptions and types are loaded from an Excel sheet.
    """

    def __init__(self, device_name: str, ip_address: str, port: int = 502):
        self.device_name = device_name
        self.ip = ip_address
        self.port = port
        self.client = None
        self.connect_to_modbus()
        self.registers = None
        self.get_registers()

    def connect_to_modbus(self):
        """Open the Modbus TCP connection (exits the process on failure)."""
        self.client = ModbusTcpClient(self.ip, port=self.port)
        # Bug fix: the original closed the connection in a `finally` block
        # immediately after connecting, so every later read depended on an
        # implicit reconnect. Keep the connection open instead.
        if not self.client.connect():
            print("Verbindung zum Shelly-Logger fehlgeschlagen.")
            exit(1)
        print("Verbindung zum Shelly-Logger erfolgreich.")

    def get_registers(self):
        """Load the input-register map from Excel into ``self.registers``."""
        excel_path = "modbus_registers/shelly_pro_3m_registers.xlsx"
        xls = pd.ExcelFile(excel_path)
        df_input_registers = xls.parse()
        # Keep only the relevant columns and drop incomplete rows.
        df_clean = df_input_registers[['MB Adresse', 'Beschreibung', 'Variabel Typ']].dropna()
        df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int)
        self.registers = {
            row['MB Adresse']: {
                'desc': row['Beschreibung'],
                'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT'
            }
            for _, row in df_clean.iterrows()
        }

    def get_state(self):
        """Read all configured registers and return {'<addr> - <desc>': value}
        plus a 'Zeit' timestamp."""
        data = {'Zeit': time.strftime('%Y-%m-%d %H:%M:%S')}
        for address, info in self.registers.items():
            reg_type = info['type']
            count = 2 if reg_type == 'REAL' else 1
            result = self.client.read_input_registers(address, count=count)
            if result.isError():
                print(f"Fehler beim Lesen von Adresse {address}: {result}")
                continue
            if reg_type == 'REAL':
                # Float32 with the low word first (CDAB), as the Shelly sends it.
                packed = struct.pack(">HH", result.registers[1], result.registers[0])
                value = round(struct.unpack(">f", packed)[0], 2)
            else:
                # Bug fix: the original unconditionally accessed registers[1],
                # which raises IndexError for single-register INT reads.
                value = result.registers[0]
            print(f"Adresse {address} - {info['desc']}: {value}")
            data[f"{address} - {info['desc']}"] = value
        return data

View File

@@ -0,0 +1,210 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, Dict, List, Literal, Tuple, Union
import numpy as np
import pandas as pd
import pvlib
import matplotlib.pyplot as plt
from pvlib.location import Location
from pvlib.pvsystem import PVSystem
from pvlib.modelchain import ModelChain
SeriesOrArray = Union[pd.Series, np.ndarray]
# ----------------------------- Konfiguration -----------------------------
@dataclass
class PvWattsSubarrayConfig:
    """Configuration for one PVWatts subarray (powers in watts, angles in degrees)."""
    name: str
    pdc0_w: float  # DC power at STC [W]
    tilt_deg: float  # tilt (0 = horizontal)
    azimuth_deg: float  # azimuth (180 = south)
    gamma_pdc: float = -0.004  # temperature coefficient [1/K]
    eta_inv_nom: float = 0.96  # nominal inverter efficiency
    albedo: float = 0.2  # ground reflectance
    # Flat-rate losses (PVWatts losses model)
    dc_loss: float = 0.0
    ac_loss: float = 0.0
    soiling: float = 0.0
    # Transposition model used to map GHI/DNI/DHI onto the tilted plane
    transposition_model: Literal["perez","haydavies","isotropic","klucher","reindl"] = "perez"
# ------------------------------ Subarray ---------------------------------
class PvWattsSubarray:
    """
    One subarray simulated with pvlib.ModelChain (PVWatts models).

    DNI/DHI are derived automatically from GHI via the Erbs decomposition,
    and cell temperature uses the SAPM temperature model
    (open rack, glass/polymer).
    """
    def __init__(self, cfg: PvWattsSubarrayConfig, location: Location):
        self.cfg = cfg
        self.location = location
        # Lazily created ModelChain; rebuilt on every get_power() call.
        self._mc: Optional[ModelChain] = None
    # ---------------------------------------------------------------------
    def _create_modelchain(self) -> ModelChain:
        """Create a pvlib.ModelChain instance with PVWatts parameters."""
        temp_params = pvlib.temperature.TEMPERATURE_MODEL_PARAMETERS["sapm"]["open_rack_glass_polymer"]
        system = PVSystem(
            surface_tilt=self.cfg.tilt_deg,
            surface_azimuth=self.cfg.azimuth_deg,
            module_parameters={"pdc0": self.cfg.pdc0_w, "gamma_pdc": self.cfg.gamma_pdc},
            inverter_parameters={"pdc0": self.cfg.pdc0_w, "eta_inv_nom": self.cfg.eta_inv_nom},
            albedo=self.cfg.albedo,
            temperature_model_parameters=temp_params,
            module_type="glass_polymer",
            racking_model="open_rack",
        )
        mc = ModelChain(
            system, self.location,
            transposition_model=self.cfg.transposition_model,
            solar_position_method="nrel_numpy",
            airmass_model="kastenyoung1989",
            dc_model="pvwatts",
            ac_model="pvwatts",
            aoi_model="physical",
            spectral_model=None,
            losses_model="pvwatts",
            temperature_model="sapm",
        )
        # NOTE(review): losses_parameters is assigned on the ModelChain here,
        # and the keys ("dc_loss"/"ac_loss") do not match pvlib's
        # pvwatts_losses() parameter names (soiling, shading, mismatch, ...).
        # Verify against the pvlib version in use that these losses are
        # actually applied and not silently ignored.
        mc.losses_parameters = {
            "dc_loss": float(self.cfg.dc_loss),
            "ac_loss": float(self.cfg.ac_loss),
            "soiling": float(self.cfg.soiling),
        }
        self._mc = mc
        return mc
    # ---------------------------------------------------------------------
    def calc_dni_and_dhi(self, weather: pd.DataFrame) -> pd.DataFrame:
        """
        Derive DNI & DHI from GHI using the Erbs decomposition model.

        Returns a copy of *weather* with 'dni' and 'dhi' columns added
        (both clipped to be non-negative).
        """
        if "ghi" not in weather:
            raise ValueError("Wetterdaten benötigen mindestens 'ghi'.")
        # Solar position is needed as input to the Erbs model.
        sp = self.location.get_solarposition(weather.index)
        erbs = pvlib.irradiance.erbs(weather["ghi"], sp["zenith"], weather.index)
        out = weather.copy()
        out["dni"] = erbs["dni"].clip(lower=0)
        out["dhi"] = erbs["dhi"].clip(lower=0)
        return out
    # ---------------------------------------------------------------------
    def _prepare_weather(self, weather: pd.DataFrame) -> pd.DataFrame:
        """Ensure complete weather columns (ghi, dni, dhi, temp_air, wind_speed)
        and a timezone-aware index in the site's timezone."""
        if "ghi" not in weather or "temp_air" not in weather:
            raise ValueError("weather benötigt Spalten: 'ghi' und 'temp_air'.")
        w = weather.copy()
        # Localize naive timestamps to the site tz; convert aware ones if needed.
        if w.index.tz is None:
            w.index = w.index.tz_localize(self.location.tz)
        else:
            if str(w.index.tz) != str(self.location.tz):
                w = w.tz_convert(self.location.tz)
        # Default wind speed of 1 m/s when not provided (affects cell temperature).
        if "wind_speed" not in w:
            w["wind_speed"] = 1.0
        # Fill in DNI/DHI from GHI (always via Erbs) when either is missing.
        if "dni" not in w or "dhi" not in w:
            w = self.calc_dni_and_dhi(w)
        return w
    # ---------------------------------------------------------------------
    def get_power(self, weather: pd.DataFrame) -> pd.Series:
        """
        Compute AC power [W] from the given weather data.

        Returns a pd.Series named after the subarray, indexed like *weather*.
        """
        w = self._prepare_weather(weather)
        mc = self._create_modelchain()
        mc.run_model(weather=w)
        return mc.results.ac.rename(self.cfg.name)
# ------------------------------- Anlage ----------------------------------
class PvWattsPlant:
    """
    A PV plant made of several subarrays that share one weather DataFrame.
    """

    def __init__(self, site: Location, subarray_cfgs: List[PvWattsSubarrayConfig]):
        self.site = site
        # One PvWattsSubarray per configuration, keyed by its name.
        self.subs: Dict[str, PvWattsSubarray] = {}
        for cfg in subarray_cfgs:
            self.subs[cfg.name] = PvWattsSubarray(cfg, site)

    def get_power(
        self,
        weather: pd.DataFrame,
        *,
        return_breakdown: bool = False
    ) -> pd.Series | Tuple[pd.Series, Dict[str, pd.Series]]:
        """Compute total AC power; optionally also return each subarray's series."""
        per_subarray: Dict[str, pd.Series] = {}
        for sub_name, subarray in self.subs.items():
            per_subarray[sub_name] = subarray.get_power(weather)
        # Restrict all series to the timestamps common to every subarray.
        all_series = list(per_subarray.values())
        common_index = all_series[0].index
        for series in all_series[1:]:
            common_index = common_index.intersection(series.index)
        per_subarray = {
            key: series.reindex(common_index).fillna(0.0)
            for key, series in per_subarray.items()
        }
        total = sum(per_subarray.values())
        total.name = "total_ac"
        return (total, per_subarray) if return_breakdown else total
# --------------------------- Beispielnutzung -----------------------------
if __name__ == "__main__":
    # Demo: simulate one summer day for a two-subarray plant in Berlin.
    # Site
    site = Location(latitude=52.52, longitude=13.405, altitude=35, tz="Europe/Berlin", name="Berlin")
    # Time axis: one day at 15-minute resolution
    times = pd.date_range("2025-06-21 00:00", "2025-06-21 23:45", freq="15min", tz=site.tz)
    # Synthetic dummy weather: sinusoidal GHI bump and mild temperatures
    ghi = 1000 * np.clip(np.sin(np.linspace(0, np.pi, len(times)))**1.2, 0, None)
    temp_air = 16 + 8 * np.clip(np.sin(np.linspace(-np.pi/2, np.pi/2, len(times))), 0, None)
    wind = np.full(len(times), 1.0)
    weather = pd.DataFrame(index=times)
    weather["ghi"] = ghi
    weather["temp_air"] = temp_air
    weather["wind_speed"] = wind
    # Two subarrays: south-facing 30° and west-facing 20°
    cfgs = [
        PvWattsSubarrayConfig(name="Sued_30", pdc0_w=6000, tilt_deg=30, azimuth_deg=180, dc_loss=0.02, ac_loss=0.01),
        PvWattsSubarrayConfig(name="West_20", pdc0_w=4000, tilt_deg=20, azimuth_deg=270, soiling=0.02),
    ]
    plant = PvWattsPlant(site, cfgs)
    # Simulation
    total, parts = plant.get_power(weather, return_breakdown=True)
    # Plot total and per-subarray power in kW
    plt.figure(figsize=(10, 6))
    plt.plot(total.index, total / 1000, label="Gesamtleistung (AC)", linewidth=2, color="black")
    for name, s in parts.items():
        plt.plot(s.index, s / 1000, label=name)
    plt.title("PV-Leistung (PVWatts, ERBS-Methode für DNI/DHI)")
    plt.ylabel("Leistung [kW]")
    plt.xlabel("Zeit")
    plt.legend()
    plt.grid(True, linestyle="--", alpha=0.5)
    plt.tight_layout()
    plt.show()

134
solaredge_meter.py Normal file
View File

@@ -0,0 +1,134 @@
import time
import struct
import pandas as pd
from typing import Dict, Any, List, Tuple, Optional
from pymodbus.client import ModbusTcpClient
EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"
# Lower bound: only registers at address >= 40121 are loaded.
# NOTE(review): the original comment claimed an exclusive upper bound of
# 40206, but the code below only enforces this minimum — confirm intent.
MIN_ADDR_INCLUSIVE = 40121
ADDRESS_SHIFT = 50
class SolaredgeMeter:
    """Reads a SolarEdge-connected meter's registers over Modbus TCP.

    The register map (address, description, variable type) comes from an
    Excel sheet; ``get_state`` reads every mapped register and returns a
    flat dict of decoded values.
    """

    def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1):
        self.device_name = device_name
        self.ip = ip_address
        self.port = port
        self.unit = unit
        self.client: Optional[ModbusTcpClient] = None
        self.registers: Dict[int, Dict[str, Any]] = {}  # addr -> {"desc":..., "type":...}
        self.connect_to_modbus()
        self.load_registers(EXCEL_PATH)

    # ---------- Connection ----------
    def connect_to_modbus(self):
        """Open the Modbus TCP connection; abort the process on failure."""
        self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3)
        if not self.client.connect():
            print("❌ Verbindung zu Zähler fehlgeschlagen.")
            raise SystemExit(1)
        print("✅ Verbindung zu Zähler hergestellt.")

    def close(self):
        """Close the connection and drop the client reference."""
        if self.client:
            self.client.close()
            self.client = None

    # ---------- Register list ----------
    def load_registers(self, excel_path: str):
        """Load the register map from the Excel sheet into ``self.registers``.

        Only addresses >= MIN_ADDR_INCLUSIVE are kept.
        """
        xls = pd.ExcelFile(excel_path)
        df = xls.parse()
        # Adjust these column names if the sheet layout changes:
        cols = ["MB Adresse", "Beschreibung", "Variabel Typ"]
        df = df[cols].dropna()
        df["MB Adresse"] = df["MB Adresse"].astype(int)
        # Keep only addresses at or above the configured lower bound.
        df = df[df["MB Adresse"] >= MIN_ADDR_INCLUSIVE]
        self.registers = {
            int(row["MB Adresse"]): {
                "desc": str(row["Beschreibung"]).strip(),
                "type": str(row["Variabel Typ"]).strip()
            }
            for _, row in df.iterrows()
        }

    # ---------- Low-level reading ----------
    def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
        """Call a client read function, tolerating pymodbus keyword differences.

        The address is shifted by ADDRESS_SHIFT before the read. Returns the
        raw register list, or None on any error.
        """
        fn = getattr(self.client, fn_name)
        # pymodbus 3.8.x uses 'slave='; older/newer variants may not — try both.
        shifted_addr = address + ADDRESS_SHIFT
        for kwargs in (dict(address=shifted_addr, count=count, slave=self.unit),
                       dict(address=shifted_addr, count=count)):
            try:
                res = fn(**kwargs)
                if res is None or (hasattr(res, "isError") and res.isError()):
                    continue
                return res.registers
            except TypeError:
                continue
        return None

    def _read_any(self, address: int, count: int) -> Optional[List[int]]:
        """Try holding registers first, then input registers."""
        regs = self._try_read("read_holding_registers", address, count)
        if regs is None:
            regs = self._try_read("read_input_registers", address, count)
        return regs

    # ---------- Decoding ----------
    @staticmethod
    def _to_i16(u16: int) -> int:
        """Reinterpret an unsigned 16-bit word as a signed int16."""
        return struct.unpack(">h", struct.pack(">H", u16))[0]

    @staticmethod
    def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float:
        """Combine two 16-bit words into a big-endian float32."""
        b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi)
        return struct.unpack(">f", b)[0]

    # Helper: how many 16-bit registers does this type occupy?
    @staticmethod
    def _word_count_for_type(rtype: str) -> int:
        rt = (rtype or "").lower()
        # Adjust to match the type names used in the Excel sheet:
        if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt:
            return 2
        # Default: one word (e.g. int16/uint16)
        return 1

    def read_one(self, address_excel: int, rtype: str) -> Optional[float]:
        """
        Read one value according to its type ('INT', 'REAL', etc.).

        Two-word types are decoded as float32 (MSW first); one-word types
        as signed int16. Returns None when the read fails.
        """
        addr = int(address_excel)
        words = self._word_count_for_type(rtype)
        if words == 2:
            regs = self._read_any(addr, 2)
            if not regs or len(regs) < 2:
                return None
            # NOTE(review): two-word values are always decoded as float32,
            # even for 'uint32' types — confirm this matches the device map.
            return self._to_f32_from_two(regs[0], regs[1])
        else:
            regs = self._read_any(addr, 1)
            if not regs:
                return None
            return float(self._to_i16(regs[0]))

    def get_state(self) -> Dict[str, Any]:
        """
        Read ALL registers from self.registers and return them as a dict.

        Keys are ``"<address> - <description>"``; failed reads are skipped.
        NOTE(review): the original docstring claimed no address >= 40206 is
        read, but no such check exists here — the filtering happens only via
        MIN_ADDR_INCLUSIVE in load_registers.
        """
        data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
        for address, meta in sorted(self.registers.items()):
            # (Removed an unused word-count computation that had no effect.)
            val = self.read_one(address, meta["type"])
            if val is None:
                continue
            key = f"{address} - {meta['desc']}"
            data[key] = val
        return data

99698
terminal_log

File diff suppressed because it is too large Load Diff