from pymodbus.client import ModbusTcpClient import pandas as pd import time import struct import math class HeatPump: def __init__(self, device_name: str, ip_address: str, port: int = 502, excel_path: str = "modbus_registers/heat_pump_registers.xlsx", sheet_name: str = "Register_Map"): self.device_name = device_name self.ip = ip_address self.port = port self.client = ModbusTcpClient(self.ip, port=self.port) self.excel_path = excel_path self.sheet_name = sheet_name self.registers = self.get_registers() # ------------- # Connection # ------------- def connect(self) -> bool: ok = self.client.connect() if not ok: print("Verbindung zur Wärmepumpe fehlgeschlagen.") return ok def close(self): try: self.client.close() except Exception: pass # ------------- # Excel parsing # ------------- def get_registers(self) -> dict: df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name) df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy() df["Address"] = df["Address"].astype(int) df["Length"] = df["Length"].astype(int) df["Data_Type"] = df["Data_Type"].astype(str).str.upper() df["Byteorder"] = df["Byteorder"].astype(str).str.upper() df["Scaling"] = df.get("Scaling", 1.0) df["Scaling"] = df["Scaling"].fillna(1.0).astype(float) df["Offset"] = df.get("Offset", 0.0) df["Offset"] = df["Offset"].fillna(0.0).astype(float) regs = {} for _, row in df.iterrows(): regs[int(row["Address"])] = { "length": int(row["Length"]), "data_type": row["Data_Type"], "byteorder": row["Byteorder"], "scaling": float(row["Scaling"]), "offset": float(row["Offset"]), "tag": str(row.get("Tag_Name", "")).strip(), "desc": "" if pd.isna(row.get("Description")) else str(row.get("Description")).strip(), } return regs # ------------- # Byteorder handling # ------------- @staticmethod def _registers_to_bytes(registers: list[int], byteorder_code: str) -> bytes: """ registers: Liste von uint16 (0..65535), wie pymodbus sie liefert. byteorder_code: AB, ABCD, CDAB, BADC, DCBA (gemäß Template) Rückgabe: bytes in der Reihenfolge, wie sie für struct.unpack benötigt werden. """ code = (byteorder_code or "ABCD").upper() # Pro Register: 16-bit => zwei Bytes (MSB, LSB) words = [struct.pack(">H", r & 0xFFFF) for r in registers] # big endian pro Wort if len(words) == 1: w = words[0] # b'\xAA\xBB' if code in ("AB", "ABCD", "CDAB"): return w if code == "BADC": # byte swap return w[::-1] if code == "DCBA": # byte swap (bei 16-bit identisch zu BADC) return w[::-1] return w # 32-bit (2 words) oder 64-bit (4 words): Word/Byte swaps abbilden # words[0] = high word bytes, words[1] = low word bytes (in Modbus-Reihenfolge gelesen) if code == "ABCD": ordered = words elif code == "CDAB": # word swap ordered = words[1:] + words[:1] elif code == "BADC": # byte swap innerhalb jedes Words ordered = [w[::-1] for w in words] elif code == "DCBA": # word + byte swap ordered = [w[::-1] for w in (words[1:] + words[:1])] else: ordered = words return b"".join(ordered) @staticmethod def _decode_by_type(raw_bytes: bytes, data_type: str): dt = (data_type or "").upper() # struct: > = big endian, < = little endian # Wir liefern raw_bytes bereits in der richtigen Reihenfolge; daher nutzen wir ">" konsistent. if dt == "UINT16": return struct.unpack(">H", raw_bytes[:2])[0] if dt == "INT16": return struct.unpack(">h", raw_bytes[:2])[0] if dt == "UINT32": return struct.unpack(">I", raw_bytes[:4])[0] if dt == "INT32": return struct.unpack(">i", raw_bytes[:4])[0] if dt == "FLOAT32": return struct.unpack(">f", raw_bytes[:4])[0] if dt == "FLOAT64": return struct.unpack(">d", raw_bytes[:8])[0] raise ValueError(f"Unbekannter Data_Type: {dt}") def _decode_value(self, registers: list[int], meta: dict): raw = self._registers_to_bytes(registers, meta["byteorder"]) val = self._decode_by_type(raw, meta["data_type"]) return (val * meta["scaling"]) + meta["offset"] # ------------- # Reading # ------------- def get_state(self) -> dict: data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")} if not self.connect(): data["error"] = "connect_failed" return data try: for address, meta in self.registers.items(): count = int(meta["length"]) result = self.client.read_input_registers(address, count=count) if result.isError(): print(f"Fehler beim Lesen von Adresse {address}: {result}") continue try: value = self._decode_value(result.registers, meta) except Exception as e: print(f"Decode-Fehler an Adresse {address} ({meta.get('tag','')}): {e}") continue # Optional filter # if self._is_invalid_sentinel(value): # continue value = float(value) desc = meta.get("desc") or "" field_name = f"{address} - {desc}".strip(" -") data[field_name] = float(value) print(f"Adresse {address} - {desc}: {value}") finally: self.close() return data