From 4af2460736caa15ddfe97a698caaa480e0c8b319 Mon Sep 17 00:00:00 2001 From: Nils Reiners Date: Wed, 29 Oct 2025 22:06:53 +0100 Subject: [PATCH] =?UTF-8?q?l=C3=A4uft=20noch=20nicht;=20slave=20inverter?= =?UTF-8?q?=20liest=20in=20komischen=20zeitabst=C3=A4nden=20und=20es=20gib?= =?UTF-8?q?t=20wohl=20bei=20einigen=20Registereintr=C3=A4gen=20probleme?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pv_inverter.py | 234 ++++++++++++++++++++++++++----------------------- 1 file changed, 126 insertions(+), 108 deletions(-) diff --git a/pv_inverter.py b/pv_inverter.py index 9f7dcfa..2a470d2 100644 --- a/pv_inverter.py +++ b/pv_inverter.py @@ -1,137 +1,155 @@ -import time -import struct -import pandas as pd -from typing import Dict, Any, List, Optional +# pv_inverter.py +# -*- coding: utf-8 -*- +from typing import Optional, Dict, Any, List from pymodbus.client import ModbusTcpClient - -EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx" - -# Bis EXKLUSIVE 40206 (also max. 40205) -MAX_ADDR_EXCLUSIVE = 40206 +from pymodbus.exceptions import ModbusIOException +import struct +import time class PvInverter: - def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1): - """ - device_name : Anzeigename (z.B. 'master' oder 'slave') - ip_address : IP des Wechselrichters oder Modbus-Gateways - port : TCP-Port (Standard 502) - unit : Modbus Unit-ID (1 = Master, 3 = Slave) - """ + """ + Minimaler Reader für einen SolarEdge-Inverter hinter Modbus-TCP→RTU-Gateway. + Liest nur die bekannten Register (wie im funktionierenden Skript). + Kompatibel mit pymodbus 2.5.x und 3.x – kein retry_on_empty. + """ + + def __init__( + self, + device_name: str, + ip_address: str, + port: int = 502, + unit_id: int = 1, + timeout: float = 1.5, + silent_interval: float = 0.02, + ): self.device_name = device_name - self.ip = ip_address + self.host = ip_address self.port = port - self.unit = unit + self.unit = unit_id + self.timeout = timeout + self.silent_interval = silent_interval self.client: Optional[ModbusTcpClient] = None - self.registers: Dict[int, Dict[str, Any]] = {} + self._connect() - self.connect_to_modbus() - self.load_registers(EXCEL_PATH) - - # ---------- Verbindung ---------- - def connect_to_modbus(self): - self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0) + # ---------------- Verbindung ---------------- + def _connect(self): + # retries=0: keine internen Mehrfachversuche + self.client = ModbusTcpClient(self.host, port=self.port, timeout=self.timeout, retries=0) if not self.client.connect(): - print(f"❌ Verbindung zu {self.device_name} ({self.ip}:{self.port}) fehlgeschlagen.") - raise SystemExit(1) - print(f"✅ Verbindung hergestellt zu {self.device_name} ({self.ip}:{self.port}, unit={self.unit})") + raise ConnectionError(f"Verbindung zu {self.device_name} ({self.host}:{self.port}) fehlgeschlagen.") + print(f"✅ Verbindung hergestellt zu {self.device_name} ({self.host}:{self.port}, unit={self.unit})") def close(self): if self.client: self.client.close() self.client = None - # ---------- Register-Liste ---------- - def load_registers(self, excel_path: str): - xls = pd.ExcelFile(excel_path) - df = xls.parse() - - # Passe Spaltennamen an deine Excel an - cols = ["MB Adresse", "Beschreibung", "Variabel Typ"] - df = df[cols].dropna() - df["MB Adresse"] = df["MB Adresse"].astype(int) - - # Nur Register unterhalb der Grenze übernehmen - df = df[df["MB Adresse"] < MAX_ADDR_EXCLUSIVE] - - self.registers = { - int(row["MB Adresse"]): { - "desc": str(row["Beschreibung"]).strip(), - "type": str(row["Variabel Typ"]).strip() - } - for _, row in df.iterrows() - } - - # ---------- Low-Level Lesen ---------- - def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]: - """ - Ruft die pymodbus-Funktion mit fester unit-ID auf (kein Fallback). - """ - fn = getattr(self.client, fn_name) - res = fn(address=address, count=count, slave=self.unit) - if res is None or (hasattr(res, "isError") and res.isError()): + # ---------------- Low-Level Lesen ---------------- + def _read_regs(self, addr: int, count: int) -> Optional[List[int]]: + """Liest 'count' Holding-Register ab base-0 'addr' für die konfigurierte Unit-ID.""" + try: + rr = self.client.read_holding_registers(address=addr, count=count, slave=self.unit) + except ModbusIOException: + time.sleep(self.silent_interval) + return None + except Exception: + time.sleep(self.silent_interval) return None - return getattr(res, "registers", None) - def _read_any(self, address: int, count: int) -> Optional[List[int]]: - regs = self._try_read("read_holding_registers", address, count) - if regs is None: - regs = self._try_read("read_input_registers", address, count) - return regs + time.sleep(self.silent_interval) + if not rr or rr.isError(): + return None + return rr.registers - # ---------- Decoding ---------- @staticmethod - def _to_i16(u16: int) -> int: + def _to_int16(u16: int) -> int: return struct.unpack(">h", struct.pack(">H", u16))[0] @staticmethod - def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float: - b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi) - return struct.unpack(">f", b)[0] + def _apply_sf(raw: int, sf: int) -> float: + return raw * (10 ** sf) - # Wie viele Register braucht der Typ? @staticmethod - def _word_count_for_type(rtype: str) -> int: - rt = (rtype or "").lower() - if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt: - return 2 - return 1 + def _read_string_from_regs(regs: List[int]) -> Optional[str]: + b = b"".join(struct.pack(">H", r) for r in regs) + s = b.decode("ascii", errors="ignore").rstrip("\x00 ").strip() + return s or None - # ---------- Lesen ---------- - def read_one(self, address_excel: int, rtype: str) -> Optional[float]: - """ - Liest einen Wert nach Typ ('INT', 'REAL' etc.) mit fixer Unit-ID. - """ - addr = int(address_excel) - words = self._word_count_for_type(rtype) - - # Grenze prüfen - if addr + words - 1 >= MAX_ADDR_EXCLUSIVE: + # ---------------- Hilfsfunktionen ---------------- + def _read_string(self, addr: int, words: int) -> Optional[str]: + regs = self._read_regs(addr, words) + if regs is None: return None + return self._read_string_from_regs(regs) - if words == 2: - regs = self._read_any(addr, 2) - if not regs or len(regs) < 2: - return None - return self._to_f32_from_two(regs[0], regs[1]) - else: - regs = self._read_any(addr, 1) - if not regs: - return None - return float(self._to_i16(regs[0])) + def _read_scaled(self, value_addr: int, sf_addr: int) -> Optional[float]: + regs = self._read_regs(value_addr, 1) + sf = self._read_regs(sf_addr, 1) + if regs is None or sf is None: + return None + raw = self._to_int16(regs[0]) + sff = self._to_int16(sf[0]) + return self._apply_sf(raw, sff) + def _read_u32_with_sf(self, value_addr: int, sf_addr: int) -> Optional[float]: + regs = self._read_regs(value_addr, 2) + sf = self._read_regs(sf_addr, 1) + if regs is None or sf is None: + return None + u32 = (regs[0] << 16) | regs[1] + sff = self._to_int16(sf[0]) + return self._apply_sf(u32, sff) + + # ---------------- Öffentliche API ---------------- def get_state(self) -> Dict[str, Any]: - """ - Liest alle gültigen Register und gibt ein dict zurück. - """ - data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")} - for address, meta in sorted(self.registers.items()): - words = self._word_count_for_type(meta["type"]) - if address + words - 1 >= MAX_ADDR_EXCLUSIVE: - continue - val = self.read_one(address, meta["type"]) - if val is None: - continue - key = f"{address} - {meta['desc']}" - data[key] = val - return data + """Liest exakt die bekannten Register und gibt ein Dict zurück.""" + state: Dict[str, Any] = {} + + # --- Common Block --- + state["C_Manufacturer"] = self._read_string(40004, 16) + state["C_Model"] = self._read_string(40020, 16) + state["C_Version"] = self._read_string(40044, 8) + state["C_SerialNumber"] = self._read_string(40052, 16) + + # --- Inverter Block --- + state["I_AC_Power_W"] = self._read_scaled(40083, 40084) + state["I_AC_Voltage_V"] = self._read_scaled(40079, 40082) + state["I_AC_Frequency_Hz"] = self._read_scaled(40085, 40086) + state["I_DC_Power_W"] = self._read_scaled(40100, 40101) + state["I_AC_Energy_Wh_total"] = self._read_u32_with_sf(40093, 40095) + + status_regs = self._read_regs(40107, 2) + if status_regs: + state["I_Status"] = status_regs[0] + state["I_Status_Vendor"] = status_regs[1] + else: + state["I_Status"] = None + state["I_Status_Vendor"] = None + + return state + + +# ---------------- Beispiel ---------------- +if __name__ == "__main__": + MODBUS_IP = "192.168.1.112" + MODBUS_PORT = 502 + + master = PvInverter("solaredge_master", MODBUS_IP, port=MODBUS_PORT, unit_id=1) + slave = PvInverter("solaredge_slave", MODBUS_IP, port=MODBUS_PORT, unit_id=3) + + try: + sm = master.get_state() + ss = slave.get_state() + + print("\n=== MASTER ===") + for k, v in sm.items(): + print(f"{k:22s}: {v}") + + print("\n=== SLAVE ===") + for k, v in ss.items(): + print(f"{k:22s}: {v}") + + finally: + master.close() + slave.close()