# Files
# allmende_ems/pv_inverter.py
# 2025-09-16 12:52:27 +02:00
#
# 69 lines
# 2.4 KiB
# Python
# Raw Permalink Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import pandas as pd
from pymodbus.client import ModbusTcpClient
class PvInverter:
    """Modbus-TCP reader for a PV inverter.

    Constructing an instance opens the Modbus TCP connection and loads the
    register map from an Excel sheet. ``get_state`` then polls every mapped
    register once per call. The connection stays open until ``close`` is
    called, either explicitly or by leaving a ``with`` block.
    """

    # Default register map; a relative path, so it is resolved against the
    # current working directory of the process.
    DEFAULT_REGISTER_FILE = "modbus_registers/pv_inverter_registers.xlsx"

    def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1):
        """Connect to the inverter and load its register map.

        Args:
            device_name: Label stored on the instance as ``device_name``.
            ip_address: Host passed to ``ModbusTcpClient``.
            port: TCP port of the Modbus server.
            unit: Modbus unit ID sent with every read request.

        Raises:
            SystemExit: If the connection cannot be established.
        """
        self.device_name = device_name
        self.ip = ip_address
        self.port = port
        self.unit = unit
        self.client = None  # ModbusTcpClient while connected, else None
        self.registers = None  # {address: {'desc': ..., 'type': 'REAL' | 'INT'}}
        self.connect_to_modbus()
        try:
            self.get_registers()
        except Exception:
            # Do not leave a dangling socket behind when the register map
            # cannot be loaded (missing file, missing columns, ...).
            self.close()
            raise

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when the ``with`` block ends."""
        self.close()

    def connect_to_modbus(self) -> None:
        """Open the Modbus TCP connection and keep it open.

        Raises:
            SystemExit: With exit code 1 if the connection attempt fails.
        """
        # Timeout and retries are optional but helpful.
        self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3)
        if not self.client.connect():
            print("Verbindung zu Wechselrichter fehlgeschlagen.")
            raise SystemExit(1)
        print("Verbindung zu Wechselrichter erfolgreich.")
        # Deliberately NOT closed here: get_state() reuses this connection.
        # Use close() to release it.

    def close(self) -> None:
        """Close the Modbus connection if one is open; safe to call repeatedly."""
        if self.client:
            self.client.close()
            self.client = None

    def get_registers(self, excel_path: str = DEFAULT_REGISTER_FILE) -> None:
        """Load the register map from the first sheet of an Excel workbook.

        Rows with a missing value in any of the columns 'MB Adresse',
        'Beschreibung' or 'Variabel Typ' are dropped. The result is stored in
        ``self.registers`` as ``{address: {'desc': ..., 'type': ...}}`` where
        ``type`` is 'REAL' when the sheet says so (case-insensitive) and
        'INT' for everything else.

        Args:
            excel_path: Workbook to read; defaults to ``DEFAULT_REGISTER_FILE``.
        """
        # The context manager releases the workbook handle even if parsing fails.
        with pd.ExcelFile(excel_path) as xls:
            df_input_registers = xls.parse()

        # astype() on the fresh frame avoids assigning into a filtered copy.
        df_clean = (
            df_input_registers[['MB Adresse', 'Beschreibung', 'Variabel Typ']]
            .dropna()
            .astype({'MB Adresse': int})
        )

        # itertuples keeps each column's dtype (iterrows may upcast a row);
        # int() makes the keys plain Python ints.
        self.registers = {
            int(address): {
                'desc': description,
                'type': 'REAL' if str(var_type).upper() == 'REAL' else 'INT',
            }
            for address, description, var_type in df_clean.itertuples(index=False, name=None)
        }

    def get_state(self) -> dict:
        """Read every mapped register once and return the values.

        Returns:
            A dict with the key 'Zeit' (local timestamp string) plus one
            entry ``"<address> - <description>"`` per register that was read
            successfully. Registers whose read reports an error are logged
            via ``print`` and left out.

        Raises:
            RuntimeError: If there is no open client (e.g. after ``close``).
        """
        if self.client is None:
            raise RuntimeError(
                "Keine Modbus-Verbindung: Client wurde geschlossen oder nie verbunden."
            )

        data = {'Zeit': time.strftime('%Y-%m-%d %H:%M:%S')}
        for address, info in self.registers.items():
            # 'REAL' entries span two registers, everything else one.
            count = 2 if info['type'] == 'REAL' else 1
            # The unit ID must be passed explicitly. NOTE(review): the keyword
            # name differs between pymodbus releases ('unit' in 2.x, 'slave'
            # in 3.x) - verify against the installed version.
            result = self.client.read_holding_registers(
                address=address,
                count=count,
                slave=self.unit,
            )
            if result.isError():
                print(f"Fehler beim Lesen von Adresse {address}: {result}")
                continue
            # Kept minimally invasive: only the first register is used.
            # NOTE(review): for 'REAL' entries the second register is read but
            # ignored, so the value is just the first raw word; decoding it
            # (presumably a 32-bit float) needs the device's word order - confirm.
            value = result.registers[0]
            print(f"Adresse {address} - {info['desc']}: {value}")
            data[f"{address} - {info['desc']}"] = value
        return data