2 Commits

7 changed files with 216 additions and 47 deletions

View File

@@ -0,0 +1,7 @@
from heat_pump import HeatPump
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502, excel_path="../modbus_registers/heat_pump_registers.xlsx")
state = hp_master.get_state()
print(state)

View File

@@ -0,0 +1,49 @@
from pymodbus.client import ModbusTcpClient
def switch_sg_ready_mode(ip, port, mode):
"""
Register 300: 1=BUS 0= Hardware Kontakte
Register 301 & 302:
0-0= Kein Offset
0-1 Boiler und Heizung Offset
1-1 Boiler Offset + E-Einsatz Sollwert Erhöht
1-0 SG EVU Sperre
:param ip:
:param mode:
'mode1' = [True, False, False] => SG Ready deactivated
'mode2' = [True, False, True] => SG ready activated for heatpump only
'mode3' = [True, True, True] => SG ready activated for heatpump and heat rod
:return:
"""
client = ModbusTcpClient(ip, port=port)
if not client.connect():
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
return
mode_code = None
if mode == 'mode1':
mode_code = [True, False, False]
elif mode == 'mode2':
mode_code = [True, False, True]
elif mode == 'mode3':
mode_code = [True, True, True]
else:
print('Uncorrect or no string for mode!')
try:
response_300 = client.write_coil(300, mode_code[0])
response_301 = client.write_coil(301, mode_code[1])
response_302 = client.write_coil(302, mode_code[2])
# Optional: Rückmeldungen prüfen
for addr, resp in zip([300, 301, 302], [response_300, response_301, response_302]):
if resp.isError():
print(f"Fehler beim Schreiben von Coil {addr}: {resp}")
else:
print(f"Coil {addr} erfolgreich geschrieben.")
finally:
client.close()
if '__name__' == '__main__':
switch_sg_ready_mode(ip='10.0.0.10', port=502, mode='mode2')

View File

@@ -1,64 +1,177 @@
from pymodbus.client import ModbusTcpClient from pymodbus.client import ModbusTcpClient
import pandas as pd import pandas as pd
import time import time
import struct
import math
class HeatPump: class HeatPump:
def __init__(self, device_name: str, ip_address: str, port: int=502): def __init__(self, device_name: str, ip_address: str, port: int = 502,
excel_path: str = "modbus_registers/heat_pump_registers_modbus.xlsx",
sheet_name: str = "Register_Map"):
self.device_name = device_name self.device_name = device_name
self.ip = ip_address self.ip = ip_address
self.port = port self.port = port
self.client = None self.client = ModbusTcpClient(self.ip, port=self.port)
self.connect_to_modbus()
self.registers = None
self.get_registers()
def connect_to_modbus(self): self.excel_path = excel_path
port = self.port self.sheet_name = sheet_name
self.client = ModbusTcpClient(self.ip, port=port) self.registers = self.get_registers()
# -------------
# Connection
# -------------
def connect(self) -> bool:
ok = self.client.connect()
if not ok:
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
return ok
def close(self):
try: try:
if not self.client.connect():
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
exit(1)
print("Verbindung zur Wärmepumpe erfolgreich.")
except KeyboardInterrupt:
print("Beendet durch Benutzer (Ctrl+C).")
finally:
self.client.close() self.client.close()
except Exception:
pass
def get_registers(self): # -------------
# Excel-Datei mit den Input-Registerinformationen # Excel parsing
excel_path = "modbus_registers/heat_pump_registers.xlsx" # -------------
xls = pd.ExcelFile(excel_path) def get_registers(self) -> dict:
df_input_registers = xls.parse('04 Input Register') df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
# Relevante Spalten bereinigen df["Address"] = df["Address"].astype(int)
df_clean = df_input_registers[['MB Adresse', 'Variable', 'Beschreibung', 'Variabel Typ']].dropna() df["Length"] = df["Length"].astype(int)
df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int) df["Data_Type"] = df["Data_Type"].astype(str).str.upper()
df["Byteorder"] = df["Byteorder"].astype(str).str.upper()
# Dictionary aus Excel erzeugen df["Scaling"] = df.get("Scaling", 1.0)
self.registers = { df["Scaling"] = df["Scaling"].fillna(1.0).astype(float)
row['MB Adresse']: {
'desc': row['Beschreibung'], df["Offset"] = df.get("Offset", 0.0)
'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT' df["Offset"] = df["Offset"].fillna(0.0).astype(float)
regs = {}
for _, row in df.iterrows():
regs[int(row["Address"])] = {
"length": int(row["Length"]),
"data_type": row["Data_Type"],
"byteorder": row["Byteorder"],
"scaling": float(row["Scaling"]),
"offset": float(row["Offset"]),
"tag": str(row.get("Tag_Name", "")).strip(),
"desc": "" if pd.isna(row.get("Description")) else str(row.get("Description")).strip(),
} }
for _, row in df_clean.iterrows() return regs
}
def get_state(self): # -------------
data = {} # Byteorder handling
data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S') # -------------
for address, info in self.registers.items(): @staticmethod
reg_type = info['type'] def _registers_to_bytes(registers: list[int], byteorder_code: str) -> bytes:
result = self.client.read_input_registers(address, count=2 if reg_type == 'REAL' else 1) """
if result.isError(): registers: Liste von uint16 (0..65535), wie pymodbus sie liefert.
print(f"Fehler beim Lesen von Adresse {address}: {result}") byteorder_code: AB, ABCD, CDAB, BADC, DCBA (gemäß Template)
continue Rückgabe: bytes in der Reihenfolge, wie sie für struct.unpack benötigt werden.
"""
code = (byteorder_code or "ABCD").upper()
if reg_type == 'REAL': # Pro Register: 16-bit => zwei Bytes (MSB, LSB)
value = result.registers[0] / 10.0 words = [struct.pack(">H", r & 0xFFFF) for r in registers] # big endian pro Wort
else:
value = result.registers[0] if len(words) == 1:
w = words[0] # b'\xAA\xBB'
if code in ("AB", "ABCD", "CDAB"):
return w
if code == "BADC": # byte swap
return w[::-1]
if code == "DCBA": # byte swap (bei 16-bit identisch zu BADC)
return w[::-1]
return w
# 32-bit (2 words) oder 64-bit (4 words): Word/Byte swaps abbilden
# words[0] = high word bytes, words[1] = low word bytes (in Modbus-Reihenfolge gelesen)
if code == "ABCD":
ordered = words
elif code == "CDAB":
# word swap
ordered = words[1:] + words[:1]
elif code == "BADC":
# byte swap innerhalb jedes Words
ordered = [w[::-1] for w in words]
elif code == "DCBA":
# word + byte swap
ordered = [w[::-1] for w in (words[1:] + words[:1])]
else:
ordered = words
return b"".join(ordered)
@staticmethod
def _decode_by_type(raw_bytes: bytes, data_type: str):
dt = (data_type or "").upper()
# struct: > = big endian, < = little endian
# Wir liefern raw_bytes bereits in der richtigen Reihenfolge; daher nutzen wir ">" konsistent.
if dt == "UINT16":
return struct.unpack(">H", raw_bytes[:2])[0]
if dt == "INT16":
return struct.unpack(">h", raw_bytes[:2])[0]
if dt == "UINT32":
return struct.unpack(">I", raw_bytes[:4])[0]
if dt == "INT32":
return struct.unpack(">i", raw_bytes[:4])[0]
if dt == "FLOAT32":
return struct.unpack(">f", raw_bytes[:4])[0]
if dt == "FLOAT64":
return struct.unpack(">d", raw_bytes[:8])[0]
raise ValueError(f"Unbekannter Data_Type: {dt}")
def _decode_value(self, registers: list[int], meta: dict):
raw = self._registers_to_bytes(registers, meta["byteorder"])
val = self._decode_by_type(raw, meta["data_type"])
return (val * meta["scaling"]) + meta["offset"]
# -------------
# Reading
# -------------
def get_state(self) -> dict:
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
if not self.connect():
data["error"] = "connect_failed"
return data
try:
for address, meta in self.registers.items():
count = int(meta["length"])
result = self.client.read_input_registers(address, count=count)
if result.isError():
print(f"Fehler beim Lesen von Adresse {address}: {result}")
continue
try:
value = self._decode_value(result.registers, meta)
except Exception as e:
print(f"Decode-Fehler an Adresse {address} ({meta.get('tag','')}): {e}")
continue
# Optional filter
# if self._is_invalid_sentinel(value):
# continue
desc = meta.get("desc") or ""
label = f"{address} - {desc}".strip(" -")
data[label] = value
tag = meta.get("tag")
if tag:
data[tag] = value
print(f"Adresse {address} - {desc}: {value}")
finally:
self.close()
print(f"Adresse {address} - {info['desc']}: {value}")
data[f"{address} - {info['desc']}"] = value
return data return data

View File

@@ -26,8 +26,8 @@ db = DataBaseInflux(
bucket="allmende_db" bucket="allmende_db"
) )
hp_master = HeatPump(device_name='hp_master', ip_address='127.0.0.1', port=8111) hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
hp_slave = HeatPump(device_name='hp_slave', ip_address='127.0.0.1', port=8111) hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121') shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112') wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112')
meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112') meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112')

Binary file not shown.