8 Commits

Author SHA1 Message Date
Nils Reiners
afbcd81310 transformation of data from old database to new was updated. 2026-01-08 10:18:24 +01:00
Nils Reiners
fd87257f37 data are written in new database as the datatypes of the original database does not fit any more. 2026-01-06 23:07:06 +01:00
Nils Reiners
876115cf6e daten werden nicht korrekt in datenbank abgelegt 2026-01-06 18:51:35 +01:00
Nils Reiners
f0e7c1338b path to hp excel was modified 2026-01-06 17:13:14 +01:00
Nils Reiners
8642a057f0 excel sheet for heat pump registers now in template form. tested with script that was also added in folder. sg-ready testing file was also added. 2026-01-06 17:01:50 +01:00
Nils Reiners
ce14d59d51 adresse für hp angepasst 2026-01-05 17:15:25 +01:00
Nils Reiners
4727364048 scheint zu laufen 2025-12-09 22:07:57 +01:00
Nils Reiners
666eb211a3 old version of pv_forecaster restored 2025-10-29 22:03:46 +01:00
16 changed files with 578 additions and 217 deletions

View File

@@ -0,0 +1,7 @@
from heat_pump import HeatPump
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502, excel_path="../modbus_registers/heat_pump_registers.xlsx")
state = hp_master.get_state()
print(state)

View File

@@ -0,0 +1,49 @@
from pymodbus.client import ModbusTcpClient
def switch_sg_ready_mode(ip, port, mode):
"""
Register 300: 1=BUS 0= Hardware Kontakte
Register 301 & 302:
0-0= Kein Offset
0-1 Boiler und Heizung Offset
1-1 Boiler Offset + E-Einsatz Sollwert Erhöht
1-0 SG EVU Sperre
:param ip:
:param mode:
'mode1' = [True, False, False] => SG Ready deactivated
'mode2' = [True, False, True] => SG ready activated for heatpump only
'mode3' = [True, True, True] => SG ready activated for heatpump and heat rod
:return:
"""
client = ModbusTcpClient(ip, port=port)
if not client.connect():
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
return
mode_code = None
if mode == 'mode1':
mode_code = [True, False, False]
elif mode == 'mode2':
mode_code = [True, False, True]
elif mode == 'mode3':
mode_code = [True, True, True]
else:
print('Uncorrect or no string for mode!')
try:
response_300 = client.write_coil(300, mode_code[0])
response_301 = client.write_coil(301, mode_code[1])
response_302 = client.write_coil(302, mode_code[2])
# Optional: Rückmeldungen prüfen
for addr, resp in zip([300, 301, 302], [response_300, response_301, response_302]):
if resp.isError():
print(f"Fehler beim Schreiben von Coil {addr}: {resp}")
else:
print(f"Coil {addr} erfolgreich geschrieben.")
finally:
client.close()
if '__name__' == '__main__':
switch_sg_ready_mode(ip='10.0.0.10', port=502, mode='mode2')

View File

@@ -0,0 +1,213 @@
import os, re, math, time
from datetime import datetime, timezone, timedelta
import pandas as pd
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException
# -----------------------
# CONFIG
# -----------------------
INFLUX_URL = "http://192.168.1.146:8086"
INFLUX_ORG = "allmende"
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "Cw_naEZyvJ3isiAh1P4Eq3TsjcHmzzDFS7SlbKDsS6ZWL04fMEYixWqtNxGThDdG27S9aW5g7FP9eiq5z1rsGA==")
SOURCE_BUCKET = "allmende_db"
TARGET_BUCKET = "allmende_db_v2"
MEASUREMENTS = [
"hp_master", "hp_slave", "pv_forecast", "sg_ready",
"solaredge_master", "solaredge_meter", "solaredge_slave", "wohnung_2_6"
]
START_DT = datetime(2025, 6, 1, tzinfo=timezone.utc)
STOP_DT = datetime.now(timezone.utc)
WINDOW = timedelta(days=1)
EXCEL_PATH = "../modbus_registers/heat_pump_registers.xlsx"
EXCEL_SHEET = "Register_Map"
BATCH_SIZE = 1000
MAX_RETRIES = 8
# -----------------------
# Helpers
# -----------------------
def normalize(s) -> str:
s = "" if s is None else str(s).strip()
return re.sub(r"\s+", " ", s)
def is_invalid_sentinel(v: float) -> bool:
return v in (-999.9, -999.0, 30000.0, 32767.0, 65535.0)
def ensure_bucket(client: InfluxDBClient, name: str):
bapi = client.buckets_api()
if bapi.find_bucket_by_name(name):
return
bapi.create_bucket(bucket_name=name, org=INFLUX_ORG, retention_rules=None)
def build_field_type_map_from_excel(path: str) -> dict[str, str]:
df = pd.read_excel(path, sheet_name=EXCEL_SHEET)
df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
df["Address"] = df["Address"].astype(int)
df["Description"] = df["Description"].fillna("").astype(str)
df["Tag_Name"] = df["Tag_Name"].fillna("").astype(str)
df["Data_Type"] = df["Data_Type"].fillna("").astype(str)
m: dict[str, str] = {}
for _, r in df.iterrows():
addr = int(r["Address"])
desc = normalize(r["Description"])
tag = normalize(r["Tag_Name"])
dtp = normalize(r["Data_Type"]).upper()
if tag:
m[tag] = dtp
old_key = normalize(f"{addr} - {desc}".strip(" -"))
if old_key:
m[old_key] = dtp
return m
def coerce_value_to_dtype(v, dtype: str):
if v is None:
return None
dtp = (dtype or "").upper()
if isinstance(v, (int, float)):
fv = float(v)
if math.isnan(fv) or math.isinf(fv):
return None
if dtp in ("BOOL", "BOOLEAN"):
if isinstance(v, bool): return v
if isinstance(v, (int, float)): return bool(int(v))
return None
if dtp.startswith("INT") or dtp.startswith("UINT"):
if isinstance(v, bool): return int(v)
if isinstance(v, (int, float)): return int(float(v))
return None
if dtp.startswith("FLOAT") or dtp in ("DOUBLE",):
if isinstance(v, bool): return float(int(v))
if isinstance(v, (int, float)): return float(v)
return None
return None
def write_with_retry(write_api, batch):
delay = 1.0
last_msg = ""
for _ in range(MAX_RETRIES):
try:
write_api.write(bucket=TARGET_BUCKET, org=INFLUX_ORG, record=batch)
return
except ApiException as e:
last_msg = getattr(e, "body", "") or str(e)
status = getattr(e, "status", None)
if "timeout" in last_msg.lower() or status in (429, 500, 502, 503, 504):
time.sleep(delay)
delay = min(delay * 2, 30)
continue
raise
raise RuntimeError(f"Write failed after {MAX_RETRIES} retries: {last_msg}")
def window_already_migrated(query_api, measurement: str, start: datetime, stop: datetime) -> bool:
# Prüft: gibt es im Zielbucket im Fenster mindestens 1 Punkt?
flux = f'''
from(bucket: "{TARGET_BUCKET}")
|> range(start: time(v: "{start.isoformat()}"), stop: time(v: "{stop.isoformat()}"))
|> filter(fn: (r) => r._measurement == "{measurement}")
|> limit(n: 1)
'''
tables = query_api.query(flux, org=INFLUX_ORG)
for t in tables:
if t.records:
return True
return False
def migrate_window(query_api, write_api, measurement: str,
start: datetime, stop: datetime,
type_map: dict[str, str],
do_type_cast: bool) -> int:
flux = f'''
from(bucket: "{SOURCE_BUCKET}")
|> range(start: time(v: "{start.isoformat()}"), stop: time(v: "{stop.isoformat()}"))
|> filter(fn: (r) => r._measurement == "{measurement}")
|> keep(columns: ["_time","_measurement","_field","_value"])
'''
tables = query_api.query(flux, org=INFLUX_ORG)
batch, written = [], 0
for table in tables:
for rec in table.records:
t = rec.get_time()
field = normalize(rec.get_field())
value = rec.get_value()
if value is None:
continue
if do_type_cast:
dtp = type_map.get(field)
if dtp:
cv = coerce_value_to_dtype(value, dtp)
if cv is None:
continue
if isinstance(cv, (int, float)) and is_invalid_sentinel(float(cv)):
continue
value = cv
# kein Mapping -> unverändert schreiben
batch.append(Point(measurement).field(field, value).time(t, WritePrecision.NS))
if len(batch) >= BATCH_SIZE:
write_with_retry(write_api, batch)
written += len(batch)
batch = []
if batch:
write_with_retry(write_api, batch)
written += len(batch)
return written
# -----------------------
# Main
# -----------------------
def main():
if not INFLUX_TOKEN:
raise RuntimeError("INFLUX_TOKEN fehlt (Env-Var INFLUX_TOKEN setzen).")
with InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG, timeout=900_000) as client:
ensure_bucket(client, TARGET_BUCKET)
type_map = build_field_type_map_from_excel(EXCEL_PATH)
query_api = client.query_api()
write_api = client.write_api(write_options=SYNCHRONOUS)
for meas in MEASUREMENTS:
do_cast = meas in ("hp_master", "hp_slave")
cur, total = START_DT, 0
print(f"\n== {meas} (cast={'ON' if do_cast else 'OFF'}) ==")
while cur < STOP_DT:
nxt = min(cur + WINDOW, STOP_DT)
if window_already_migrated(query_api, meas, cur, nxt):
print(f"{cur.isoformat()} -> {nxt.isoformat()} : SKIP (existiert schon)")
cur = nxt
continue
n = migrate_window(query_api, write_api, meas, cur, nxt, type_map, do_cast)
total += n
print(f"{cur.isoformat()} -> {nxt.isoformat()} : {n} (gesamt {total})")
cur = nxt
print(f"== Fertig {meas}: {total} Punkte ==")
if __name__ == "__main__":
main()

View File

@@ -1,64 +1,173 @@
from pymodbus.client import ModbusTcpClient
import pandas as pd
import time
import struct
import math
class HeatPump:
def __init__(self, device_name: str, ip_address: str, port: int=502):
def __init__(self, device_name: str, ip_address: str, port: int = 502,
excel_path: str = "modbus_registers/heat_pump_registers.xlsx",
sheet_name: str = "Register_Map"):
self.device_name = device_name
self.ip = ip_address
self.port = port
self.client = None
self.connect_to_modbus()
self.registers = None
self.get_registers()
self.client = ModbusTcpClient(self.ip, port=self.port)
def connect_to_modbus(self):
port = self.port
self.client = ModbusTcpClient(self.ip, port=port)
try:
if not self.client.connect():
self.excel_path = excel_path
self.sheet_name = sheet_name
self.registers = self.get_registers()
# -------------
# Connection
# -------------
def connect(self) -> bool:
ok = self.client.connect()
if not ok:
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
exit(1)
print("Verbindung zur Wärmepumpe erfolgreich.")
except KeyboardInterrupt:
print("Beendet durch Benutzer (Ctrl+C).")
finally:
return ok
def close(self):
try:
self.client.close()
except Exception:
pass
def get_registers(self):
# Excel-Datei mit den Input-Registerinformationen
excel_path = "modbus_registers/heat_pump_registers.xlsx"
xls = pd.ExcelFile(excel_path)
df_input_registers = xls.parse('04 Input Register')
# -------------
# Excel parsing
# -------------
def get_registers(self) -> dict:
df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
# Relevante Spalten bereinigen
df_clean = df_input_registers[['MB Adresse', 'Variable', 'Beschreibung', 'Variabel Typ']].dropna()
df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int)
df["Address"] = df["Address"].astype(int)
df["Length"] = df["Length"].astype(int)
df["Data_Type"] = df["Data_Type"].astype(str).str.upper()
df["Byteorder"] = df["Byteorder"].astype(str).str.upper()
# Dictionary aus Excel erzeugen
self.registers = {
row['MB Adresse']: {
'desc': row['Beschreibung'],
'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT'
}
for _, row in df_clean.iterrows()
df["Scaling"] = df.get("Scaling", 1.0)
df["Scaling"] = df["Scaling"].fillna(1.0).astype(float)
df["Offset"] = df.get("Offset", 0.0)
df["Offset"] = df["Offset"].fillna(0.0).astype(float)
regs = {}
for _, row in df.iterrows():
regs[int(row["Address"])] = {
"length": int(row["Length"]),
"data_type": row["Data_Type"],
"byteorder": row["Byteorder"],
"scaling": float(row["Scaling"]),
"offset": float(row["Offset"]),
"tag": str(row.get("Tag_Name", "")).strip(),
"desc": "" if pd.isna(row.get("Description")) else str(row.get("Description")).strip(),
}
return regs
def get_state(self):
data = {}
data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S')
for address, info in self.registers.items():
reg_type = info['type']
result = self.client.read_input_registers(address, count=2 if reg_type == 'REAL' else 1)
# -------------
# Byteorder handling
# -------------
@staticmethod
def _registers_to_bytes(registers: list[int], byteorder_code: str) -> bytes:
"""
registers: Liste von uint16 (0..65535), wie pymodbus sie liefert.
byteorder_code: AB, ABCD, CDAB, BADC, DCBA (gemäß Template)
Rückgabe: bytes in der Reihenfolge, wie sie für struct.unpack benötigt werden.
"""
code = (byteorder_code or "ABCD").upper()
# Pro Register: 16-bit => zwei Bytes (MSB, LSB)
words = [struct.pack(">H", r & 0xFFFF) for r in registers] # big endian pro Wort
if len(words) == 1:
w = words[0] # b'\xAA\xBB'
if code in ("AB", "ABCD", "CDAB"):
return w
if code == "BADC": # byte swap
return w[::-1]
if code == "DCBA": # byte swap (bei 16-bit identisch zu BADC)
return w[::-1]
return w
# 32-bit (2 words) oder 64-bit (4 words): Word/Byte swaps abbilden
# words[0] = high word bytes, words[1] = low word bytes (in Modbus-Reihenfolge gelesen)
if code == "ABCD":
ordered = words
elif code == "CDAB":
# word swap
ordered = words[1:] + words[:1]
elif code == "BADC":
# byte swap innerhalb jedes Words
ordered = [w[::-1] for w in words]
elif code == "DCBA":
# word + byte swap
ordered = [w[::-1] for w in (words[1:] + words[:1])]
else:
ordered = words
return b"".join(ordered)
@staticmethod
def _decode_by_type(raw_bytes: bytes, data_type: str):
dt = (data_type or "").upper()
# struct: > = big endian, < = little endian
# Wir liefern raw_bytes bereits in der richtigen Reihenfolge; daher nutzen wir ">" konsistent.
if dt == "UINT16":
return struct.unpack(">H", raw_bytes[:2])[0]
if dt == "INT16":
return struct.unpack(">h", raw_bytes[:2])[0]
if dt == "UINT32":
return struct.unpack(">I", raw_bytes[:4])[0]
if dt == "INT32":
return struct.unpack(">i", raw_bytes[:4])[0]
if dt == "FLOAT32":
return struct.unpack(">f", raw_bytes[:4])[0]
if dt == "FLOAT64":
return struct.unpack(">d", raw_bytes[:8])[0]
raise ValueError(f"Unbekannter Data_Type: {dt}")
def _decode_value(self, registers: list[int], meta: dict):
raw = self._registers_to_bytes(registers, meta["byteorder"])
val = self._decode_by_type(raw, meta["data_type"])
return (val * meta["scaling"]) + meta["offset"]
# -------------
# Reading
# -------------
def get_state(self) -> dict:
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
if not self.connect():
data["error"] = "connect_failed"
return data
try:
for address, meta in self.registers.items():
count = int(meta["length"])
result = self.client.read_input_registers(address, count=count)
if result.isError():
print(f"Fehler beim Lesen von Adresse {address}: {result}")
continue
if reg_type == 'REAL':
value = result.registers[0] / 10.0
else:
value = result.registers[0]
try:
value = self._decode_value(result.registers, meta)
except Exception as e:
print(f"Decode-Fehler an Adresse {address} ({meta.get('tag','')}): {e}")
continue
# Optional filter
# if self._is_invalid_sentinel(value):
# continue
value = float(value)
desc = meta.get("desc") or ""
field_name = f"{address} - {desc}".strip(" -")
data[field_name] = float(value)
print(f"Adresse {address} - {desc}: {value}")
finally:
self.close()
print(f"Adresse {address} - {info['desc']}: {value}")
data[f"{address} - {info['desc']}"] = value
return data

89
main.py
View File

@@ -23,60 +23,59 @@ db = DataBaseInflux(
url="http://192.168.1.146:8086",
token="Cw_naEZyvJ3isiAh1P4Eq3TsjcHmzzDFS7SlbKDsS6ZWL04fMEYixWqtNxGThDdG27S9aW5g7FP9eiq5z1rsGA==",
org="allmende",
bucket="allmende_db"
bucket="allmende_db_v3"
)
# hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
# hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
# shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
wr_master = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112', unit=1)
wr_slave = PvInverter(device_name='solaredge_slave', ip_address='192.168.1.112', unit=3)
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112')
meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112')
es.add_components(wr_master, wr_slave)#hp_master, hp_slave, shelly, wr_master, wr_slave, meter)
# controller = SgReadyController(es)
#
# # FORECASTING
# latitude = 48.041
# longitude = 7.862
# TZ = "Europe/Berlin"
# HORIZON_DAYS = 2
# weather_forecaster = WeatherForecaster(latitude=latitude, longitude=longitude)
# site = Location(latitude=latitude, longitude=longitude, altitude=35, tz=TZ, name="Gundelfingen")
#
# p_module = 435
# upper_roof_north = PvWattsSubarrayConfig(name="north", pdc0_w=(29+29+21)*p_module, tilt_deg=10, azimuth_deg=20, dc_loss=0.02, ac_loss=0.01)
# upper_roof_south = PvWattsSubarrayConfig(name="south", pdc0_w=(29+21+20)*p_module, tilt_deg=10, azimuth_deg=200, dc_loss=0.02, ac_loss=0.01)
# upper_roof_east = PvWattsSubarrayConfig(name="east", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=110, dc_loss=0.02, ac_loss=0.01)
# upper_roof_west = PvWattsSubarrayConfig(name="west", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=290, dc_loss=0.02, ac_loss=0.01)
# cfgs = [upper_roof_north, upper_roof_south, upper_roof_east, upper_roof_west]
# pv_plant = PvWattsPlant(site, cfgs)
#
# now = datetime.now()
# next_forecast_at = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
es.add_components(hp_master, hp_slave, shelly, wr, meter)
controller = SgReadyController(es)
# FORECASTING
latitude = 48.041
longitude = 7.862
TZ = "Europe/Berlin"
HORIZON_DAYS = 2
weather_forecaster = WeatherForecaster(latitude=latitude, longitude=longitude)
site = Location(latitude=latitude, longitude=longitude, altitude=35, tz=TZ, name="Gundelfingen")
p_module = 435
upper_roof_north = PvWattsSubarrayConfig(name="north", pdc0_w=(29+29+21)*p_module, tilt_deg=10, azimuth_deg=20, dc_loss=0.02, ac_loss=0.01)
upper_roof_south = PvWattsSubarrayConfig(name="south", pdc0_w=(29+21+20)*p_module, tilt_deg=10, azimuth_deg=200, dc_loss=0.02, ac_loss=0.01)
upper_roof_east = PvWattsSubarrayConfig(name="east", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=110, dc_loss=0.02, ac_loss=0.01)
upper_roof_west = PvWattsSubarrayConfig(name="west", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=290, dc_loss=0.02, ac_loss=0.01)
cfgs = [upper_roof_north, upper_roof_south, upper_roof_east, upper_roof_west]
pv_plant = PvWattsPlant(site, cfgs)
now = datetime.now()
next_forecast_at = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
while True:
now = datetime.now()
if now.second % interval_seconds == 0 and now.microsecond < 100_000:
state = es.get_state_and_store_to_database(db)
# mode = controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state)
#
# if mode == 'mode1':
# mode_as_binary = 0
# else:
# mode_as_binary = 1
# db.store_data('sg_ready', {'mode': mode_as_binary})
mode = controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state)
# if now >= next_forecast_at:
# # Start der Prognose: ab der kommenden vollen Stunde
# start_hour_local = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
# weather = weather_forecaster.get_hourly_forecast(start_hour_local, HORIZON_DAYS)
# total = pv_plant.get_power(weather)
# db.store_forecasts('pv_forecast', total)
#
# # Nächste geplante Ausführung definieren (immer volle Stunde)
# # Falls wir durch Delay mehrere Stunden verpasst haben, hole auf:
# while next_forecast_at <= now:
# next_forecast_at = (next_forecast_at + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
if mode == 'mode1':
mode_as_binary = 0
else:
mode_as_binary = 1
db.store_data('sg_ready', {'mode': mode_as_binary})
if now >= next_forecast_at:
# Start der Prognose: ab der kommenden vollen Stunde
start_hour_local = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
weather = weather_forecaster.get_hourly_forecast(start_hour_local, HORIZON_DAYS)
total = pv_plant.get_power(weather)
db.store_forecasts('pv_forecast', total)
# Nächste geplante Ausführung definieren (immer volle Stunde)
# Falls wir durch Delay mehrere Stunden verpasst haben, hole auf:
while next_forecast_at <= now:
next_forecast_at = (next_forecast_at + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
time.sleep(0.1)

Binary file not shown.

View File

@@ -1,155 +1,139 @@
# pv_inverter.py
# -*- coding: utf-8 -*-
from typing import Optional, Dict, Any, List
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException
import struct
import time
import struct
import pandas as pd
from typing import Dict, Any, List, Tuple, Optional
from pymodbus.client import ModbusTcpClient
EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"
# Obergrenze: bis EXKLUSIVE 40206 (d.h. max. 40205)
MAX_ADDR_EXCLUSIVE = 40121
class PvInverter:
"""
Minimaler Reader für einen SolarEdge-Inverter hinter Modbus-TCP→RTU-Gateway.
Liest nur die bekannten Register (wie im funktionierenden Skript).
Kompatibel mit pymodbus 2.5.x und 3.x kein retry_on_empty.
"""
def __init__(
self,
device_name: str,
ip_address: str,
port: int = 502,
unit_id: int = 1,
timeout: float = 1.5,
silent_interval: float = 0.02,
):
def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1):
self.device_name = device_name
self.host = ip_address
self.ip = ip_address
self.port = port
self.unit = unit_id
self.timeout = timeout
self.silent_interval = silent_interval
self.unit = unit
self.client: Optional[ModbusTcpClient] = None
self._connect()
self.registers: Dict[int, Dict[str, Any]] = {} # addr -> {"desc":..., "type":...}
self.connect_to_modbus()
self.load_registers(EXCEL_PATH)
# ---------------- Verbindung ----------------
def _connect(self):
# retries=0: keine internen Mehrfachversuche
self.client = ModbusTcpClient(self.host, port=self.port, timeout=self.timeout, retries=0)
# ---------- Verbindung ----------
def connect_to_modbus(self):
self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3)
if not self.client.connect():
raise ConnectionError(f"Verbindung zu {self.device_name} ({self.host}:{self.port}) fehlgeschlagen.")
print(f"✅ Verbindung hergestellt zu {self.device_name} ({self.host}:{self.port}, unit={self.unit})")
print("Verbindung zu Wechselrichter fehlgeschlagen.")
raise SystemExit(1)
print("✅ Verbindung zu Wechselrichter hergestellt.")
def close(self):
if self.client:
self.client.close()
self.client = None
# ---------------- Low-Level Lesen ----------------
def _read_regs(self, addr: int, count: int) -> Optional[List[int]]:
"""Liest 'count' Holding-Register ab base-0 'addr' für die konfigurierte Unit-ID."""
# ---------- Register-Liste ----------
def load_registers(self, excel_path: str):
xls = pd.ExcelFile(excel_path)
df = xls.parse()
# Passe Spaltennamen hier an, falls nötig:
cols = ["MB Adresse", "Beschreibung", "Variabel Typ"]
df = df[cols].dropna()
df["MB Adresse"] = df["MB Adresse"].astype(int)
# 1) Vorab-Filter: nur Adressen < 40206 übernehmen
df = df[df["MB Adresse"] < MAX_ADDR_EXCLUSIVE]
self.registers = {
int(row["MB Adresse"]): {
"desc": str(row["Beschreibung"]).strip(),
"type": str(row["Variabel Typ"]).strip()
}
for _, row in df.iterrows()
}
# ---------- Low-Level Lesen ----------
def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
fn = getattr(self.client, fn_name)
# pymodbus 3.8.x hat 'slave='; Fallbacks schaden nicht
for kwargs in (dict(address=address, count=count, slave=self.unit),
dict(address=address, count=count)):
try:
rr = self.client.read_holding_registers(address=addr, count=count, slave=self.unit)
except ModbusIOException:
time.sleep(self.silent_interval)
return None
except Exception:
time.sleep(self.silent_interval)
res = fn(**kwargs)
if res is None or (hasattr(res, "isError") and res.isError()):
continue
return res.registers
except TypeError:
continue
return None
time.sleep(self.silent_interval)
if not rr or rr.isError():
return None
return rr.registers
def _read_any(self, address: int, count: int) -> Optional[List[int]]:
regs = self._try_read("read_holding_registers", address, count)
if regs is None:
regs = self._try_read("read_input_registers", address, count)
return regs
# ---------- Decoding ----------
@staticmethod
def _to_int16(u16: int) -> int:
def _to_i16(u16: int) -> int:
return struct.unpack(">h", struct.pack(">H", u16))[0]
@staticmethod
def _apply_sf(raw: int, sf: int) -> float:
return raw * (10 ** sf)
def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float:
b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi)
return struct.unpack(">f", b)[0]
# Hilfsfunktion: wie viele 16-Bit-Register braucht dieser Typ?
@staticmethod
def _read_string_from_regs(regs: List[int]) -> Optional[str]:
b = b"".join(struct.pack(">H", r) for r in regs)
s = b.decode("ascii", errors="ignore").rstrip("\x00 ").strip()
return s or None
def _word_count_for_type(rtype: str) -> int:
rt = (rtype or "").lower()
# Passe hier an deine Excel-Typen an:
if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt:
return 2
# Default: 1 Wort (z.B. int16/uint16)
return 1
# ---------------- Hilfsfunktionen ----------------
def _read_string(self, addr: int, words: int) -> Optional[str]:
regs = self._read_regs(addr, words)
if regs is None:
def read_one(self, address_excel: int, rtype: str) -> Optional[float]:
"""
Liest einen Wert nach Typ ('INT' oder 'REAL' etc.).
Es werden ausschließlich Register < 40206 gelesen.
"""
addr = int(address_excel)
words = self._word_count_for_type(rtype)
# 2) Harte Grenze prüfen: höchstes angefasstes Register muss < 40206 sein
if addr + words - 1 >= MAX_ADDR_EXCLUSIVE:
# Überspringen, da der Lesevorgang die Grenze >= 40206 berühren würde
return None
return self._read_string_from_regs(regs)
def _read_scaled(self, value_addr: int, sf_addr: int) -> Optional[float]:
regs = self._read_regs(value_addr, 1)
sf = self._read_regs(sf_addr, 1)
if regs is None or sf is None:
if words == 2:
regs = self._read_any(addr, 2)
if not regs or len(regs) < 2:
return None
raw = self._to_int16(regs[0])
sff = self._to_int16(sf[0])
return self._apply_sf(raw, sff)
def _read_u32_with_sf(self, value_addr: int, sf_addr: int) -> Optional[float]:
regs = self._read_regs(value_addr, 2)
sf = self._read_regs(sf_addr, 1)
if regs is None or sf is None:
return None
u32 = (regs[0] << 16) | regs[1]
sff = self._to_int16(sf[0])
return self._apply_sf(u32, sff)
# ---------------- Öffentliche API ----------------
def get_state(self) -> Dict[str, Any]:
"""Liest exakt die bekannten Register und gibt ein Dict zurück."""
state: Dict[str, Any] = {}
# --- Common Block ---
state["C_Manufacturer"] = self._read_string(40004, 16)
state["C_Model"] = self._read_string(40020, 16)
state["C_Version"] = self._read_string(40044, 8)
state["C_SerialNumber"] = self._read_string(40052, 16)
# --- Inverter Block ---
state["I_AC_Power_W"] = self._read_scaled(40083, 40084)
state["I_AC_Voltage_V"] = self._read_scaled(40079, 40082)
state["I_AC_Frequency_Hz"] = self._read_scaled(40085, 40086)
state["I_DC_Power_W"] = self._read_scaled(40100, 40101)
state["I_AC_Energy_Wh_total"] = self._read_u32_with_sf(40093, 40095)
status_regs = self._read_regs(40107, 2)
if status_regs:
state["I_Status"] = status_regs[0]
state["I_Status_Vendor"] = status_regs[1]
# Deine bisherige Logik interpretiert 2 Worte als Float32:
return self._to_f32_from_two(regs[0], regs[1])
else:
state["I_Status"] = None
state["I_Status_Vendor"] = None
regs = self._read_any(addr, 1)
if not regs:
return None
return float(self._to_i16(regs[0]))
return state
# ---------------- Beispiel ----------------
if __name__ == "__main__":
MODBUS_IP = "192.168.1.112"
MODBUS_PORT = 502
master = PvInverter("solaredge_master", MODBUS_IP, port=MODBUS_PORT, unit_id=1)
slave = PvInverter("solaredge_slave", MODBUS_IP, port=MODBUS_PORT, unit_id=3)
try:
sm = master.get_state()
ss = slave.get_state()
print("\n=== MASTER ===")
for k, v in sm.items():
print(f"{k:22s}: {v}")
print("\n=== SLAVE ===")
for k, v in ss.items():
print(f"{k:22s}: {v}")
finally:
master.close()
slave.close()
def get_state(self) -> Dict[str, Any]:
"""
Liest ALLE Register aus self.registers und gibt dict zurück.
Achtet darauf, dass keine Adresse (inkl. Mehrwort) >= 40206 gelesen wird.
"""
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
for address, meta in sorted(self.registers.items()):
words = self._word_count_for_type(meta["type"])
# 3) Nochmals Schutz auf Ebene der Iteration:
if address + words - 1 >= MAX_ADDR_EXCLUSIVE:
continue
val = self.read_one(address, meta["type"])
if val is None:
continue
key = f"{address} - {meta['desc']}"
data[key] = val
return data