Files
allmende_ems/pv_inverter.py

156 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# pv_inverter.py
# -*- coding: utf-8 -*-
from typing import Optional, Dict, Any, List
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException
import struct
import time
class PvInverter:
"""
Minimaler Reader für einen SolarEdge-Inverter hinter Modbus-TCP→RTU-Gateway.
Liest nur die bekannten Register (wie im funktionierenden Skript).
Kompatibel mit pymodbus 2.5.x und 3.x kein retry_on_empty.
"""
def __init__(
self,
device_name: str,
ip_address: str,
port: int = 502,
unit_id: int = 1,
timeout: float = 1.5,
silent_interval: float = 0.02,
):
self.device_name = device_name
self.host = ip_address
self.port = port
self.unit = unit_id
self.timeout = timeout
self.silent_interval = silent_interval
self.client: Optional[ModbusTcpClient] = None
self._connect()
# ---------------- Verbindung ----------------
def _connect(self):
# retries=0: keine internen Mehrfachversuche
self.client = ModbusTcpClient(self.host, port=self.port, timeout=self.timeout, retries=0)
if not self.client.connect():
raise ConnectionError(f"Verbindung zu {self.device_name} ({self.host}:{self.port}) fehlgeschlagen.")
print(f"✅ Verbindung hergestellt zu {self.device_name} ({self.host}:{self.port}, unit={self.unit})")
def close(self):
if self.client:
self.client.close()
self.client = None
# ---------------- Low-Level Lesen ----------------
def _read_regs(self, addr: int, count: int) -> Optional[List[int]]:
"""Liest 'count' Holding-Register ab base-0 'addr' für die konfigurierte Unit-ID."""
try:
rr = self.client.read_holding_registers(address=addr, count=count, slave=self.unit)
except ModbusIOException:
time.sleep(self.silent_interval)
return None
except Exception:
time.sleep(self.silent_interval)
return None
time.sleep(self.silent_interval)
if not rr or rr.isError():
return None
return rr.registers
@staticmethod
def _to_int16(u16: int) -> int:
return struct.unpack(">h", struct.pack(">H", u16))[0]
@staticmethod
def _apply_sf(raw: int, sf: int) -> float:
return raw * (10 ** sf)
@staticmethod
def _read_string_from_regs(regs: List[int]) -> Optional[str]:
b = b"".join(struct.pack(">H", r) for r in regs)
s = b.decode("ascii", errors="ignore").rstrip("\x00 ").strip()
return s or None
# ---------------- Hilfsfunktionen ----------------
def _read_string(self, addr: int, words: int) -> Optional[str]:
regs = self._read_regs(addr, words)
if regs is None:
return None
return self._read_string_from_regs(regs)
def _read_scaled(self, value_addr: int, sf_addr: int) -> Optional[float]:
regs = self._read_regs(value_addr, 1)
sf = self._read_regs(sf_addr, 1)
if regs is None or sf is None:
return None
raw = self._to_int16(regs[0])
sff = self._to_int16(sf[0])
return self._apply_sf(raw, sff)
def _read_u32_with_sf(self, value_addr: int, sf_addr: int) -> Optional[float]:
regs = self._read_regs(value_addr, 2)
sf = self._read_regs(sf_addr, 1)
if regs is None or sf is None:
return None
u32 = (regs[0] << 16) | regs[1]
sff = self._to_int16(sf[0])
return self._apply_sf(u32, sff)
# ---------------- Öffentliche API ----------------
def get_state(self) -> Dict[str, Any]:
"""Liest exakt die bekannten Register und gibt ein Dict zurück."""
state: Dict[str, Any] = {}
# --- Common Block ---
state["C_Manufacturer"] = self._read_string(40004, 16)
state["C_Model"] = self._read_string(40020, 16)
state["C_Version"] = self._read_string(40044, 8)
state["C_SerialNumber"] = self._read_string(40052, 16)
# --- Inverter Block ---
state["I_AC_Power_W"] = self._read_scaled(40083, 40084)
state["I_AC_Voltage_V"] = self._read_scaled(40079, 40082)
state["I_AC_Frequency_Hz"] = self._read_scaled(40085, 40086)
state["I_DC_Power_W"] = self._read_scaled(40100, 40101)
state["I_AC_Energy_Wh_total"] = self._read_u32_with_sf(40093, 40095)
status_regs = self._read_regs(40107, 2)
if status_regs:
state["I_Status"] = status_regs[0]
state["I_Status_Vendor"] = status_regs[1]
else:
state["I_Status"] = None
state["I_Status_Vendor"] = None
return state
# ---------------- Beispiel ----------------
if __name__ == "__main__":
MODBUS_IP = "192.168.1.112"
MODBUS_PORT = 502
master = PvInverter("solaredge_master", MODBUS_IP, port=MODBUS_PORT, unit_id=1)
slave = PvInverter("solaredge_slave", MODBUS_IP, port=MODBUS_PORT, unit_id=3)
try:
sm = master.get_state()
ss = slave.get_state()
print("\n=== MASTER ===")
for k, v in sm.items():
print(f"{k:22s}: {v}")
print("\n=== SLAVE ===")
for k, v in ss.items():
print(f"{k:22s}: {v}")
finally:
master.close()
slave.close()