4 Commits

Author SHA1 Message Date
Nils Reiners
8642a057f0 excel sheet for heat pump registers now in template form. tested with script that was also added in folder. sg-ready testing file was also added. 2026-01-06 17:01:50 +01:00
Nils Reiners
ce14d59d51 adresse für hp angepasst 2026-01-05 17:15:25 +01:00
Nils Reiners
4727364048 scheint zu laufen 2025-12-09 22:07:57 +01:00
Nils Reiners
666eb211a3 old version of pv_forecaster restored 2025-10-29 22:03:46 +01:00
14 changed files with 216 additions and 48 deletions

View File

@@ -0,0 +1,7 @@
from heat_pump import HeatPump
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502, excel_path="../modbus_registers/heat_pump_registers.xlsx")
state = hp_master.get_state()
print(state)

View File

@@ -0,0 +1,49 @@
from pymodbus.client import ModbusTcpClient
def switch_sg_ready_mode(ip, port, mode):
"""
Register 300: 1=BUS 0= Hardware Kontakte
Register 301 & 302:
0-0= Kein Offset
0-1 Boiler und Heizung Offset
1-1 Boiler Offset + E-Einsatz Sollwert Erhöht
1-0 SG EVU Sperre
:param ip:
:param mode:
'mode1' = [True, False, False] => SG Ready deactivated
'mode2' = [True, False, True] => SG ready activated for heatpump only
'mode3' = [True, True, True] => SG ready activated for heatpump and heat rod
:return:
"""
client = ModbusTcpClient(ip, port=port)
if not client.connect():
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
return
mode_code = None
if mode == 'mode1':
mode_code = [True, False, False]
elif mode == 'mode2':
mode_code = [True, False, True]
elif mode == 'mode3':
mode_code = [True, True, True]
else:
print('Uncorrect or no string for mode!')
try:
response_300 = client.write_coil(300, mode_code[0])
response_301 = client.write_coil(301, mode_code[1])
response_302 = client.write_coil(302, mode_code[2])
# Optional: Rückmeldungen prüfen
for addr, resp in zip([300, 301, 302], [response_300, response_301, response_302]):
if resp.isError():
print(f"Fehler beim Schreiben von Coil {addr}: {resp}")
else:
print(f"Coil {addr} erfolgreich geschrieben.")
finally:
client.close()
if '__name__' == '__main__':
switch_sg_ready_mode(ip='10.0.0.10', port=502, mode='mode2')

View File

@@ -1,64 +1,177 @@
from pymodbus.client import ModbusTcpClient
import pandas as pd
import time
import struct
import math
class HeatPump:
def __init__(self, device_name: str, ip_address: str, port: int=502):
def __init__(self, device_name: str, ip_address: str, port: int = 502,
excel_path: str = "modbus_registers/heat_pump_registers_modbus.xlsx",
sheet_name: str = "Register_Map"):
self.device_name = device_name
self.ip = ip_address
self.port = port
self.client = None
self.connect_to_modbus()
self.registers = None
self.get_registers()
self.client = ModbusTcpClient(self.ip, port=self.port)
def connect_to_modbus(self):
port = self.port
self.client = ModbusTcpClient(self.ip, port=port)
try:
if not self.client.connect():
self.excel_path = excel_path
self.sheet_name = sheet_name
self.registers = self.get_registers()
# -------------
# Connection
# -------------
def connect(self) -> bool:
ok = self.client.connect()
if not ok:
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
exit(1)
print("Verbindung zur Wärmepumpe erfolgreich.")
except KeyboardInterrupt:
print("Beendet durch Benutzer (Ctrl+C).")
finally:
return ok
def close(self):
try:
self.client.close()
except Exception:
pass
def get_registers(self):
# Excel-Datei mit den Input-Registerinformationen
excel_path = "modbus_registers/heat_pump_registers.xlsx"
xls = pd.ExcelFile(excel_path)
df_input_registers = xls.parse('04 Input Register')
# -------------
# Excel parsing
# -------------
def get_registers(self) -> dict:
df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
# Relevante Spalten bereinigen
df_clean = df_input_registers[['MB Adresse', 'Variable', 'Beschreibung', 'Variabel Typ']].dropna()
df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int)
df["Address"] = df["Address"].astype(int)
df["Length"] = df["Length"].astype(int)
df["Data_Type"] = df["Data_Type"].astype(str).str.upper()
df["Byteorder"] = df["Byteorder"].astype(str).str.upper()
# Dictionary aus Excel erzeugen
self.registers = {
row['MB Adresse']: {
'desc': row['Beschreibung'],
'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT'
}
for _, row in df_clean.iterrows()
df["Scaling"] = df.get("Scaling", 1.0)
df["Scaling"] = df["Scaling"].fillna(1.0).astype(float)
df["Offset"] = df.get("Offset", 0.0)
df["Offset"] = df["Offset"].fillna(0.0).astype(float)
regs = {}
for _, row in df.iterrows():
regs[int(row["Address"])] = {
"length": int(row["Length"]),
"data_type": row["Data_Type"],
"byteorder": row["Byteorder"],
"scaling": float(row["Scaling"]),
"offset": float(row["Offset"]),
"tag": str(row.get("Tag_Name", "")).strip(),
"desc": "" if pd.isna(row.get("Description")) else str(row.get("Description")).strip(),
}
return regs
def get_state(self):
data = {}
data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S')
for address, info in self.registers.items():
reg_type = info['type']
result = self.client.read_input_registers(address, count=2 if reg_type == 'REAL' else 1)
# -------------
# Byteorder handling
# -------------
@staticmethod
def _registers_to_bytes(registers: list[int], byteorder_code: str) -> bytes:
"""
registers: Liste von uint16 (0..65535), wie pymodbus sie liefert.
byteorder_code: AB, ABCD, CDAB, BADC, DCBA (gemäß Template)
Rückgabe: bytes in der Reihenfolge, wie sie für struct.unpack benötigt werden.
"""
code = (byteorder_code or "ABCD").upper()
# Pro Register: 16-bit => zwei Bytes (MSB, LSB)
words = [struct.pack(">H", r & 0xFFFF) for r in registers] # big endian pro Wort
if len(words) == 1:
w = words[0] # b'\xAA\xBB'
if code in ("AB", "ABCD", "CDAB"):
return w
if code == "BADC": # byte swap
return w[::-1]
if code == "DCBA": # byte swap (bei 16-bit identisch zu BADC)
return w[::-1]
return w
# 32-bit (2 words) oder 64-bit (4 words): Word/Byte swaps abbilden
# words[0] = high word bytes, words[1] = low word bytes (in Modbus-Reihenfolge gelesen)
if code == "ABCD":
ordered = words
elif code == "CDAB":
# word swap
ordered = words[1:] + words[:1]
elif code == "BADC":
# byte swap innerhalb jedes Words
ordered = [w[::-1] for w in words]
elif code == "DCBA":
# word + byte swap
ordered = [w[::-1] for w in (words[1:] + words[:1])]
else:
ordered = words
return b"".join(ordered)
@staticmethod
def _decode_by_type(raw_bytes: bytes, data_type: str):
dt = (data_type or "").upper()
# struct: > = big endian, < = little endian
# Wir liefern raw_bytes bereits in der richtigen Reihenfolge; daher nutzen wir ">" konsistent.
if dt == "UINT16":
return struct.unpack(">H", raw_bytes[:2])[0]
if dt == "INT16":
return struct.unpack(">h", raw_bytes[:2])[0]
if dt == "UINT32":
return struct.unpack(">I", raw_bytes[:4])[0]
if dt == "INT32":
return struct.unpack(">i", raw_bytes[:4])[0]
if dt == "FLOAT32":
return struct.unpack(">f", raw_bytes[:4])[0]
if dt == "FLOAT64":
return struct.unpack(">d", raw_bytes[:8])[0]
raise ValueError(f"Unbekannter Data_Type: {dt}")
def _decode_value(self, registers: list[int], meta: dict):
raw = self._registers_to_bytes(registers, meta["byteorder"])
val = self._decode_by_type(raw, meta["data_type"])
return (val * meta["scaling"]) + meta["offset"]
# -------------
# Reading
# -------------
def get_state(self) -> dict:
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
if not self.connect():
data["error"] = "connect_failed"
return data
try:
for address, meta in self.registers.items():
count = int(meta["length"])
result = self.client.read_input_registers(address, count=count)
if result.isError():
print(f"Fehler beim Lesen von Adresse {address}: {result}")
continue
if reg_type == 'REAL':
value = result.registers[0] / 10.0
else:
value = result.registers[0]
try:
value = self._decode_value(result.registers, meta)
except Exception as e:
print(f"Decode-Fehler an Adresse {address} ({meta.get('tag','')}): {e}")
continue
# Optional filter
# if self._is_invalid_sentinel(value):
# continue
desc = meta.get("desc") or ""
label = f"{address} - {desc}".strip(" -")
data[label] = value
tag = meta.get("tag")
if tag:
data[tag] = value
print(f"Adresse {address} - {desc}: {value}")
finally:
self.close()
print(f"Adresse {address} - {info['desc']}: {value}")
data[f"{address} - {info['desc']}"] = value
return data

View File

@@ -29,11 +29,10 @@ db = DataBaseInflux(
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
wr_master = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112', unit=1)
wr_slave = PvInverter(device_name='solaredge_slave', ip_address='192.168.1.112', unit=3)
wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112')
meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112')
es.add_components(hp_master, hp_slave, shelly, wr_master, wr_slave, meter)
es.add_components(hp_master, hp_slave, shelly, wr, meter)
controller = SgReadyController(es)
# FORECASTING

Binary file not shown.