diff --git a/__pycache__/pv_inverter.cpython-312.pyc b/__pycache__/pv_inverter.cpython-312.pyc index 25174a8..4adc3cb 100644 Binary files a/__pycache__/pv_inverter.cpython-312.pyc and b/__pycache__/pv_inverter.cpython-312.pyc differ diff --git a/__pycache__/solaredge_meter.cpython-312.pyc b/__pycache__/solaredge_meter.cpython-312.pyc new file mode 100644 index 0000000..cea1ffc Binary files /dev/null and b/__pycache__/solaredge_meter.cpython-312.pyc differ diff --git a/main.py b/main.py index 84a03ce..044baca 100644 --- a/main.py +++ b/main.py @@ -3,6 +3,7 @@ from datetime import datetime from data_base_influx import DataBaseInflux from heat_pump import HeatPump from pv_inverter import PvInverter +from solaredge_meter import SolaredgeMeter from shelly_pro_3m import ShellyPro3m # For dev-System run in terminal: ssh -N -L 127.0.0.1:8111:10.0.0.10:502 pi@192.168.1.146 @@ -19,7 +20,8 @@ db = DataBaseInflux( hp = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502) shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121') -wr = PvInverter(device_name='wr_master', ip_address='192.168.1.112') +wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112') +meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112') #controller = SgReadyController(hp, wr) @@ -29,6 +31,7 @@ while True: db.store_data(hp.device_name, hp.get_state()) db.store_data(shelly.device_name, shelly.get_state()) db.store_data(wr.device_name, wr.get_state()) + db.store_data(meter.device_name, meter.get_state()) #controller.perform_action() time.sleep(0.1) diff --git a/pv_inverter.py b/pv_inverter.py index 1142652..0caba94 100644 --- a/pv_inverter.py +++ b/pv_inverter.py @@ -4,7 +4,10 @@ import pandas as pd from typing import Dict, Any, List, Tuple, Optional from pymodbus.client import ModbusTcpClient -EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx" +EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx" + +# Obergrenze: bis EXKLUSIVE 40206 (d.h. max. 40205) +MAX_ADDR_EXCLUSIVE = 40121 class PvInverter: def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1): @@ -34,33 +37,29 @@ class PvInverter: def load_registers(self, excel_path: str): xls = pd.ExcelFile(excel_path) df = xls.parse() - # Passen die Spaltennamen bei dir anders, bitte hier anpassen: + # Passe Spaltennamen hier an, falls nötig: cols = ["MB Adresse", "Beschreibung", "Variabel Typ"] - for c in cols: - if c not in df.columns: - raise ValueError(f"Spalte '{c}' fehlt in {excel_path}") df = df[cols].dropna() df["MB Adresse"] = df["MB Adresse"].astype(int) - # NORMALISIERE TYP - def norm_type(x: Any) -> str: - s = str(x).strip().upper() - return "REAL" if s == "REAL" else "INT" + + # 1) Vorab-Filter: nur Adressen < 40206 übernehmen + df = df[df["MB Adresse"] < MAX_ADDR_EXCLUSIVE] + self.registers = { int(row["MB Adresse"]): { "desc": str(row["Beschreibung"]).strip(), - "type": norm_type(row["Variabel Typ"]) + "type": str(row["Variabel Typ"]).strip() } for _, row in df.iterrows() } - print(f"ℹ️ {len(self.registers)} Register aus Excel geladen.") + # ---------- Low-Level Lesen ---------- def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]: fn = getattr(self.client, fn_name) # pymodbus 3.8.x hat 'slave='; Fallbacks schaden nicht for kwargs in (dict(address=address, count=count, slave=self.unit), - dict(address=address, count=count), - ): + dict(address=address, count=count)): try: res = fn(**kwargs) if res is None or (hasattr(res, "isError") and res.isError()): @@ -83,33 +82,58 @@ class PvInverter: @staticmethod def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float: - if msw_first: - b = struct.pack(">HH", u16_hi, u16_lo) - else: - b = struct.pack(">HH", u16_lo, u16_hi) + b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi) return struct.unpack(">f", b)[0] + # Hilfsfunktion: wie viele 16-Bit-Register braucht dieser Typ? + @staticmethod + def _word_count_for_type(rtype: str) -> int: + rt = (rtype or "").lower() + # Passe hier an deine Excel-Typen an: + if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt: + return 2 + # Default: 1 Wort (z.B. int16/uint16) + return 1 + def read_one(self, address_excel: int, rtype: str) -> Optional[float]: - """Liest einen Wert nach Typ ('INT' oder 'REAL') unter Berücksichtigung Base-1.""" - addr = address_excel - if rtype == "REAL": + """ + Liest einen Wert nach Typ ('INT' oder 'REAL' etc.). + Es werden ausschließlich Register < 40206 gelesen. + """ + addr = int(address_excel) + words = self._word_count_for_type(rtype) + + # 2) Harte Grenze prüfen: höchstes angefasstes Register muss < 40206 sein + if addr + words - 1 >= MAX_ADDR_EXCLUSIVE: + # Überspringen, da der Lesevorgang die Grenze >= 40206 berühren würde + return None + + if words == 2: regs = self._read_any(addr, 2) if not regs or len(regs) < 2: return None + # Deine bisherige Logik interpretiert 2 Worte als Float32: return self._to_f32_from_two(regs[0], regs[1]) - else: # INT + else: regs = self._read_any(addr, 1) if not regs: return None return float(self._to_i16(regs[0])) def get_state(self) -> Dict[str, Any]: - """Liest ALLE Register aus self.registers und gibt dict zurück.""" + """ + Liest ALLE Register aus self.registers und gibt dict zurück. + Achtet darauf, dass keine Adresse (inkl. Mehrwort) >= 40206 gelesen wird. + """ data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")} - for address, meta in self.registers.items(): + for address, meta in sorted(self.registers.items()): + words = self._word_count_for_type(meta["type"]) + # 3) Nochmals Schutz auf Ebene der Iteration: + if address + words - 1 >= MAX_ADDR_EXCLUSIVE: + continue val = self.read_one(address, meta["type"]) if val is None: continue key = f"{address} - {meta['desc']}" data[key] = val - return data \ No newline at end of file + return data diff --git a/solaredge_meter.py b/solaredge_meter.py new file mode 100644 index 0000000..1214189 --- /dev/null +++ b/solaredge_meter.py @@ -0,0 +1,134 @@ +import time +import struct +import pandas as pd +from typing import Dict, Any, List, Tuple, Optional +from pymodbus.client import ModbusTcpClient + +EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx" + +# Obergrenze: bis EXKLUSIVE 40206 (d.h. max. 40205) +MIN_ADDR_INCLUSIVE = 40121 +ADDRESS_SHIFT = 50 + +class SolaredgeMeter: + def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1): + self.device_name = device_name + self.ip = ip_address + self.port = port + self.unit = unit + self.client: Optional[ModbusTcpClient] = None + self.registers: Dict[int, Dict[str, Any]] = {} # addr -> {"desc":..., "type":...} + self.connect_to_modbus() + self.load_registers(EXCEL_PATH) + + # ---------- Verbindung ---------- + def connect_to_modbus(self): + self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3) + if not self.client.connect(): + print("❌ Verbindung zu Zähler fehlgeschlagen.") + raise SystemExit(1) + print("✅ Verbindung zu Zähler hergestellt.") + + def close(self): + if self.client: + self.client.close() + self.client = None + + # ---------- Register-Liste ---------- + def load_registers(self, excel_path: str): + xls = pd.ExcelFile(excel_path) + df = xls.parse() + # Passe Spaltennamen hier an, falls nötig: + cols = ["MB Adresse", "Beschreibung", "Variabel Typ"] + df = df[cols].dropna() + df["MB Adresse"] = df["MB Adresse"].astype(int) + + # 1) Vorab-Filter: nur Adressen < 40206 übernehmen + df = df[df["MB Adresse"] >= MIN_ADDR_INCLUSIVE] + + self.registers = { + int(row["MB Adresse"]): { + "desc": str(row["Beschreibung"]).strip(), + "type": str(row["Variabel Typ"]).strip() + } + for _, row in df.iterrows() + } + + + # ---------- Low-Level Lesen ---------- + def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]: + fn = getattr(self.client, fn_name) + # pymodbus 3.8.x hat 'slave='; Fallbacks schaden nicht + shifted_addr = address + ADDRESS_SHIFT + for kwargs in (dict(address=shifted_addr, count=count, slave=self.unit), + dict(address=shifted_addr, count=count)): + try: + res = fn(**kwargs) + if res is None or (hasattr(res, "isError") and res.isError()): + continue + return res.registers + except TypeError: + continue + return None + + def _read_any(self, address: int, count: int) -> Optional[List[int]]: + regs = self._try_read("read_holding_registers", address, count) + if regs is None: + regs = self._try_read("read_input_registers", address, count) + return regs + + # ---------- Decoding ---------- + @staticmethod + def _to_i16(u16: int) -> int: + return struct.unpack(">h", struct.pack(">H", u16))[0] + + @staticmethod + def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float: + b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi) + return struct.unpack(">f", b)[0] + + # Hilfsfunktion: wie viele 16-Bit-Register braucht dieser Typ? + @staticmethod + def _word_count_for_type(rtype: str) -> int: + rt = (rtype or "").lower() + # Passe hier an deine Excel-Typen an: + if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt: + return 2 + # Default: 1 Wort (z.B. int16/uint16) + return 1 + + def read_one(self, address_excel: int, rtype: str) -> Optional[float]: + """ + Liest einen Wert nach Typ ('INT' oder 'REAL' etc.). + Es werden ausschließlich Register < 40206 gelesen. + """ + addr = int(address_excel) + words = self._word_count_for_type(rtype) + + if words == 2: + regs = self._read_any(addr, 2) + if not regs or len(regs) < 2: + return None + # Deine bisherige Logik interpretiert 2 Worte als Float32: + return self._to_f32_from_two(regs[0], regs[1]) + else: + regs = self._read_any(addr, 1) + if not regs: + return None + return float(self._to_i16(regs[0])) + + def get_state(self) -> Dict[str, Any]: + """ + Liest ALLE Register aus self.registers und gibt dict zurück. + Achtet darauf, dass keine Adresse (inkl. Mehrwort) >= 40206 gelesen wird. + """ + data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")} + for address, meta in sorted(self.registers.items()): + words = self._word_count_for_type(meta["type"]) + + val = self.read_one(address, meta["type"]) + if val is None: + continue + key = f"{address} - {meta['desc']}" + data[key] = val + return data diff --git a/test_meter.py b/test_meter.py index bb90c24..c1c2a2b 100644 --- a/test_meter.py +++ b/test_meter.py @@ -1,61 +1,128 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +import time +import struct +import pandas as pd +from typing import Dict, Any, List, Optional from pymodbus.client import ModbusTcpClient -import struct, sys -MODBUS_IP = "192.168.1.112" -MODBUS_PORT = 502 -UNIT_ID = 1 +# ==== Nutzer-Parameter ==== +IP = "192.168.1.112" +PORT = 502 # ggf. 1502 +UNIT = 1 +EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx" -METER_START = 40240 # Startadresse Model 203-Felder +# Spaltennamen in deiner Excel +EXCEL_COLS = ["MB Adresse", "Beschreibung", "Variabel Typ"] -def to_i16(u16): # unsigned 16 → signed 16 +# Adressfilter +MIN_ADDR = 40121 # nur ab hier +ADDRESS_SHIFT = 50 # +50 für Synergy 2-Unit + + +# =================== Modbus-Helfer =================== + +def to_i16(u16: int) -> int: return struct.unpack(">h", struct.pack(">H", u16))[0] -def read_regs(client, addr, count): - rr = client.read_holding_registers(address=addr, count=count, slave=UNIT_ID) - if rr.isError(): - return None - return rr.registers +def f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float: + if msw_first: + b = struct.pack(">HH", u16_hi, u16_lo) + else: + b = struct.pack(">HH", u16_lo, u16_hi) + return struct.unpack(">f", b)[0] -def read_meter_power(client): - base = METER_START - p = read_regs(client, base + 16, 1) # M_AC_Power - pa = read_regs(client, base + 17, 1) # Phase A - pb = read_regs(client, base + 18, 1) # Phase B - pc = read_regs(client, base + 19, 1) # Phase C - sf = read_regs(client, base + 20, 1) # Scale Factor - if not p or not sf: - return None - sff = to_i16(sf[0]) - return { - "total": to_i16(p[0]) * (10 ** sff), - "A": to_i16(pa[0]) * (10 ** sff) if pa else None, - "B": to_i16(pb[0]) * (10 ** sff) if pb else None, - "C": to_i16(pc[0]) * (10 ** sff) if pc else None, - "sf": sff - } +def word_count_for_type(rtype: str) -> int: + rt = (rtype or "").strip().lower() + if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt: + return 2 + return 1 # default: 16-bit -def fmt_w(v): - if v is None: return "-" - neg = v < 0 - v = abs(v) - return f"{'-' if neg else ''}{v/1000:.2f} kW" if v >= 1000 else f"{'-' if neg else ''}{v:.0f} W" +class ModbusReader: + def __init__(self, ip: str, port: int, unit: int): + self.client = ModbusTcpClient(ip, port=port, timeout=3.0, retries=3) + if not self.client.connect(): + print("❌ Verbindung zu Wechselrichter fehlgeschlagen.") + raise SystemExit(1) + self.unit = unit + print("✅ Verbindung zu Wechselrichter hergestellt.") + + def close(self): + try: + self.client.close() + except Exception: + pass + + def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]: + fn = getattr(self.client, fn_name) + for kwargs in (dict(address=address, count=count, slave=self.unit), + dict(address=address, count=count)): + try: + res = fn(**kwargs) + if res is None or (hasattr(res, "isError") and res.isError()): + continue + return getattr(res, "registers", None) + except TypeError: + continue + return None + + def read_any(self, address: int, count: int) -> Optional[List[int]]: + regs = self._try_read("read_holding_registers", address, count) + if regs is None: + regs = self._try_read("read_input_registers", address, count) + return regs + + +# =================== Hauptlogik =================== + +def load_register_map(excel_path: str) -> Dict[int, Dict[str, Any]]: + xls = pd.ExcelFile(excel_path) + df = xls.parse() + df = df[EXCEL_COLS].dropna() + df["MB Adresse"] = df["MB Adresse"].astype(int) + + regmap: Dict[int, Dict[str, Any]] = {} + for _, row in df.iterrows(): + addr_excel = int(row["MB Adresse"]) + if addr_excel < MIN_ADDR: # nur ab 40121 + continue + desc = str(row["Beschreibung"]).strip() + rtype = str(row["Variabel Typ"]).strip() + regmap[addr_excel] = {"desc": desc, "type": rtype} + print(f"ℹ️ {len(regmap)} Register aus Excel geladen (>= {MIN_ADDR}).") + return regmap + +def read_value(reader: ModbusReader, start_addr: int, rtype: str) -> Optional[float]: + words = word_count_for_type(rtype) + if words == 2: + regs = reader.read_any(start_addr, 2) + if not regs or len(regs) < 2: + return None + return f32_from_two(regs[0], regs[1]) + else: + regs = reader.read_any(start_addr, 1) + if not regs: + return None + return float(to_i16(regs[0])) def main(): - client = ModbusTcpClient(MODBUS_IP, port=MODBUS_PORT) - if not client.connect(): - print("❌ Verbindung fehlgeschlagen."); sys.exit(1) + regs = load_register_map(EXCEL_PATH) + reader = ModbusReader(IP, PORT, UNIT) + try: - m = read_meter_power(client) - if m: - print(f"Meter-Leistung: {fmt_w(m['total'])} " - f"(A {fmt_w(m['A'])}, B {fmt_w(m['B'])}, C {fmt_w(m['C'])}) [SF={m['sf']}]") - else: - print("Meter-Leistung konnte nicht gelesen werden.") + print(f"\n📋 Einmalige Auslesung ab {MIN_ADDR} (+{ADDRESS_SHIFT} Adressversatz) – {time.strftime('%Y-%m-%d %H:%M:%S')}\n") + + for addr_excel, meta in sorted(regs.items()): + shifted_addr = addr_excel + ADDRESS_SHIFT + val = read_value(reader, shifted_addr, meta["type"]) + if val is None: + continue + print(f"{addr_excel:5d}+{ADDRESS_SHIFT:2d} → {shifted_addr:5d} | " + f"{meta['desc']:<40} | Wert: {val}") + finally: - client.close() + reader.close() if __name__ == "__main__": main()