diff --git a/component_test_connectors/heat_pump_connection_read_data.py b/component_test_connectors/heat_pump_connection_read_data.py new file mode 100644 index 0000000..c9ef06f --- /dev/null +++ b/component_test_connectors/heat_pump_connection_read_data.py @@ -0,0 +1,7 @@ +from heat_pump import HeatPump + +hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502, excel_path="../modbus_registers/heat_pump_registers.xlsx") + +state = hp_master.get_state() + +print(state) \ No newline at end of file diff --git a/component_test_connectors/heat_pump_connection_sg_ready.py b/component_test_connectors/heat_pump_connection_sg_ready.py new file mode 100644 index 0000000..8dff783 --- /dev/null +++ b/component_test_connectors/heat_pump_connection_sg_ready.py @@ -0,0 +1,49 @@ +from pymodbus.client import ModbusTcpClient + +def switch_sg_ready_mode(ip, port, mode): + """ + Register 300: 1=BUS 0= Hardware Kontakte + Register 301 & 302: + 0-0= Kein Offset + 0-1 Boiler und Heizung Offset + 1-1 Boiler Offset + E-Einsatz Sollwert Erhöht + 1-0 SG EVU Sperre + :param ip: + :param mode: + 'mode1' = [True, False, False] => SG Ready deactivated + 'mode2' = [True, False, True] => SG ready activated for heatpump only + 'mode3' = [True, True, True] => SG ready activated for heatpump and heat rod + :return: + """ + client = ModbusTcpClient(ip, port=port) + if not client.connect(): + print("Verbindung zur Wärmepumpe fehlgeschlagen.") + return + + mode_code = None + if mode == 'mode1': + mode_code = [True, False, False] + elif mode == 'mode2': + mode_code = [True, False, True] + elif mode == 'mode3': + mode_code = [True, True, True] + else: + print('Uncorrect or no string for mode!') + + try: + response_300 = client.write_coil(300, mode_code[0]) + response_301 = client.write_coil(301, mode_code[1]) + response_302 = client.write_coil(302, mode_code[2]) + + # Optional: Rückmeldungen prüfen + for addr, resp in zip([300, 301, 302], [response_300, response_301, response_302]): + if resp.isError(): + print(f"Fehler beim Schreiben von Coil {addr}: {resp}") + else: + print(f"Coil {addr} erfolgreich geschrieben.") + + finally: + client.close() + +if '__name__' == '__main__': + switch_sg_ready_mode(ip='10.0.0.10', port=502, mode='mode2') diff --git a/heat_pump.py b/heat_pump.py index 773e84a..6a29f5e 100644 --- a/heat_pump.py +++ b/heat_pump.py @@ -1,64 +1,177 @@ from pymodbus.client import ModbusTcpClient import pandas as pd import time +import struct +import math + class HeatPump: - def __init__(self, device_name: str, ip_address: str, port: int=502): + def __init__(self, device_name: str, ip_address: str, port: int = 502, + excel_path: str = "modbus_registers/heat_pump_registers_modbus.xlsx", + sheet_name: str = "Register_Map"): self.device_name = device_name self.ip = ip_address self.port = port - self.client = None - self.connect_to_modbus() - self.registers = None - self.get_registers() + self.client = ModbusTcpClient(self.ip, port=self.port) - def connect_to_modbus(self): - port = self.port - self.client = ModbusTcpClient(self.ip, port=port) + self.excel_path = excel_path + self.sheet_name = sheet_name + self.registers = self.get_registers() + + # ------------- + # Connection + # ------------- + def connect(self) -> bool: + ok = self.client.connect() + if not ok: + print("Verbindung zur Wärmepumpe fehlgeschlagen.") + return ok + + def close(self): try: - if not self.client.connect(): - print("Verbindung zur Wärmepumpe fehlgeschlagen.") - exit(1) - print("Verbindung zur Wärmepumpe erfolgreich.") - except KeyboardInterrupt: - print("Beendet durch Benutzer (Ctrl+C).") - finally: self.client.close() + except Exception: + pass - def get_registers(self): - # Excel-Datei mit den Input-Registerinformationen - excel_path = "modbus_registers/heat_pump_registers.xlsx" - xls = pd.ExcelFile(excel_path) - df_input_registers = xls.parse('04 Input Register') + # ------------- + # Excel parsing + # ------------- + def get_registers(self) -> dict: + df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name) + df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy() - # Relevante Spalten bereinigen - df_clean = df_input_registers[['MB Adresse', 'Variable', 'Beschreibung', 'Variabel Typ']].dropna() - df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int) + df["Address"] = df["Address"].astype(int) + df["Length"] = df["Length"].astype(int) + df["Data_Type"] = df["Data_Type"].astype(str).str.upper() + df["Byteorder"] = df["Byteorder"].astype(str).str.upper() - # Dictionary aus Excel erzeugen - self.registers = { - row['MB Adresse']: { - 'desc': row['Beschreibung'], - 'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT' + df["Scaling"] = df.get("Scaling", 1.0) + df["Scaling"] = df["Scaling"].fillna(1.0).astype(float) + + df["Offset"] = df.get("Offset", 0.0) + df["Offset"] = df["Offset"].fillna(0.0).astype(float) + + regs = {} + for _, row in df.iterrows(): + regs[int(row["Address"])] = { + "length": int(row["Length"]), + "data_type": row["Data_Type"], + "byteorder": row["Byteorder"], + "scaling": float(row["Scaling"]), + "offset": float(row["Offset"]), + "tag": str(row.get("Tag_Name", "")).strip(), + "desc": "" if pd.isna(row.get("Description")) else str(row.get("Description")).strip(), } - for _, row in df_clean.iterrows() - } + return regs - def get_state(self): - data = {} - data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S') - for address, info in self.registers.items(): - reg_type = info['type'] - result = self.client.read_input_registers(address, count=2 if reg_type == 'REAL' else 1) - if result.isError(): - print(f"Fehler beim Lesen von Adresse {address}: {result}") - continue + # ------------- + # Byteorder handling + # ------------- + @staticmethod + def _registers_to_bytes(registers: list[int], byteorder_code: str) -> bytes: + """ + registers: Liste von uint16 (0..65535), wie pymodbus sie liefert. + byteorder_code: AB, ABCD, CDAB, BADC, DCBA (gemäß Template) + Rückgabe: bytes in der Reihenfolge, wie sie für struct.unpack benötigt werden. + """ + code = (byteorder_code or "ABCD").upper() - if reg_type == 'REAL': - value = result.registers[0] / 10.0 - else: - value = result.registers[0] + # Pro Register: 16-bit => zwei Bytes (MSB, LSB) + words = [struct.pack(">H", r & 0xFFFF) for r in registers] # big endian pro Wort + + if len(words) == 1: + w = words[0] # b'\xAA\xBB' + if code in ("AB", "ABCD", "CDAB"): + return w + if code == "BADC": # byte swap + return w[::-1] + if code == "DCBA": # byte swap (bei 16-bit identisch zu BADC) + return w[::-1] + return w + + # 32-bit (2 words) oder 64-bit (4 words): Word/Byte swaps abbilden + # words[0] = high word bytes, words[1] = low word bytes (in Modbus-Reihenfolge gelesen) + if code == "ABCD": + ordered = words + elif code == "CDAB": + # word swap + ordered = words[1:] + words[:1] + elif code == "BADC": + # byte swap innerhalb jedes Words + ordered = [w[::-1] for w in words] + elif code == "DCBA": + # word + byte swap + ordered = [w[::-1] for w in (words[1:] + words[:1])] + else: + ordered = words + + return b"".join(ordered) + + @staticmethod + def _decode_by_type(raw_bytes: bytes, data_type: str): + dt = (data_type or "").upper() + + # struct: > = big endian, < = little endian + # Wir liefern raw_bytes bereits in der richtigen Reihenfolge; daher nutzen wir ">" konsistent. + if dt == "UINT16": + return struct.unpack(">H", raw_bytes[:2])[0] + if dt == "INT16": + return struct.unpack(">h", raw_bytes[:2])[0] + if dt == "UINT32": + return struct.unpack(">I", raw_bytes[:4])[0] + if dt == "INT32": + return struct.unpack(">i", raw_bytes[:4])[0] + if dt == "FLOAT32": + return struct.unpack(">f", raw_bytes[:4])[0] + if dt == "FLOAT64": + return struct.unpack(">d", raw_bytes[:8])[0] + + raise ValueError(f"Unbekannter Data_Type: {dt}") + + def _decode_value(self, registers: list[int], meta: dict): + raw = self._registers_to_bytes(registers, meta["byteorder"]) + val = self._decode_by_type(raw, meta["data_type"]) + return (val * meta["scaling"]) + meta["offset"] + + # ------------- + # Reading + # ------------- + def get_state(self) -> dict: + data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")} + + if not self.connect(): + data["error"] = "connect_failed" + return data + + try: + for address, meta in self.registers.items(): + count = int(meta["length"]) + result = self.client.read_input_registers(address, count=count) + if result.isError(): + print(f"Fehler beim Lesen von Adresse {address}: {result}") + continue + + try: + value = self._decode_value(result.registers, meta) + except Exception as e: + print(f"Decode-Fehler an Adresse {address} ({meta.get('tag','')}): {e}") + continue + + # Optional filter + # if self._is_invalid_sentinel(value): + # continue + + desc = meta.get("desc") or "" + label = f"{address} - {desc}".strip(" -") + + data[label] = value + tag = meta.get("tag") + if tag: + data[tag] = value + + print(f"Adresse {address} - {desc}: {value}") + + finally: + self.close() - print(f"Adresse {address} - {info['desc']}: {value}") - data[f"{address} - {info['desc']}"] = value return data diff --git a/modbus_registers/_modbus_register_template.xlsx b/modbus_registers/_modbus_register_template.xlsx new file mode 100644 index 0000000..9184dc1 Binary files /dev/null and b/modbus_registers/_modbus_register_template.xlsx differ diff --git a/modbus_registers/heat_pump_registers.xlsx b/modbus_registers/heat_pump_registers.xlsx index b9dff62..c9ea342 100644 Binary files a/modbus_registers/heat_pump_registers.xlsx and b/modbus_registers/heat_pump_registers.xlsx differ diff --git a/modbus_registers/raw_register_tables/heat_pump_registers.xlsx b/modbus_registers/raw_register_tables/heat_pump_registers.xlsx new file mode 100644 index 0000000..b9dff62 Binary files /dev/null and b/modbus_registers/raw_register_tables/heat_pump_registers.xlsx differ