# pv_inverter.py # -*- coding: utf-8 -*- from typing import Optional, Dict, Any, List from pymodbus.client import ModbusTcpClient from pymodbus.exceptions import ModbusIOException import struct import time class PvInverter: """ Minimaler Reader für einen SolarEdge-Inverter hinter Modbus-TCP→RTU-Gateway. Liest nur die bekannten Register (wie im funktionierenden Skript). Kompatibel mit pymodbus 2.5.x und 3.x – kein retry_on_empty. """ def __init__( self, device_name: str, ip_address: str, port: int = 502, unit_id: int = 1, timeout: float = 1.5, silent_interval: float = 0.02, ): self.device_name = device_name self.host = ip_address self.port = port self.unit = unit_id self.timeout = timeout self.silent_interval = silent_interval self.client: Optional[ModbusTcpClient] = None self._connect() # ---------------- Verbindung ---------------- def _connect(self): # retries=0: keine internen Mehrfachversuche self.client = ModbusTcpClient(self.host, port=self.port, timeout=self.timeout, retries=0) if not self.client.connect(): raise ConnectionError(f"Verbindung zu {self.device_name} ({self.host}:{self.port}) fehlgeschlagen.") print(f"✅ Verbindung hergestellt zu {self.device_name} ({self.host}:{self.port}, unit={self.unit})") def close(self): if self.client: self.client.close() self.client = None # ---------------- Low-Level Lesen ---------------- def _read_regs(self, addr: int, count: int) -> Optional[List[int]]: """Liest 'count' Holding-Register ab base-0 'addr' für die konfigurierte Unit-ID.""" try: rr = self.client.read_holding_registers(address=addr, count=count, slave=self.unit) except ModbusIOException: time.sleep(self.silent_interval) return None except Exception: time.sleep(self.silent_interval) return None time.sleep(self.silent_interval) if not rr or rr.isError(): return None return rr.registers @staticmethod def _to_int16(u16: int) -> int: return struct.unpack(">h", struct.pack(">H", u16))[0] @staticmethod def _apply_sf(raw: int, sf: int) -> float: return raw * (10 ** sf) @staticmethod def _read_string_from_regs(regs: List[int]) -> Optional[str]: b = b"".join(struct.pack(">H", r) for r in regs) s = b.decode("ascii", errors="ignore").rstrip("\x00 ").strip() return s or None # ---------------- Hilfsfunktionen ---------------- def _read_string(self, addr: int, words: int) -> Optional[str]: regs = self._read_regs(addr, words) if regs is None: return None return self._read_string_from_regs(regs) def _read_scaled(self, value_addr: int, sf_addr: int) -> Optional[float]: regs = self._read_regs(value_addr, 1) sf = self._read_regs(sf_addr, 1) if regs is None or sf is None: return None raw = self._to_int16(regs[0]) sff = self._to_int16(sf[0]) return self._apply_sf(raw, sff) def _read_u32_with_sf(self, value_addr: int, sf_addr: int) -> Optional[float]: regs = self._read_regs(value_addr, 2) sf = self._read_regs(sf_addr, 1) if regs is None or sf is None: return None u32 = (regs[0] << 16) | regs[1] sff = self._to_int16(sf[0]) return self._apply_sf(u32, sff) # ---------------- Öffentliche API ---------------- def get_state(self) -> Dict[str, Any]: """Liest exakt die bekannten Register und gibt ein Dict zurück.""" state: Dict[str, Any] = {} # --- Common Block --- state["C_Manufacturer"] = self._read_string(40004, 16) state["C_Model"] = self._read_string(40020, 16) state["C_Version"] = self._read_string(40044, 8) state["C_SerialNumber"] = self._read_string(40052, 16) # --- Inverter Block --- state["I_AC_Power_W"] = self._read_scaled(40083, 40084) state["I_AC_Voltage_V"] = self._read_scaled(40079, 40082) state["I_AC_Frequency_Hz"] = self._read_scaled(40085, 40086) state["I_DC_Power_W"] = self._read_scaled(40100, 40101) state["I_AC_Energy_Wh_total"] = self._read_u32_with_sf(40093, 40095) status_regs = self._read_regs(40107, 2) if status_regs: state["I_Status"] = status_regs[0] state["I_Status_Vendor"] = status_regs[1] else: state["I_Status"] = None state["I_Status_Vendor"] = None return state # ---------------- Beispiel ---------------- if __name__ == "__main__": MODBUS_IP = "192.168.1.112" MODBUS_PORT = 502 master = PvInverter("solaredge_master", MODBUS_IP, port=MODBUS_PORT, unit_id=1) slave = PvInverter("solaredge_slave", MODBUS_IP, port=MODBUS_PORT, unit_id=3) try: sm = master.get_state() ss = slave.get_state() print("\n=== MASTER ===") for k, v in sm.items(): print(f"{k:22s}: {v}") print("\n=== SLAVE ===") for k, v in ss.items(): print(f"{k:22s}: {v}") finally: master.close() slave.close()