Compare commits
4 Commits
load_forec
...
standard_r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8642a057f0 | ||
|
|
ce14d59d51 | ||
|
|
4727364048 | ||
|
|
666eb211a3 |
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,7 @@
|
|||||||
|
from heat_pump import HeatPump
|
||||||
|
|
||||||
|
hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502, excel_path="../modbus_registers/heat_pump_registers.xlsx")
|
||||||
|
|
||||||
|
state = hp_master.get_state()
|
||||||
|
|
||||||
|
print(state)
|
||||||
49
component_test_connectors/heat_pump_connection_sg_ready.py
Normal file
49
component_test_connectors/heat_pump_connection_sg_ready.py
Normal file
@@ -0,0 +1,49 @@
|
|||||||
|
from pymodbus.client import ModbusTcpClient
|
||||||
|
|
||||||
|
def switch_sg_ready_mode(ip, port, mode):
|
||||||
|
"""
|
||||||
|
Register 300: 1=BUS 0= Hardware Kontakte
|
||||||
|
Register 301 & 302:
|
||||||
|
0-0= Kein Offset
|
||||||
|
0-1 Boiler und Heizung Offset
|
||||||
|
1-1 Boiler Offset + E-Einsatz Sollwert Erhöht
|
||||||
|
1-0 SG EVU Sperre
|
||||||
|
:param ip:
|
||||||
|
:param mode:
|
||||||
|
'mode1' = [True, False, False] => SG Ready deactivated
|
||||||
|
'mode2' = [True, False, True] => SG ready activated for heatpump only
|
||||||
|
'mode3' = [True, True, True] => SG ready activated for heatpump and heat rod
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
client = ModbusTcpClient(ip, port=port)
|
||||||
|
if not client.connect():
|
||||||
|
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
|
||||||
|
return
|
||||||
|
|
||||||
|
mode_code = None
|
||||||
|
if mode == 'mode1':
|
||||||
|
mode_code = [True, False, False]
|
||||||
|
elif mode == 'mode2':
|
||||||
|
mode_code = [True, False, True]
|
||||||
|
elif mode == 'mode3':
|
||||||
|
mode_code = [True, True, True]
|
||||||
|
else:
|
||||||
|
print('Uncorrect or no string for mode!')
|
||||||
|
|
||||||
|
try:
|
||||||
|
response_300 = client.write_coil(300, mode_code[0])
|
||||||
|
response_301 = client.write_coil(301, mode_code[1])
|
||||||
|
response_302 = client.write_coil(302, mode_code[2])
|
||||||
|
|
||||||
|
# Optional: Rückmeldungen prüfen
|
||||||
|
for addr, resp in zip([300, 301, 302], [response_300, response_301, response_302]):
|
||||||
|
if resp.isError():
|
||||||
|
print(f"Fehler beim Schreiben von Coil {addr}: {resp}")
|
||||||
|
else:
|
||||||
|
print(f"Coil {addr} erfolgreich geschrieben.")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
if '__name__' == '__main__':
|
||||||
|
switch_sg_ready_mode(ip='10.0.0.10', port=502, mode='mode2')
|
||||||
Binary file not shown.
197
heat_pump.py
197
heat_pump.py
@@ -1,64 +1,177 @@
|
|||||||
from pymodbus.client import ModbusTcpClient
|
from pymodbus.client import ModbusTcpClient
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import time
|
import time
|
||||||
|
import struct
|
||||||
|
import math
|
||||||
|
|
||||||
|
|
||||||
class HeatPump:
|
class HeatPump:
|
||||||
def __init__(self, device_name: str, ip_address: str, port: int=502):
|
def __init__(self, device_name: str, ip_address: str, port: int = 502,
|
||||||
|
excel_path: str = "modbus_registers/heat_pump_registers_modbus.xlsx",
|
||||||
|
sheet_name: str = "Register_Map"):
|
||||||
self.device_name = device_name
|
self.device_name = device_name
|
||||||
self.ip = ip_address
|
self.ip = ip_address
|
||||||
self.port = port
|
self.port = port
|
||||||
self.client = None
|
self.client = ModbusTcpClient(self.ip, port=self.port)
|
||||||
self.connect_to_modbus()
|
|
||||||
self.registers = None
|
|
||||||
self.get_registers()
|
|
||||||
|
|
||||||
def connect_to_modbus(self):
|
self.excel_path = excel_path
|
||||||
port = self.port
|
self.sheet_name = sheet_name
|
||||||
self.client = ModbusTcpClient(self.ip, port=port)
|
self.registers = self.get_registers()
|
||||||
try:
|
|
||||||
if not self.client.connect():
|
# -------------
|
||||||
|
# Connection
|
||||||
|
# -------------
|
||||||
|
def connect(self) -> bool:
|
||||||
|
ok = self.client.connect()
|
||||||
|
if not ok:
|
||||||
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
|
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
|
||||||
exit(1)
|
return ok
|
||||||
print("Verbindung zur Wärmepumpe erfolgreich.")
|
|
||||||
except KeyboardInterrupt:
|
def close(self):
|
||||||
print("Beendet durch Benutzer (Ctrl+C).")
|
try:
|
||||||
finally:
|
|
||||||
self.client.close()
|
self.client.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
def get_registers(self):
|
# -------------
|
||||||
# Excel-Datei mit den Input-Registerinformationen
|
# Excel parsing
|
||||||
excel_path = "modbus_registers/heat_pump_registers.xlsx"
|
# -------------
|
||||||
xls = pd.ExcelFile(excel_path)
|
def get_registers(self) -> dict:
|
||||||
df_input_registers = xls.parse('04 Input Register')
|
df = pd.read_excel(self.excel_path, sheet_name=self.sheet_name)
|
||||||
|
df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
|
||||||
|
|
||||||
# Relevante Spalten bereinigen
|
df["Address"] = df["Address"].astype(int)
|
||||||
df_clean = df_input_registers[['MB Adresse', 'Variable', 'Beschreibung', 'Variabel Typ']].dropna()
|
df["Length"] = df["Length"].astype(int)
|
||||||
df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int)
|
df["Data_Type"] = df["Data_Type"].astype(str).str.upper()
|
||||||
|
df["Byteorder"] = df["Byteorder"].astype(str).str.upper()
|
||||||
|
|
||||||
# Dictionary aus Excel erzeugen
|
df["Scaling"] = df.get("Scaling", 1.0)
|
||||||
self.registers = {
|
df["Scaling"] = df["Scaling"].fillna(1.0).astype(float)
|
||||||
row['MB Adresse']: {
|
|
||||||
'desc': row['Beschreibung'],
|
df["Offset"] = df.get("Offset", 0.0)
|
||||||
'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT'
|
df["Offset"] = df["Offset"].fillna(0.0).astype(float)
|
||||||
}
|
|
||||||
for _, row in df_clean.iterrows()
|
regs = {}
|
||||||
|
for _, row in df.iterrows():
|
||||||
|
regs[int(row["Address"])] = {
|
||||||
|
"length": int(row["Length"]),
|
||||||
|
"data_type": row["Data_Type"],
|
||||||
|
"byteorder": row["Byteorder"],
|
||||||
|
"scaling": float(row["Scaling"]),
|
||||||
|
"offset": float(row["Offset"]),
|
||||||
|
"tag": str(row.get("Tag_Name", "")).strip(),
|
||||||
|
"desc": "" if pd.isna(row.get("Description")) else str(row.get("Description")).strip(),
|
||||||
}
|
}
|
||||||
|
return regs
|
||||||
|
|
||||||
def get_state(self):
|
# -------------
|
||||||
data = {}
|
# Byteorder handling
|
||||||
data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S')
|
# -------------
|
||||||
for address, info in self.registers.items():
|
@staticmethod
|
||||||
reg_type = info['type']
|
def _registers_to_bytes(registers: list[int], byteorder_code: str) -> bytes:
|
||||||
result = self.client.read_input_registers(address, count=2 if reg_type == 'REAL' else 1)
|
"""
|
||||||
|
registers: Liste von uint16 (0..65535), wie pymodbus sie liefert.
|
||||||
|
byteorder_code: AB, ABCD, CDAB, BADC, DCBA (gemäß Template)
|
||||||
|
Rückgabe: bytes in der Reihenfolge, wie sie für struct.unpack benötigt werden.
|
||||||
|
"""
|
||||||
|
code = (byteorder_code or "ABCD").upper()
|
||||||
|
|
||||||
|
# Pro Register: 16-bit => zwei Bytes (MSB, LSB)
|
||||||
|
words = [struct.pack(">H", r & 0xFFFF) for r in registers] # big endian pro Wort
|
||||||
|
|
||||||
|
if len(words) == 1:
|
||||||
|
w = words[0] # b'\xAA\xBB'
|
||||||
|
if code in ("AB", "ABCD", "CDAB"):
|
||||||
|
return w
|
||||||
|
if code == "BADC": # byte swap
|
||||||
|
return w[::-1]
|
||||||
|
if code == "DCBA": # byte swap (bei 16-bit identisch zu BADC)
|
||||||
|
return w[::-1]
|
||||||
|
return w
|
||||||
|
|
||||||
|
# 32-bit (2 words) oder 64-bit (4 words): Word/Byte swaps abbilden
|
||||||
|
# words[0] = high word bytes, words[1] = low word bytes (in Modbus-Reihenfolge gelesen)
|
||||||
|
if code == "ABCD":
|
||||||
|
ordered = words
|
||||||
|
elif code == "CDAB":
|
||||||
|
# word swap
|
||||||
|
ordered = words[1:] + words[:1]
|
||||||
|
elif code == "BADC":
|
||||||
|
# byte swap innerhalb jedes Words
|
||||||
|
ordered = [w[::-1] for w in words]
|
||||||
|
elif code == "DCBA":
|
||||||
|
# word + byte swap
|
||||||
|
ordered = [w[::-1] for w in (words[1:] + words[:1])]
|
||||||
|
else:
|
||||||
|
ordered = words
|
||||||
|
|
||||||
|
return b"".join(ordered)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _decode_by_type(raw_bytes: bytes, data_type: str):
|
||||||
|
dt = (data_type or "").upper()
|
||||||
|
|
||||||
|
# struct: > = big endian, < = little endian
|
||||||
|
# Wir liefern raw_bytes bereits in der richtigen Reihenfolge; daher nutzen wir ">" konsistent.
|
||||||
|
if dt == "UINT16":
|
||||||
|
return struct.unpack(">H", raw_bytes[:2])[0]
|
||||||
|
if dt == "INT16":
|
||||||
|
return struct.unpack(">h", raw_bytes[:2])[0]
|
||||||
|
if dt == "UINT32":
|
||||||
|
return struct.unpack(">I", raw_bytes[:4])[0]
|
||||||
|
if dt == "INT32":
|
||||||
|
return struct.unpack(">i", raw_bytes[:4])[0]
|
||||||
|
if dt == "FLOAT32":
|
||||||
|
return struct.unpack(">f", raw_bytes[:4])[0]
|
||||||
|
if dt == "FLOAT64":
|
||||||
|
return struct.unpack(">d", raw_bytes[:8])[0]
|
||||||
|
|
||||||
|
raise ValueError(f"Unbekannter Data_Type: {dt}")
|
||||||
|
|
||||||
|
def _decode_value(self, registers: list[int], meta: dict):
|
||||||
|
raw = self._registers_to_bytes(registers, meta["byteorder"])
|
||||||
|
val = self._decode_by_type(raw, meta["data_type"])
|
||||||
|
return (val * meta["scaling"]) + meta["offset"]
|
||||||
|
|
||||||
|
# -------------
|
||||||
|
# Reading
|
||||||
|
# -------------
|
||||||
|
def get_state(self) -> dict:
|
||||||
|
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
|
||||||
|
|
||||||
|
if not self.connect():
|
||||||
|
data["error"] = "connect_failed"
|
||||||
|
return data
|
||||||
|
|
||||||
|
try:
|
||||||
|
for address, meta in self.registers.items():
|
||||||
|
count = int(meta["length"])
|
||||||
|
result = self.client.read_input_registers(address, count=count)
|
||||||
if result.isError():
|
if result.isError():
|
||||||
print(f"Fehler beim Lesen von Adresse {address}: {result}")
|
print(f"Fehler beim Lesen von Adresse {address}: {result}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if reg_type == 'REAL':
|
try:
|
||||||
value = result.registers[0] / 10.0
|
value = self._decode_value(result.registers, meta)
|
||||||
else:
|
except Exception as e:
|
||||||
value = result.registers[0]
|
print(f"Decode-Fehler an Adresse {address} ({meta.get('tag','')}): {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Optional filter
|
||||||
|
# if self._is_invalid_sentinel(value):
|
||||||
|
# continue
|
||||||
|
|
||||||
|
desc = meta.get("desc") or ""
|
||||||
|
label = f"{address} - {desc}".strip(" -")
|
||||||
|
|
||||||
|
data[label] = value
|
||||||
|
tag = meta.get("tag")
|
||||||
|
if tag:
|
||||||
|
data[tag] = value
|
||||||
|
|
||||||
|
print(f"Adresse {address} - {desc}: {value}")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
self.close()
|
||||||
|
|
||||||
print(f"Adresse {address} - {info['desc']}: {value}")
|
|
||||||
data[f"{address} - {info['desc']}"] = value
|
|
||||||
return data
|
return data
|
||||||
|
|||||||
BIN
modbus_registers/_modbus_register_template.xlsx
Normal file
BIN
modbus_registers/_modbus_register_template.xlsx
Normal file
Binary file not shown.
Binary file not shown.
BIN
modbus_registers/raw_register_tables/heat_pump_registers.xlsx
Normal file
BIN
modbus_registers/raw_register_tables/heat_pump_registers.xlsx
Normal file
Binary file not shown.
Binary file not shown.
Reference in New Issue
Block a user