Compare commits
1 Commits
standard_r
...
lüftungsan
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b00d622aa |
@@ -1,7 +0,0 @@
|
||||
from heat_pump import HeatPump
|
||||
|
||||
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502, excel_path="../modbus_registers/heat_pump_registers.xlsx")
|
||||
|
||||
state = hp_master.get_state()
|
||||
|
||||
print(state)
|
||||
@@ -1,49 +0,0 @@
|
||||
from pymodbus.client import ModbusTcpClient
|
||||
|
||||
def switch_sg_ready_mode(ip, port, mode):
|
||||
"""
|
||||
Register 300: 1=BUS 0= Hardware Kontakte
|
||||
Register 301 & 302:
|
||||
0-0= Kein Offset
|
||||
0-1 Boiler und Heizung Offset
|
||||
1-1 Boiler Offset + E-Einsatz Sollwert Erhöht
|
||||
1-0 SG EVU Sperre
|
||||
:param ip:
|
||||
:param mode:
|
||||
'mode1' = [True, False, False] => SG Ready deactivated
|
||||
'mode2' = [True, False, True] => SG ready activated for heatpump only
|
||||
'mode3' = [True, True, True] => SG ready activated for heatpump and heat rod
|
||||
:return:
|
||||
"""
|
||||
client = ModbusTcpClient(ip, port=port)
|
||||
if not client.connect():
|
||||
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
|
||||
return
|
||||
|
||||
mode_code = None
|
||||
if mode == 'mode1':
|
||||
mode_code = [True, False, False]
|
||||
elif mode == 'mode2':
|
||||
mode_code = [True, False, True]
|
||||
elif mode == 'mode3':
|
||||
mode_code = [True, True, True]
|
||||
else:
|
||||
print('Uncorrect or no string for mode!')
|
||||
|
||||
try:
|
||||
response_300 = client.write_coil(300, mode_code[0])
|
||||
response_301 = client.write_coil(301, mode_code[1])
|
||||
response_302 = client.write_coil(302, mode_code[2])
|
||||
|
||||
# Optional: Rückmeldungen prüfen
|
||||
for addr, resp in zip([300, 301, 302], [response_300, response_301, response_302]):
|
||||
if resp.isError():
|
||||
print(f"Fehler beim Schreiben von Coil {addr}: {resp}")
|
||||
else:
|
||||
print(f"Coil {addr} erfolgreich geschrieben.")
|
||||
|
||||
finally:
|
||||
client.close()
|
||||
|
||||
if '__name__' == '__main__':
|
||||
switch_sg_ready_mode(ip='10.0.0.10', port=502, mode='mode2')
|
||||
215
heat_pump.py
215
heat_pump.py
@@ -1,177 +1,64 @@
|
||||
from pymodbus.client import ModbusTcpClient
|
||||
import pandas as pd
|
||||
import time
|
||||
import struct
|
||||
import math
|
||||
|
||||
|
||||
class HeatPump:
|
||||
def __init__(self, device_name: str, ip_address: str, port: int = 502,
|
||||
excel_path: str = "modbus_registers/heat_pump_registers_modbus.xlsx",
|
||||
sheet_name: str = "Register_Map"):
|
||||
def __init__(self, device_name: str, ip_address: str, port: int=502):
|
||||
self.device_name = device_name
|
||||
self.ip = ip_address
|
||||
self.port = port
|
||||
self.client = ModbusTcpClient(self.ip, port=self.port)
|
||||
self.client = None
|
||||
self.connect_to_modbus()
|
||||
self.registers = None
|
||||
self.get_registers()
|
||||
|
||||
self.excel_path = excel_path
|
||||
self.sheet_name = sheet_name
|
||||
self.registers = self.get_registers()
|
||||
|
||||
# -------------
|
||||
# Connection
|
||||
# -------------
|
||||
def connect(self) -> bool:
|
||||
ok = self.client.connect()
|
||||
if not ok:
|
||||
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
|
||||
return ok
|
||||
|
||||
def close(self):
|
||||
def connect_to_modbus(self):
|
||||
port = self.port
|
||||
self.client = ModbusTcpClient(self.ip, port=port)
|
||||
try:
|
||||
self.client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# -------------
|
||||
# Excel parsing
|
||||
# -------------
|
||||
def get_registers(self) -> dict:
|
||||
df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
|
||||
df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
|
||||
|
||||
df["Address"] = df["Address"].astype(int)
|
||||
df["Length"] = df["Length"].astype(int)
|
||||
df["Data_Type"] = df["Data_Type"].astype(str).str.upper()
|
||||
df["Byteorder"] = df["Byteorder"].astype(str).str.upper()
|
||||
|
||||
df["Scaling"] = df.get("Scaling", 1.0)
|
||||
df["Scaling"] = df["Scaling"].fillna(1.0).astype(float)
|
||||
|
||||
df["Offset"] = df.get("Offset", 0.0)
|
||||
df["Offset"] = df["Offset"].fillna(0.0).astype(float)
|
||||
|
||||
regs = {}
|
||||
for _, row in df.iterrows():
|
||||
regs[int(row["Address"])] = {
|
||||
"length": int(row["Length"]),
|
||||
"data_type": row["Data_Type"],
|
||||
"byteorder": row["Byteorder"],
|
||||
"scaling": float(row["Scaling"]),
|
||||
"offset": float(row["Offset"]),
|
||||
"tag": str(row.get("Tag_Name", "")).strip(),
|
||||
"desc": "" if pd.isna(row.get("Description")) else str(row.get("Description")).strip(),
|
||||
}
|
||||
return regs
|
||||
|
||||
# -------------
|
||||
# Byteorder handling
|
||||
# -------------
|
||||
@staticmethod
|
||||
def _registers_to_bytes(registers: list[int], byteorder_code: str) -> bytes:
|
||||
"""
|
||||
registers: Liste von uint16 (0..65535), wie pymodbus sie liefert.
|
||||
byteorder_code: AB, ABCD, CDAB, BADC, DCBA (gemäß Template)
|
||||
Rückgabe: bytes in der Reihenfolge, wie sie für struct.unpack benötigt werden.
|
||||
"""
|
||||
code = (byteorder_code or "ABCD").upper()
|
||||
|
||||
# Pro Register: 16-bit => zwei Bytes (MSB, LSB)
|
||||
words = [struct.pack(">H", r & 0xFFFF) for r in registers] # big endian pro Wort
|
||||
|
||||
if len(words) == 1:
|
||||
w = words[0] # b'\xAA\xBB'
|
||||
if code in ("AB", "ABCD", "CDAB"):
|
||||
return w
|
||||
if code == "BADC": # byte swap
|
||||
return w[::-1]
|
||||
if code == "DCBA": # byte swap (bei 16-bit identisch zu BADC)
|
||||
return w[::-1]
|
||||
return w
|
||||
|
||||
# 32-bit (2 words) oder 64-bit (4 words): Word/Byte swaps abbilden
|
||||
# words[0] = high word bytes, words[1] = low word bytes (in Modbus-Reihenfolge gelesen)
|
||||
if code == "ABCD":
|
||||
ordered = words
|
||||
elif code == "CDAB":
|
||||
# word swap
|
||||
ordered = words[1:] + words[:1]
|
||||
elif code == "BADC":
|
||||
# byte swap innerhalb jedes Words
|
||||
ordered = [w[::-1] for w in words]
|
||||
elif code == "DCBA":
|
||||
# word + byte swap
|
||||
ordered = [w[::-1] for w in (words[1:] + words[:1])]
|
||||
else:
|
||||
ordered = words
|
||||
|
||||
return b"".join(ordered)
|
||||
|
||||
@staticmethod
|
||||
def _decode_by_type(raw_bytes: bytes, data_type: str):
|
||||
dt = (data_type or "").upper()
|
||||
|
||||
# struct: > = big endian, < = little endian
|
||||
# Wir liefern raw_bytes bereits in der richtigen Reihenfolge; daher nutzen wir ">" konsistent.
|
||||
if dt == "UINT16":
|
||||
return struct.unpack(">H", raw_bytes[:2])[0]
|
||||
if dt == "INT16":
|
||||
return struct.unpack(">h", raw_bytes[:2])[0]
|
||||
if dt == "UINT32":
|
||||
return struct.unpack(">I", raw_bytes[:4])[0]
|
||||
if dt == "INT32":
|
||||
return struct.unpack(">i", raw_bytes[:4])[0]
|
||||
if dt == "FLOAT32":
|
||||
return struct.unpack(">f", raw_bytes[:4])[0]
|
||||
if dt == "FLOAT64":
|
||||
return struct.unpack(">d", raw_bytes[:8])[0]
|
||||
|
||||
raise ValueError(f"Unbekannter Data_Type: {dt}")
|
||||
|
||||
def _decode_value(self, registers: list[int], meta: dict):
|
||||
raw = self._registers_to_bytes(registers, meta["byteorder"])
|
||||
val = self._decode_by_type(raw, meta["data_type"])
|
||||
return (val * meta["scaling"]) + meta["offset"]
|
||||
|
||||
# -------------
|
||||
# Reading
|
||||
# -------------
|
||||
def get_state(self) -> dict:
|
||||
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
|
||||
|
||||
if not self.connect():
|
||||
data["error"] = "connect_failed"
|
||||
return data
|
||||
|
||||
try:
|
||||
for address, meta in self.registers.items():
|
||||
count = int(meta["length"])
|
||||
result = self.client.read_input_registers(address, count=count)
|
||||
if result.isError():
|
||||
print(f"Fehler beim Lesen von Adresse {address}: {result}")
|
||||
continue
|
||||
|
||||
try:
|
||||
value = self._decode_value(result.registers, meta)
|
||||
except Exception as e:
|
||||
print(f"Decode-Fehler an Adresse {address} ({meta.get('tag','')}): {e}")
|
||||
continue
|
||||
|
||||
# Optional filter
|
||||
# if self._is_invalid_sentinel(value):
|
||||
# continue
|
||||
|
||||
desc = meta.get("desc") or ""
|
||||
label = f"{address} - {desc}".strip(" -")
|
||||
|
||||
data[label] = value
|
||||
tag = meta.get("tag")
|
||||
if tag:
|
||||
data[tag] = value
|
||||
|
||||
print(f"Adresse {address} - {desc}: {value}")
|
||||
|
||||
if not self.client.connect():
|
||||
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
|
||||
exit(1)
|
||||
print("Verbindung zur Wärmepumpe erfolgreich.")
|
||||
except KeyboardInterrupt:
|
||||
print("Beendet durch Benutzer (Ctrl+C).")
|
||||
finally:
|
||||
self.close()
|
||||
self.client.close()
|
||||
|
||||
def get_registers(self):
|
||||
# Excel-Datei mit den Input-Registerinformationen
|
||||
excel_path = "modbus_registers/heat_pump_registers.xlsx"
|
||||
xls = pd.ExcelFile(excel_path)
|
||||
df_input_registers = xls.parse('04 Input Register')
|
||||
|
||||
# Relevante Spalten bereinigen
|
||||
df_clean = df_input_registers[['MB Adresse', 'Variable', 'Beschreibung', 'Variabel Typ']].dropna()
|
||||
df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int)
|
||||
|
||||
# Dictionary aus Excel erzeugen
|
||||
self.registers = {
|
||||
row['MB Adresse']: {
|
||||
'desc': row['Beschreibung'],
|
||||
'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT'
|
||||
}
|
||||
for _, row in df_clean.iterrows()
|
||||
}
|
||||
|
||||
def get_state(self):
|
||||
data = {}
|
||||
data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S')
|
||||
for address, info in self.registers.items():
|
||||
reg_type = info['type']
|
||||
result = self.client.read_input_registers(address, count=2 if reg_type == 'REAL' else 1)
|
||||
if result.isError():
|
||||
print(f"Fehler beim Lesen von Adresse {address}: {result}")
|
||||
continue
|
||||
|
||||
if reg_type == 'REAL':
|
||||
value = result.registers[0] / 10.0
|
||||
else:
|
||||
value = result.registers[0]
|
||||
|
||||
print(f"Adresse {address} - {info['desc']}: {value}")
|
||||
data[f"{address} - {info['desc']}"] = value
|
||||
return data
|
||||
|
||||
4
main.py
4
main.py
@@ -26,8 +26,8 @@ db = DataBaseInflux(
|
||||
bucket="allmende_db"
|
||||
)
|
||||
|
||||
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
|
||||
hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
|
||||
hp_master = HeatPump(device_name='hp_master', ip_address='127.0.0.1', port=8111)
|
||||
hp_slave = HeatPump(device_name='hp_slave', ip_address='127.0.0.1', port=8111)
|
||||
shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
|
||||
wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112')
|
||||
meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112')
|
||||
|
||||
Binary file not shown.
Binary file not shown.
BIN
modbus_registers/ventilation_modbus_registers.xlsx
Normal file
BIN
modbus_registers/ventilation_modbus_registers.xlsx
Normal file
Binary file not shown.
18
test.py
Normal file
18
test.py
Normal file
@@ -0,0 +1,18 @@
|
||||
from pymodbus.client import ModbusTcpClient
|
||||
import struct
|
||||
|
||||
MODBUS_IP="10.0.0.40"
|
||||
client=ModbusTcpClient(MODBUS_IP, port=502)
|
||||
client.connect()
|
||||
|
||||
try:
|
||||
rr = client.read_input_registers(30, count=3, slave=1)
|
||||
print("Raw 30..32:", rr.registers)
|
||||
|
||||
def as_int16(x):
|
||||
return struct.unpack(">h", struct.pack(">H", x))[0]
|
||||
|
||||
for i, raw in enumerate(rr.registers, start=30):
|
||||
print(i, "raw", raw, "int16", as_int16(raw), "scaled", as_int16(raw)/10.0)
|
||||
finally:
|
||||
client.close()
|
||||
Reference in New Issue
Block a user