import time import struct import pandas as pd from typing import Dict, Any, List, Optional from pymodbus.client import ModbusTcpClient EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx" # Bis EXKLUSIVE 40206 (also max. 40205) MAX_ADDR_EXCLUSIVE = 40206 class PvInverter: def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1): """ device_name : Anzeigename (z.B. 'master' oder 'slave') ip_address : IP des Wechselrichters oder Modbus-Gateways port : TCP-Port (Standard 502) unit : Modbus Unit-ID (1 = Master, 3 = Slave) """ self.device_name = device_name self.ip = ip_address self.port = port self.unit = unit self.client: Optional[ModbusTcpClient] = None self.registers: Dict[int, Dict[str, Any]] = {} self.connect_to_modbus() self.load_registers(EXCEL_PATH) # ---------- Verbindung ---------- def connect_to_modbus(self): self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0) if not self.client.connect(): print(f"❌ Verbindung zu {self.device_name} ({self.ip}:{self.port}) fehlgeschlagen.") raise SystemExit(1) print(f"✅ Verbindung hergestellt zu {self.device_name} ({self.ip}:{self.port}, unit={self.unit})") def close(self): if self.client: self.client.close() self.client = None # ---------- Register-Liste ---------- def load_registers(self, excel_path: str): xls = pd.ExcelFile(excel_path) df = xls.parse() # Passe Spaltennamen an deine Excel an cols = ["MB Adresse", "Beschreibung", "Variabel Typ"] df = df[cols].dropna() df["MB Adresse"] = df["MB Adresse"].astype(int) # Nur Register unterhalb der Grenze übernehmen df = df[df["MB Adresse"] < MAX_ADDR_EXCLUSIVE] self.registers = { int(row["MB Adresse"]): { "desc": str(row["Beschreibung"]).strip(), "type": str(row["Variabel Typ"]).strip() } for _, row in df.iterrows() } # ---------- Low-Level Lesen ---------- def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]: """ Ruft die pymodbus-Funktion mit fester unit-ID auf (kein Fallback). """ fn = getattr(self.client, fn_name) res = fn(address=address, count=count, slave=self.unit) if res is None or (hasattr(res, "isError") and res.isError()): return None return getattr(res, "registers", None) def _read_any(self, address: int, count: int) -> Optional[List[int]]: regs = self._try_read("read_holding_registers", address, count) if regs is None: regs = self._try_read("read_input_registers", address, count) return regs # ---------- Decoding ---------- @staticmethod def _to_i16(u16: int) -> int: return struct.unpack(">h", struct.pack(">H", u16))[0] @staticmethod def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float: b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi) return struct.unpack(">f", b)[0] # Wie viele Register braucht der Typ? @staticmethod def _word_count_for_type(rtype: str) -> int: rt = (rtype or "").lower() if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt: return 2 return 1 # ---------- Lesen ---------- def read_one(self, address_excel: int, rtype: str) -> Optional[float]: """ Liest einen Wert nach Typ ('INT', 'REAL' etc.) mit fixer Unit-ID. """ addr = int(address_excel) words = self._word_count_for_type(rtype) # Grenze prüfen if addr + words - 1 >= MAX_ADDR_EXCLUSIVE: return None if words == 2: regs = self._read_any(addr, 2) if not regs or len(regs) < 2: return None return self._to_f32_from_two(regs[0], regs[1]) else: regs = self._read_any(addr, 1) if not regs: return None return float(self._to_i16(regs[0])) def get_state(self) -> Dict[str, Any]: """ Liest alle gültigen Register und gibt ein dict zurück. """ data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")} for address, meta in sorted(self.registers.items()): words = self._word_count_for_type(meta["type"]) if address + words - 1 >= MAX_ADDR_EXCLUSIVE: continue val = self.read_one(address, meta["type"]) if val is None: continue key = f"{address} - {meta['desc']}" data[key] = val return data