test knx und test lüftungsanlage added

This commit is contained in:
Nils Reiners
2026-02-20 20:49:27 +01:00
parent 7b00d622aa
commit 3f19cc518b
2 changed files with 118 additions and 10 deletions

71
test.py
View File

@@ -1,18 +1,69 @@
from pymodbus.client import ModbusTcpClient import time
import struct import struct
from pymodbus.client import ModbusTcpClient
MODBUS_IP="10.0.0.40" MODBUS_IP = "10.0.0.40"
client=ModbusTcpClient(MODBUS_IP, port=502) SLAVE_ID = 1
client.connect() POLL = 2.0 # Sekunden
def u16_to_i16(u16):
return struct.unpack(">h", struct.pack(">H", u16 & 0xFFFF))[0]
def read_i16(client, addr, scale):
rr = client.read_input_registers(address=addr, count=1, slave=SLAVE_ID)
if rr.isError():
return None
raw = rr.registers[0]
if raw == 65535:
return None
return round(u16_to_i16(raw) / scale, 1)
def fmt(v):
return f"{v:5.1f}" if v is not None else " ---"
client = ModbusTcpClient(MODBUS_IP, port=502)
try: try:
rr = client.read_input_registers(30, count=3, slave=1) if not client.connect():
print("Raw 30..32:", rr.registers) raise RuntimeError("Modbus connect failed")
def as_int16(x): print("Logging temperatures (Ctrl+C to stop)\n")
return struct.unpack(">h", struct.pack(">H", x))[0]
while True:
# Eintrittsluft = Mittelwert aus 3x0324 & 3x0323 (scale 100)
t_e1 = read_i16(client, 324, 100)
t_e2 = read_i16(client, 323, 100)
t_ein = None
if t_e1 is not None and t_e2 is not None:
t_ein = round((t_e1 + t_e2) / 2, 1)
# Zuluft -> 3x0614 (/10)
t_zuluft = read_i16(client, 614, 10)
# Abluft -> Mittelwert aus 3x0581 & 3x0582 (/10)
t_a1 = read_i16(client, 581, 10)
t_a2 = read_i16(client, 582, 10)
t_abluft = None
if t_a1 is not None and t_a2 is not None:
t_abluft = round((t_a1 + t_a2) / 2, 1)
# Fortluft -> 3x0301 (/100)
t_fortluft = read_i16(client, 301, 100)
ts = time.strftime("%H:%M:%S")
print(
f"{ts} | "
f"Eintritt: {fmt(t_ein)} °C | "
f"Zuluft: {fmt(t_zuluft)} °C | "
f"Abluft: {fmt(t_abluft)} °C | "
f"Fortluft: {fmt(t_fortluft)} °C"
)
time.sleep(POLL)
except KeyboardInterrupt:
print("\nStopped by user.")
for i, raw in enumerate(rr.registers, start=30):
print(i, "raw", raw, "int16", as_int16(raw), "scaled", as_int16(raw)/10.0)
finally: finally:
client.close() client.close()

57
test_knx.py Normal file
View File

@@ -0,0 +1,57 @@
import asyncio
import logging
from datetime import datetime
from xknx import XKNX
from xknx.io import ConnectionConfig, ConnectionType
from xknx.devices import Sensor
logging.basicConfig(level=logging.INFO)
GA_TEMP = "0/0/8" # Außentemperatur
POLL_SECONDS = 60 # Abfrageintervall
TIMEOUT_SECONDS = 10 # Antwort-Timeout pro Read
async def main():
connection_config = ConnectionConfig(
connection_type=ConnectionType.TUNNELING,
gateway_ip="10.0.0.111",
gateway_port=3671,
local_ip="192.168.1.88",
route_back=True,
# Optional: festen UDP-Quellport setzen, falls NAT instabil wird
# local_port=50055,
)
async with XKNX(connection_config=connection_config, daemon_mode=True) as xknx:
temp = Sensor(
xknx=xknx,
name="Aussentemperatur",
group_address_state=GA_TEMP,
value_type="temperature",
sync_state=True,
)
xknx.devices.async_add(temp)
while True:
logging.info("Sende GroupValueRead an %s ...", GA_TEMP)
try:
try:
await temp.sync(wait_for_result=True, timeout=TIMEOUT_SECONDS)
except TypeError:
await asyncio.wait_for(temp.sync(wait_for_result=True), timeout=TIMEOUT_SECONDS)
value = temp.resolve_state()
ts = datetime.now().isoformat(timespec="seconds")
if value is None:
logging.warning("%s Aussentemperatur: None (keine verwertbare Antwort)", ts)
else:
logging.info("%s Aussentemperatur: %.2f °C", ts, value)
except asyncio.TimeoutError:
logging.warning("Timeout nach %ss: keine Antwort", TIMEOUT_SECONDS)
await asyncio.sleep(POLL_SECONDS)
if __name__ == "__main__":
asyncio.run(main())