Files
allmende_ems/test_meter.py
2025-09-18 14:14:53 +02:00

129 lines
4.2 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import struct
import pandas as pd
from typing import Dict, Any, List, Optional
from pymodbus.client import ModbusTcpClient
# ==== User parameters ====
# Host, TCP port and unit id handed to ModbusReader in main().
IP = "192.168.1.112"
PORT = 502 # or 1502 where applicable
UNIT = 1
# Workbook that load_register_map() reads the register list from.
EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"
# Column names in your Excel sheet (address, description, variable type)
EXCEL_COLS = ["MB Adresse", "Beschreibung", "Variabel Typ"]
# Address filter
MIN_ADDR = 40121 # only addresses from here upward are loaded
ADDRESS_SHIFT = 50 # +50 for Synergy 2-unit; added to every Excel address before reading
# =================== Modbus-Helfer ===================
def to_i16(u16: int) -> int:
    """Reinterpret an unsigned 16-bit word as a signed 16-bit integer.

    Args:
        u16: Raw register word in the range 0..65535.

    Returns:
        The two's-complement interpretation of the same 16 bits.

    Raises:
        struct.error: If ``u16`` does not fit into an unsigned 16-bit field.
    """
    # Packing as ">H" range-checks the input; the two resulting bytes are
    # then read back as a signed big-endian integer.
    raw = struct.pack(">H", u16)
    return int.from_bytes(raw, byteorder="big", signed=True)
def f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float:
    """Build an IEEE-754 single-precision float from two 16-bit words.

    Args:
        u16_hi: First word as received.
        u16_lo: Second word as received.
        msw_first: When true, ``u16_hi`` supplies the upper 16 bits of the
            float; when false, the two words are swapped before decoding.

    Returns:
        The decoded 32-bit float (big-endian byte order within the words).
    """
    ordered = (u16_hi, u16_lo) if msw_first else (u16_lo, u16_hi)
    (value,) = struct.unpack(">f", struct.pack(">HH", *ordered))
    return value
def word_count_for_type(rtype: str) -> int:
    """Return how many 16-bit registers a value of the given type spans.

    The type name is matched case-insensitively by substring, so spellings
    such as ``"UInt32"`` or ``" float32 "`` are recognised.

    Args:
        rtype: Type name as found in the register table; ``None`` or an
            empty string is treated as an unknown type.

    Returns:
        2 for 32-bit integer (signed or unsigned), real/float and
        ``string(32)`` types, otherwise 1.
    """
    rt = (rtype or "").strip().lower()
    # "int32" is a substring of "uint32", so this single marker covers both
    # the signed and the unsigned 32-bit integer types.
    # NOTE(review): "string(32)" is kept at 2 words for backward
    # compatibility; a 32-character string presumably spans more registers
    # than that - confirm against the device documentation.
    two_word_markers = ("int32", "real", "float", "string(32)")
    if any(marker in rt for marker in two_word_markers):
        return 2
    return 1  # default: 16-bit
class ModbusReader:
    """Small wrapper around a pymodbus ``ModbusTcpClient`` for one unit id.

    The connection is opened in the constructor; call :meth:`close` when
    done. Reads return the raw 16-bit register words or ``None`` on failure.
    """

    def __init__(self, ip: str, port: int, unit: int):
        """Connect to the device.

        Args:
            ip: Host name or IP address of the Modbus/TCP server.
            port: TCP port of the server.
            unit: Unit (device) id sent with every read request.

        Raises:
            SystemExit: With status 1 if the TCP connection cannot be opened.
        """
        self.client = ModbusTcpClient(ip, port=port, timeout=3.0, retries=3)
        if not self.client.connect():
            print("❌ Verbindung zu Wechselrichter fehlgeschlagen.")
            raise SystemExit(1)
        self.unit = unit
        print("✅ Verbindung zu Wechselrichter hergestellt.")

    def close(self):
        """Close the connection; errors during shutdown are ignored."""
        try:
            self.client.close()
        except Exception:
            # Deliberate best effort: a failing close must not mask whatever
            # happened in the caller's try/finally.
            pass

    def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
        """Call one client read function, adapting to its keyword signature.

        The unit id keyword differs between pymodbus releases (``slave`` in
        older ones, ``device_id`` in newer ones), so each spelling is tried
        before falling back to a call without a unit id. Without the
        ``device_id`` attempt the configured unit would be silently dropped
        on newer releases.

        Args:
            fn_name: Name of the client method, e.g. ``"read_holding_registers"``.
            address: Register address passed to the client.
            count: Number of registers to read.

        Returns:
            The ``registers`` attribute of the first successful response, or
            ``None`` if every attempt was rejected or returned an error.
        """
        fn = getattr(self.client, fn_name)
        attempts = (
            dict(address=address, count=count, slave=self.unit),
            dict(address=address, count=count, device_id=self.unit),
            dict(address=address, count=count),
        )
        for kwargs in attempts:
            try:
                res = fn(**kwargs)
            except TypeError:
                # This client version does not know the keyword; try the next
                # calling convention.
                continue
            if res is None or (hasattr(res, "isError") and res.isError()):
                continue
            return getattr(res, "registers", None)
        return None

    def read_any(self, address: int, count: int) -> Optional[List[int]]:
        """Read registers as holding registers, falling back to input registers.

        Args:
            address: Register address passed to the client.
            count: Number of registers to read.

        Returns:
            List of raw register words, or ``None`` if both reads failed.
        """
        regs = self._try_read("read_holding_registers", address, count)
        if regs is None:
            regs = self._try_read("read_input_registers", address, count)
        return regs
# =================== Hauptlogik ===================
def load_register_map(excel_path: str) -> Dict[int, Dict[str, Any]]:
    """Load the register table from the first sheet of an Excel workbook.

    Only the columns listed in ``EXCEL_COLS`` are used; rows with a missing
    value in any of them are dropped, as are rows whose address is below
    ``MIN_ADDR``. If an address occurs more than once, the last row wins.

    Args:
        excel_path: Path to the workbook.

    Returns:
        Mapping from register address to ``{"desc": ..., "type": ...}`` with
        both values as stripped strings.
    """
    # ExcelFile keeps the workbook open; the context manager guarantees the
    # handle is released even if parsing fails.
    with pd.ExcelFile(excel_path) as xls:
        df = xls.parse()
    df = df[EXCEL_COLS].dropna()

    addresses = df["MB Adresse"].astype(int)
    regmap: Dict[int, Dict[str, Any]] = {}
    for addr, desc, rtype in zip(addresses, df["Beschreibung"], df["Variabel Typ"]):
        addr_excel = int(addr)  # plain int keys rather than numpy integers
        if addr_excel < MIN_ADDR:
            continue
        regmap[addr_excel] = {"desc": str(desc).strip(), "type": str(rtype).strip()}
    print(f" {len(regmap)} Register aus Excel geladen (>= {MIN_ADDR}).")
    return regmap
def read_value(reader: ModbusReader, start_addr: int, rtype: str) -> Optional[float]:
    """Read one value and decode it according to its declared type.

    Decoding rules (type names matched case-insensitively by substring):

    * string types cannot be represented as a number and yield ``None``
      without touching the bus;
    * two-word real/float types are decoded as a 32-bit float, first word
      most significant;
    * other two-word types are decoded as a 32-bit integer, unsigned when
      the type name contains ``uint`` and signed otherwise;
    * one-word types are unsigned when the name contains ``uint`` and
      signed 16-bit otherwise.

    Args:
        reader: Connected reader used to fetch the raw registers.
        start_addr: Address of the first register of the value.
        rtype: Type name from the register table.

    Returns:
        The decoded value as a float, or ``None`` if the read failed or the
        type is not numeric.
    """
    rt = (rtype or "").strip().lower()
    if "string" in rt:
        # Previously the first two words of a string were decoded as a float,
        # which produced meaningless numbers.
        return None

    if word_count_for_type(rtype) == 2:
        regs = reader.read_any(start_addr, 2)
        if not regs or len(regs) < 2:
            return None
        hi, lo = regs[0], regs[1]
        if "real" in rt or "float" in rt:
            return f32_from_two(hi, lo)
        # 32-bit integer, most significant word first (same order as floats).
        raw = (hi << 16) | lo
        if "uint" not in rt and raw & 0x80000000:
            raw -= 1 << 32
        return float(raw)

    regs = reader.read_any(start_addr, 1)
    if not regs:
        return None
    if "uint" in rt:
        # Unsigned types must not go through the signed conversion, otherwise
        # values above 32767 turn negative.
        return float(regs[0])
    return float(to_i16(regs[0]))
def main():
    """Read every register from the Excel table once and print the values.

    Each Excel address is shifted by ``ADDRESS_SHIFT`` before it is read.
    Registers whose read fails or whose value cannot be decoded are skipped.
    The connection is closed even if an error occurs while reading.
    """
    regs = load_register_map(EXCEL_PATH)
    reader = ModbusReader(IP, PORT, UNIT)
    try:
        print(f"\n📋 Einmalige Auslesung ab {MIN_ADDR} (+{ADDRESS_SHIFT} Adressversatz) {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        for addr_excel, meta in sorted(regs.items()):
            shifted_addr = addr_excel + ADDRESS_SHIFT
            val = read_value(reader, shifted_addr, meta["type"])
            if val is None:
                continue
            # The " = " separator keeps the shift and the resulting address
            # from running together (previously e.g. "40121+5040171").
            print(f"{addr_excel:5d}+{ADDRESS_SHIFT:2d} = {shifted_addr:5d} | "
                  f"{meta['desc']:<40} | Wert: {val}")
    finally:
        reader.close()


if __name__ == "__main__":
    main()