3 Commits

13 changed files with 174 additions and 283 deletions

87
main.py
View File

@@ -26,56 +26,57 @@ db = DataBaseInflux(
bucket="allmende_db" bucket="allmende_db"
) )
hp_master = HeatPump(device_name='hp_master', ip_address='127.0.0.1', port=8111) # hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)
hp_slave = HeatPump(device_name='hp_slave', ip_address='127.0.0.1', port=8111) # hp_slave = HeatPump(device_name='hp_slave', ip_address='10.0.0.11', port=502)
shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121') # shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112') wr_master = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112', unit=1)
wr_slave = PvInverter(device_name='solaredge_slave', ip_address='192.168.1.112', unit=3)
meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112') meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112')
es.add_components(hp_master, hp_slave, shelly, wr, meter) es.add_components(wr_master, wr_slave)#hp_master, hp_slave, shelly, wr_master, wr_slave, meter)
controller = SgReadyController(es) # controller = SgReadyController(es)
#
# FORECASTING # # FORECASTING
latitude = 48.041 # latitude = 48.041
longitude = 7.862 # longitude = 7.862
TZ = "Europe/Berlin" # TZ = "Europe/Berlin"
HORIZON_DAYS = 2 # HORIZON_DAYS = 2
weather_forecaster = WeatherForecaster(latitude=latitude, longitude=longitude) # weather_forecaster = WeatherForecaster(latitude=latitude, longitude=longitude)
site = Location(latitude=latitude, longitude=longitude, altitude=35, tz=TZ, name="Gundelfingen") # site = Location(latitude=latitude, longitude=longitude, altitude=35, tz=TZ, name="Gundelfingen")
#
p_module = 435 # p_module = 435
upper_roof_north = PvWattsSubarrayConfig(name="north", pdc0_w=(29+29+21)*p_module, tilt_deg=10, azimuth_deg=20, dc_loss=0.02, ac_loss=0.01) # upper_roof_north = PvWattsSubarrayConfig(name="north", pdc0_w=(29+29+21)*p_module, tilt_deg=10, azimuth_deg=20, dc_loss=0.02, ac_loss=0.01)
upper_roof_south = PvWattsSubarrayConfig(name="south", pdc0_w=(29+21+20)*p_module, tilt_deg=10, azimuth_deg=200, dc_loss=0.02, ac_loss=0.01) # upper_roof_south = PvWattsSubarrayConfig(name="south", pdc0_w=(29+21+20)*p_module, tilt_deg=10, azimuth_deg=200, dc_loss=0.02, ac_loss=0.01)
upper_roof_east = PvWattsSubarrayConfig(name="east", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=110, dc_loss=0.02, ac_loss=0.01) # upper_roof_east = PvWattsSubarrayConfig(name="east", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=110, dc_loss=0.02, ac_loss=0.01)
upper_roof_west = PvWattsSubarrayConfig(name="west", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=290, dc_loss=0.02, ac_loss=0.01) # upper_roof_west = PvWattsSubarrayConfig(name="west", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=290, dc_loss=0.02, ac_loss=0.01)
cfgs = [upper_roof_north, upper_roof_south, upper_roof_east, upper_roof_west] # cfgs = [upper_roof_north, upper_roof_south, upper_roof_east, upper_roof_west]
pv_plant = PvWattsPlant(site, cfgs) # pv_plant = PvWattsPlant(site, cfgs)
#
now = datetime.now() # now = datetime.now()
next_forecast_at = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) # next_forecast_at = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
while True: while True:
now = datetime.now() now = datetime.now()
if now.second % interval_seconds == 0 and now.microsecond < 100_000: if now.second % interval_seconds == 0 and now.microsecond < 100_000:
state = es.get_state_and_store_to_database(db) state = es.get_state_and_store_to_database(db)
mode = controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state) # mode = controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state)
#
# if mode == 'mode1':
# mode_as_binary = 0
# else:
# mode_as_binary = 1
# db.store_data('sg_ready', {'mode': mode_as_binary})
if mode == 'mode1': # if now >= next_forecast_at:
mode_as_binary = 0 # # Start der Prognose: ab der kommenden vollen Stunde
else: # start_hour_local = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
mode_as_binary = 1 # weather = weather_forecaster.get_hourly_forecast(start_hour_local, HORIZON_DAYS)
db.store_data('sg_ready', {'mode': mode_as_binary}) # total = pv_plant.get_power(weather)
# db.store_forecasts('pv_forecast', total)
if now >= next_forecast_at: #
# Start der Prognose: ab der kommenden vollen Stunde # # Nächste geplante Ausführung definieren (immer volle Stunde)
start_hour_local = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) # # Falls wir durch Delay mehrere Stunden verpasst haben, hole auf:
weather = weather_forecaster.get_hourly_forecast(start_hour_local, HORIZON_DAYS) # while next_forecast_at <= now:
total = pv_plant.get_power(weather) # next_forecast_at = (next_forecast_at + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
db.store_forecasts('pv_forecast', total)
# Nächste geplante Ausführung definieren (immer volle Stunde)
# Falls wir durch Delay mehrere Stunden verpasst haben, hole auf:
while next_forecast_at <= now:
next_forecast_at = (next_forecast_at + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
time.sleep(0.1) time.sleep(0.1)

View File

@@ -1,139 +1,155 @@
import time # pv_inverter.py
import struct # -*- coding: utf-8 -*-
import pandas as pd from typing import Optional, Dict, Any, List
from typing import Dict, Any, List, Tuple, Optional
from pymodbus.client import ModbusTcpClient from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusIOException
import struct
import time
EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"
# Obergrenze: bis EXKLUSIVE 40206 (d.h. max. 40205)
MAX_ADDR_EXCLUSIVE = 40121
class PvInverter: class PvInverter:
def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1): """
self.device_name = device_name Minimaler Reader für einen SolarEdge-Inverter hinter Modbus-TCP→RTU-Gateway.
self.ip = ip_address Liest nur die bekannten Register (wie im funktionierenden Skript).
self.port = port Kompatibel mit pymodbus 2.5.x und 3.x kein retry_on_empty.
self.unit = unit """
self.client: Optional[ModbusTcpClient] = None
self.registers: Dict[int, Dict[str, Any]] = {} # addr -> {"desc":..., "type":...}
self.connect_to_modbus()
self.load_registers(EXCEL_PATH)
# ---------- Verbindung ---------- def __init__(
def connect_to_modbus(self): self,
self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3) device_name: str,
ip_address: str,
port: int = 502,
unit_id: int = 1,
timeout: float = 1.5,
silent_interval: float = 0.02,
):
self.device_name = device_name
self.host = ip_address
self.port = port
self.unit = unit_id
self.timeout = timeout
self.silent_interval = silent_interval
self.client: Optional[ModbusTcpClient] = None
self._connect()
# ---------------- Verbindung ----------------
def _connect(self):
# retries=0: keine internen Mehrfachversuche
self.client = ModbusTcpClient(self.host, port=self.port, timeout=self.timeout, retries=0)
if not self.client.connect(): if not self.client.connect():
print("Verbindung zu Wechselrichter fehlgeschlagen.") raise ConnectionError(f"Verbindung zu {self.device_name} ({self.host}:{self.port}) fehlgeschlagen.")
raise SystemExit(1) print(f"✅ Verbindung hergestellt zu {self.device_name} ({self.host}:{self.port}, unit={self.unit})")
print("✅ Verbindung zu Wechselrichter hergestellt.")
def close(self): def close(self):
if self.client: if self.client:
self.client.close() self.client.close()
self.client = None self.client = None
# ---------- Register-Liste ---------- # ---------------- Low-Level Lesen ----------------
def load_registers(self, excel_path: str): def _read_regs(self, addr: int, count: int) -> Optional[List[int]]:
xls = pd.ExcelFile(excel_path) """Liest 'count' Holding-Register ab base-0 'addr' für die konfigurierte Unit-ID."""
df = xls.parse()
# Passe Spaltennamen hier an, falls nötig:
cols = ["MB Adresse", "Beschreibung", "Variabel Typ"]
df = df[cols].dropna()
df["MB Adresse"] = df["MB Adresse"].astype(int)
# 1) Vorab-Filter: nur Adressen < 40206 übernehmen
df = df[df["MB Adresse"] < MAX_ADDR_EXCLUSIVE]
self.registers = {
int(row["MB Adresse"]): {
"desc": str(row["Beschreibung"]).strip(),
"type": str(row["Variabel Typ"]).strip()
}
for _, row in df.iterrows()
}
# ---------- Low-Level Lesen ----------
def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
fn = getattr(self.client, fn_name)
# pymodbus 3.8.x hat 'slave='; Fallbacks schaden nicht
for kwargs in (dict(address=address, count=count, slave=self.unit),
dict(address=address, count=count)):
try: try:
res = fn(**kwargs) rr = self.client.read_holding_registers(address=addr, count=count, slave=self.unit)
if res is None or (hasattr(res, "isError") and res.isError()): except ModbusIOException:
continue time.sleep(self.silent_interval)
return res.registers return None
except TypeError: except Exception:
continue time.sleep(self.silent_interval)
return None return None
def _read_any(self, address: int, count: int) -> Optional[List[int]]: time.sleep(self.silent_interval)
regs = self._try_read("read_holding_registers", address, count) if not rr or rr.isError():
if regs is None: return None
regs = self._try_read("read_input_registers", address, count) return rr.registers
return regs
# ---------- Decoding ----------
@staticmethod @staticmethod
def _to_i16(u16: int) -> int: def _to_int16(u16: int) -> int:
return struct.unpack(">h", struct.pack(">H", u16))[0] return struct.unpack(">h", struct.pack(">H", u16))[0]
@staticmethod @staticmethod
def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float: def _apply_sf(raw: int, sf: int) -> float:
b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi) return raw * (10 ** sf)
return struct.unpack(">f", b)[0]
# Hilfsfunktion: wie viele 16-Bit-Register braucht dieser Typ?
@staticmethod @staticmethod
def _word_count_for_type(rtype: str) -> int: def _read_string_from_regs(regs: List[int]) -> Optional[str]:
rt = (rtype or "").lower() b = b"".join(struct.pack(">H", r) for r in regs)
# Passe hier an deine Excel-Typen an: s = b.decode("ascii", errors="ignore").rstrip("\x00 ").strip()
if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt: return s or None
return 2
# Default: 1 Wort (z.B. int16/uint16)
return 1
def read_one(self, address_excel: int, rtype: str) -> Optional[float]: # ---------------- Hilfsfunktionen ----------------
""" def _read_string(self, addr: int, words: int) -> Optional[str]:
Liest einen Wert nach Typ ('INT' oder 'REAL' etc.). regs = self._read_regs(addr, words)
Es werden ausschließlich Register < 40206 gelesen. if regs is None:
"""
addr = int(address_excel)
words = self._word_count_for_type(rtype)
# 2) Harte Grenze prüfen: höchstes angefasstes Register muss < 40206 sein
if addr + words - 1 >= MAX_ADDR_EXCLUSIVE:
# Überspringen, da der Lesevorgang die Grenze >= 40206 berühren würde
return None return None
return self._read_string_from_regs(regs)
if words == 2: def _read_scaled(self, value_addr: int, sf_addr: int) -> Optional[float]:
regs = self._read_any(addr, 2) regs = self._read_regs(value_addr, 1)
if not regs or len(regs) < 2: sf = self._read_regs(sf_addr, 1)
if regs is None or sf is None:
return None return None
# Deine bisherige Logik interpretiert 2 Worte als Float32: raw = self._to_int16(regs[0])
return self._to_f32_from_two(regs[0], regs[1]) sff = self._to_int16(sf[0])
else: return self._apply_sf(raw, sff)
regs = self._read_any(addr, 1)
if not regs:
return None
return float(self._to_i16(regs[0]))
def _read_u32_with_sf(self, value_addr: int, sf_addr: int) -> Optional[float]:
regs = self._read_regs(value_addr, 2)
sf = self._read_regs(sf_addr, 1)
if regs is None or sf is None:
return None
u32 = (regs[0] << 16) | regs[1]
sff = self._to_int16(sf[0])
return self._apply_sf(u32, sff)
# ---------------- Öffentliche API ----------------
def get_state(self) -> Dict[str, Any]: def get_state(self) -> Dict[str, Any]:
""" """Liest exakt die bekannten Register und gibt ein Dict zurück."""
Liest ALLE Register aus self.registers und gibt dict zurück. state: Dict[str, Any] = {}
Achtet darauf, dass keine Adresse (inkl. Mehrwort) >= 40206 gelesen wird.
""" # --- Common Block ---
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")} state["C_Manufacturer"] = self._read_string(40004, 16)
for address, meta in sorted(self.registers.items()): state["C_Model"] = self._read_string(40020, 16)
words = self._word_count_for_type(meta["type"]) state["C_Version"] = self._read_string(40044, 8)
# 3) Nochmals Schutz auf Ebene der Iteration: state["C_SerialNumber"] = self._read_string(40052, 16)
if address + words - 1 >= MAX_ADDR_EXCLUSIVE:
continue # --- Inverter Block ---
val = self.read_one(address, meta["type"]) state["I_AC_Power_W"] = self._read_scaled(40083, 40084)
if val is None: state["I_AC_Voltage_V"] = self._read_scaled(40079, 40082)
continue state["I_AC_Frequency_Hz"] = self._read_scaled(40085, 40086)
key = f"{address} - {meta['desc']}" state["I_DC_Power_W"] = self._read_scaled(40100, 40101)
data[key] = val state["I_AC_Energy_Wh_total"] = self._read_u32_with_sf(40093, 40095)
return data
status_regs = self._read_regs(40107, 2)
if status_regs:
state["I_Status"] = status_regs[0]
state["I_Status_Vendor"] = status_regs[1]
else:
state["I_Status"] = None
state["I_Status_Vendor"] = None
return state
# ---------------- Beispiel ----------------
if __name__ == "__main__":
MODBUS_IP = "192.168.1.112"
MODBUS_PORT = 502
master = PvInverter("solaredge_master", MODBUS_IP, port=MODBUS_PORT, unit_id=1)
slave = PvInverter("solaredge_slave", MODBUS_IP, port=MODBUS_PORT, unit_id=3)
try:
sm = master.get_state()
ss = slave.get_state()
print("\n=== MASTER ===")
for k, v in sm.items():
print(f"{k:22s}: {v}")
print("\n=== SLAVE ===")
for k, v in ss.items():
print(f"{k:22s}: {v}")
finally:
master.close()
slave.close()

69
test.py
View File

@@ -1,69 +0,0 @@
import time
import struct
from pymodbus.client import ModbusTcpClient
MODBUS_IP = "10.0.0.40"
SLAVE_ID = 1
POLL = 2.0 # Sekunden
def u16_to_i16(u16):
return struct.unpack(">h", struct.pack(">H", u16 & 0xFFFF))[0]
def read_i16(client, addr, scale):
rr = client.read_input_registers(address=addr, count=1, slave=SLAVE_ID)
if rr.isError():
return None
raw = rr.registers[0]
if raw == 65535:
return None
return round(u16_to_i16(raw) / scale, 1)
def fmt(v):
return f"{v:5.1f}" if v is not None else " ---"
client = ModbusTcpClient(MODBUS_IP, port=502)
try:
if not client.connect():
raise RuntimeError("Modbus connect failed")
print("Logging temperatures (Ctrl+C to stop)\n")
while True:
# Eintrittsluft = Mittelwert aus 3x0324 & 3x0323 (scale 100)
t_e1 = read_i16(client, 324, 100)
t_e2 = read_i16(client, 323, 100)
t_ein = None
if t_e1 is not None and t_e2 is not None:
t_ein = round((t_e1 + t_e2) / 2, 1)
# Zuluft -> 3x0614 (/10)
t_zuluft = read_i16(client, 614, 10)
# Abluft -> Mittelwert aus 3x0581 & 3x0582 (/10)
t_a1 = read_i16(client, 581, 10)
t_a2 = read_i16(client, 582, 10)
t_abluft = None
if t_a1 is not None and t_a2 is not None:
t_abluft = round((t_a1 + t_a2) / 2, 1)
# Fortluft -> 3x0301 (/100)
t_fortluft = read_i16(client, 301, 100)
ts = time.strftime("%H:%M:%S")
print(
f"{ts} | "
f"Eintritt: {fmt(t_ein)} °C | "
f"Zuluft: {fmt(t_zuluft)} °C | "
f"Abluft: {fmt(t_abluft)} °C | "
f"Fortluft: {fmt(t_fortluft)} °C"
)
time.sleep(POLL)
except KeyboardInterrupt:
print("\nStopped by user.")
finally:
client.close()

View File

@@ -1,57 +0,0 @@
import asyncio
import logging
from datetime import datetime
from xknx import XKNX
from xknx.io import ConnectionConfig, ConnectionType
from xknx.devices import Sensor
logging.basicConfig(level=logging.INFO)
GA_TEMP = "0/0/8" # Außentemperatur
POLL_SECONDS = 60 # Abfrageintervall
TIMEOUT_SECONDS = 10 # Antwort-Timeout pro Read
async def main():
connection_config = ConnectionConfig(
connection_type=ConnectionType.TUNNELING,
gateway_ip="10.0.0.111",
gateway_port=3671,
local_ip="192.168.1.88",
route_back=True,
# Optional: festen UDP-Quellport setzen, falls NAT instabil wird
# local_port=50055,
)
async with XKNX(connection_config=connection_config, daemon_mode=True) as xknx:
temp = Sensor(
xknx=xknx,
name="Aussentemperatur",
group_address_state=GA_TEMP,
value_type="temperature",
sync_state=True,
)
xknx.devices.async_add(temp)
while True:
logging.info("Sende GroupValueRead an %s ...", GA_TEMP)
try:
try:
await temp.sync(wait_for_result=True, timeout=TIMEOUT_SECONDS)
except TypeError:
await asyncio.wait_for(temp.sync(wait_for_result=True), timeout=TIMEOUT_SECONDS)
value = temp.resolve_state()
ts = datetime.now().isoformat(timespec="seconds")
if value is None:
logging.warning("%s Aussentemperatur: None (keine verwertbare Antwort)", ts)
else:
logging.info("%s Aussentemperatur: %.2f °C", ts, value)
except asyncio.TimeoutError:
logging.warning("Timeout nach %ss: keine Antwort", TIMEOUT_SECONDS)
await asyncio.sleep(POLL_SECONDS)
if __name__ == "__main__":
asyncio.run(main())