2 Commits

Author SHA1 Message Date
Nils Reiners
1348329a24 aktueller stand 2025-04-26 21:19:21 +01:00
Nils Reiners
e2c3d208de update data 2025-04-19 08:22:30 +01:00
33 changed files with 5160559 additions and 879 deletions

38
README
View File

@@ -11,42 +11,10 @@ Was needs to be done on the Raspberry pi before the tool can run.
- pip install -r requirements.txt
3) How to run the script for testing:
How to run the script:
nohup python main.py > terminal_log 2>&1 &
- nohup python main.py > terminal_log 2>&1 &
For reading out the terminal_log while script is runing:
tail -f terminal_log
4) Implement and run the ems as systemd service:
create:
/etc/systemd/system/allmende_ems.service
insert:
[Unit]
Description=Allmende EMS Python Script
After=network.target
[Service]
WorkingDirectory=/home/pi/projects/allmende_ems
ExecStart=/home/pi/allmende_ems/bin/python3.11 /home/pi/projects/allmende_ems/main.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
manage the service with the following commands:
Once:
sudo systemctl daemon-reload
sudo systemctl start allmende_ems.service
sudo systemctl enable allmende_ems.service
While running:
sudo systemctl status allmende_ems.service
sudo systemctl restart allmende_ems.service
sudo systemctl stop allmende_ems.service
journalctl -u allmende_ems.service
- tail -f terminal_log

46
data_base_csv.py Normal file
View File

@@ -0,0 +1,46 @@
import csv
import os
import tempfile
import shutil
class DataBaseCsv:
def __init__(self, filename: str):
self.filename = filename
def store_data(self, data: dict):
new_fields = list(data.keys())
# If file does not exist or is empty → create new file with header
if not os.path.exists(self.filename) or os.path.getsize(self.filename) == 0:
with open(self.filename, mode='w', newline='') as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=new_fields)
writer.writeheader()
writer.writerow(data)
return
# If file exists → read existing header and data
with open(self.filename, mode='r', newline='') as csv_file:
reader = csv.DictReader(csv_file)
existing_fields = reader.fieldnames
existing_data = list(reader)
# Merge old and new fields (keep original order, add new ones)
all_fields = existing_fields.copy()
for field in new_fields:
if field not in all_fields:
all_fields.append(field)
# Write to a temporary file with updated header
with tempfile.NamedTemporaryFile(mode='w', delete=False, newline='', encoding='utf-8') as tmp_file:
writer = csv.DictWriter(tmp_file, fieldnames=all_fields)
writer.writeheader()
# Write old rows with updated field list
for row in existing_data:
writer.writerow({field: row.get(field, '') for field in all_fields})
# Write new data row
writer.writerow({field: data.get(field, '') for field in all_fields})
# Replace original file with updated temporary file
shutil.move(tmp_file.name, self.filename)

View File

@@ -1,48 +0,0 @@
from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
import datetime as dt
import pandas as pd
class DataBaseInflux:
def __init__(self, url: str, token: str, org: str, bucket: str):
self.url = url
self.token = token
self.org = org
self.bucket = bucket
self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
self.write_api = self.client.write_api()
def store_data(self, device_name: str, data: dict):
measurement = device_name # Fest auf "messungen" gesetzt
point = Point(measurement)
# Alle Key/Value-Paare als Fields speichern
for key, value in data.items():
point = point.field(key, value)
# Zeitstempel automatisch auf jetzt setzen
point = point.time(datetime.utcnow(), WritePrecision.NS)
# Punkt in InfluxDB schreiben
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
def store_forecasts(self, forecast_name: str, data: pd.Series):
measurement = forecast_name
run_tag = dt.datetime.now(dt.timezone.utc).replace(second=0, microsecond=0).isoformat(timespec="minutes")
pts = []
series = pd.to_numeric(data, errors="coerce").dropna()
for ts, val in series.items():
pts.append(
Point(measurement)
.tag("run", run_tag)
.field("value", float(val))
.time(ts.to_pydatetime(), WritePrecision.S)
)
self.write_api.write(bucket=self.bucket, org=self.org, record=pts)

View File

@@ -1,25 +0,0 @@
class EnergySystem():
def __init__(self):
self.components = []
def add_components(self, *args):
for comp in args:
self.components.append(comp)
def get_state_and_store_to_database(self, db):
state = {}
for comp in self.components:
component_state = comp.get_state()
state[comp.device_name] = component_state
db.store_data(comp.device_name, component_state)
return state
def get_component_by_name(self, name):
for comp in self.components:
if comp.device_name == name:
return comp

View File

@@ -1,61 +0,0 @@
#!/usr/bin/env python3
import time
import datetime as dt
import requests
from zoneinfo import ZoneInfo
from matplotlib import pyplot as plt
import pandas as pd
TZ = "Europe/Berlin"
DAYS = 2
OPEN_METEO_URL = "https://api.open-meteo.com/v1/forecast"
class WeatherForecaster:
def __init__(self, latitude, longitude):
self.lat = latitude
self.lon = longitude
def get_hourly_forecast(self, start_hour, days):
start_hour_local = start_hour
end_hour_local = start_hour_local + dt.timedelta(days=days)
params = {
"latitude": self.lat,
"longitude": self.lon,
"hourly": ["temperature_2m", "shortwave_radiation", "wind_speed_10m"],
"timezone": TZ,
"start_hour": start_hour_local.strftime("%Y-%m-%dT%H:%M"),
"end_hour": end_hour_local.strftime("%Y-%m-%dT%H:%M")
}
h = requests.get(OPEN_METEO_URL, params=params).json()["hourly"]
time_stamps = h["time"]
time_stamps = [
dt.datetime.fromisoformat(t).replace(tzinfo=ZoneInfo(TZ))
for t in time_stamps
]
weather = pd.DataFrame(index=time_stamps)
weather["ghi"] = h["shortwave_radiation"]
weather["temp_air"] = h["temperature_2m"]
weather["wind_speed"] = h["wind_speed_10m"]
return weather
if __name__=='__main__':
weather_forecast = WeatherForecaster(latitude=48.041, longitude=7.862)
while True:
now = dt.datetime.now()
secs = 60 - now.second #(60 - now.minute) * 60 - now.second # Sekunden bis volle Stunde
time.sleep(secs)
now_local = dt.datetime.now()
start_hour_local = (now_local + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
time_stamps, temps, ghi, wind_speed = weather_forecast.get_hourly_forecast(start_hour_local, DAYS)
plt.plot(time_stamps, temps)
plt.show()

View File

@@ -3,17 +3,15 @@ import pandas as pd
import time
class HeatPump:
def __init__(self, device_name: str, ip_address: str, port: int=502):
self.device_name = device_name
def __init__(self, ip_address: str):
self.ip = ip_address
self.port = port
self.client = None
self.connect_to_modbus()
self.registers = None
self.get_registers()
def connect_to_modbus(self):
port = self.port
port = 502
self.client = ModbusTcpClient(self.ip, port=port)
try:
if not self.client.connect():
@@ -27,7 +25,7 @@ class HeatPump:
def get_registers(self):
# Excel-Datei mit den Input-Registerinformationen
excel_path = "modbus_registers/heat_pump_registers.xlsx"
excel_path = "data/ModBus TCPIP 1.17(1).xlsx"
xls = pd.ExcelFile(excel_path)
df_input_registers = xls.parse('04 Input Register')
@@ -44,7 +42,7 @@ class HeatPump:
for _, row in df_clean.iterrows()
}
def get_state(self):
def get_data(self):
data = {}
data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S')
for address, info in self.registers.items():

77
main.py
View File

@@ -1,82 +1,17 @@
import time
from datetime import datetime
from data_base_influx import DataBaseInflux
from forecaster.weather_forecaster import WeatherForecaster
from data_base_csv import DataBaseCsv
from heat_pump import HeatPump
from pv_inverter import PvInverter
from simulators.pv_plant_simulator import PvWattsSubarrayConfig, PvWattsPlant
from solaredge_meter import SolaredgeMeter
from shelly_pro_3m import ShellyPro3m
from energysystem import EnergySystem
from sg_ready_controller import SgReadyController
from pvlib.location import Location
import datetime as dt
# For dev-System run in terminal: ssh -N -L 127.0.0.1:8111:10.0.0.10:502 pi@192.168.1.146
# For productive-System change IP-adress in heatpump to '10.0.0.10' and port to 502
interval = 10 # z.B. alle 10 Sekunden
interval_seconds = 10
db = DataBaseCsv('modbus_log.csv')
hp = HeatPump(ip_address='10.0.0.10')
es = EnergySystem()
db = DataBaseInflux(
url="http://192.168.1.146:8086",
token="Cw_naEZyvJ3isiAh1P4Eq3TsjcHmzzDFS7SlbKDsS6ZWL04fMEYixWqtNxGThDdG27S9aW5g7FP9eiq5z1rsGA==",
org="allmende",
bucket="allmende_db"
)
hp_master = HeatPump(device_name='hp_master', ip_address='127.0.0.1', port=8111)
hp_slave = HeatPump(device_name='hp_slave', ip_address='127.0.0.1', port=8111)
shelly = ShellyPro3m(device_name='wohnung_2_6', ip_address='192.168.1.121')
wr = PvInverter(device_name='solaredge_master', ip_address='192.168.1.112')
meter = SolaredgeMeter(device_name='solaredge_meter', ip_address='192.168.1.112')
es.add_components(hp_master, hp_slave, shelly, wr, meter)
controller = SgReadyController(es)
# FORECASTING
latitude = 48.041
longitude = 7.862
TZ = "Europe/Berlin"
HORIZON_DAYS = 2
weather_forecaster = WeatherForecaster(latitude=latitude, longitude=longitude)
site = Location(latitude=latitude, longitude=longitude, altitude=35, tz=TZ, name="Gundelfingen")
p_module = 435
upper_roof_north = PvWattsSubarrayConfig(name="north", pdc0_w=(29+29+21)*p_module, tilt_deg=10, azimuth_deg=20, dc_loss=0.02, ac_loss=0.01)
upper_roof_south = PvWattsSubarrayConfig(name="south", pdc0_w=(29+21+20)*p_module, tilt_deg=10, azimuth_deg=200, dc_loss=0.02, ac_loss=0.01)
upper_roof_east = PvWattsSubarrayConfig(name="east", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=110, dc_loss=0.02, ac_loss=0.01)
upper_roof_west = PvWattsSubarrayConfig(name="west", pdc0_w=7*p_module, tilt_deg=10, azimuth_deg=290, dc_loss=0.02, ac_loss=0.01)
cfgs = [upper_roof_north, upper_roof_south, upper_roof_east, upper_roof_west]
pv_plant = PvWattsPlant(site, cfgs)
now = datetime.now()
next_forecast_at = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
while True:
now = datetime.now()
if now.second % interval_seconds == 0 and now.microsecond < 100_000:
state = es.get_state_and_store_to_database(db)
mode = controller.perform_action(heat_pump_name='hp_master', meter_name='solaredge_meter', state=state)
if mode == 'mode1':
mode_as_binary = 0
else:
mode_as_binary = 1
db.store_data('sg_ready', {'mode': mode_as_binary})
if now >= next_forecast_at:
# Start der Prognose: ab der kommenden vollen Stunde
start_hour_local = (now + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
weather = weather_forecaster.get_hourly_forecast(start_hour_local, HORIZON_DAYS)
total = pv_plant.get_power(weather)
db.store_forecasts('pv_forecast', total)
# Nächste geplante Ausführung definieren (immer volle Stunde)
# Falls wir durch Delay mehrere Stunden verpasst haben, hole auf:
while next_forecast_at <= now:
next_forecast_at = (next_forecast_at + dt.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
if now.second % interval == 0 and now.microsecond < 100_000:
db.store_data(hp.get_data())
time.sleep(0.1)

51554
modbus_log.csv Normal file

File diff suppressed because one or more lines are too long

View File

@@ -1,139 +0,0 @@
import time
import struct
import pandas as pd
from typing import Dict, Any, List, Tuple, Optional
from pymodbus.client import ModbusTcpClient
EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"
# Obergrenze: bis EXKLUSIVE 40206 (d.h. max. 40205)
MAX_ADDR_EXCLUSIVE = 40121
class PvInverter:
def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1):
self.device_name = device_name
self.ip = ip_address
self.port = port
self.unit = unit
self.client: Optional[ModbusTcpClient] = None
self.registers: Dict[int, Dict[str, Any]] = {} # addr -> {"desc":..., "type":...}
self.connect_to_modbus()
self.load_registers(EXCEL_PATH)
# ---------- Verbindung ----------
def connect_to_modbus(self):
self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3)
if not self.client.connect():
print("❌ Verbindung zu Wechselrichter fehlgeschlagen.")
raise SystemExit(1)
print("✅ Verbindung zu Wechselrichter hergestellt.")
def close(self):
if self.client:
self.client.close()
self.client = None
# ---------- Register-Liste ----------
def load_registers(self, excel_path: str):
xls = pd.ExcelFile(excel_path)
df = xls.parse()
# Passe Spaltennamen hier an, falls nötig:
cols = ["MB Adresse", "Beschreibung", "Variabel Typ"]
df = df[cols].dropna()
df["MB Adresse"] = df["MB Adresse"].astype(int)
# 1) Vorab-Filter: nur Adressen < 40206 übernehmen
df = df[df["MB Adresse"] < MAX_ADDR_EXCLUSIVE]
self.registers = {
int(row["MB Adresse"]): {
"desc": str(row["Beschreibung"]).strip(),
"type": str(row["Variabel Typ"]).strip()
}
for _, row in df.iterrows()
}
# ---------- Low-Level Lesen ----------
def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
fn = getattr(self.client, fn_name)
# pymodbus 3.8.x hat 'slave='; Fallbacks schaden nicht
for kwargs in (dict(address=address, count=count, slave=self.unit),
dict(address=address, count=count)):
try:
res = fn(**kwargs)
if res is None or (hasattr(res, "isError") and res.isError()):
continue
return res.registers
except TypeError:
continue
return None
def _read_any(self, address: int, count: int) -> Optional[List[int]]:
regs = self._try_read("read_holding_registers", address, count)
if regs is None:
regs = self._try_read("read_input_registers", address, count)
return regs
# ---------- Decoding ----------
@staticmethod
def _to_i16(u16: int) -> int:
return struct.unpack(">h", struct.pack(">H", u16))[0]
@staticmethod
def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float:
b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi)
return struct.unpack(">f", b)[0]
# Hilfsfunktion: wie viele 16-Bit-Register braucht dieser Typ?
@staticmethod
def _word_count_for_type(rtype: str) -> int:
rt = (rtype or "").lower()
# Passe hier an deine Excel-Typen an:
if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt:
return 2
# Default: 1 Wort (z.B. int16/uint16)
return 1
def read_one(self, address_excel: int, rtype: str) -> Optional[float]:
"""
Liest einen Wert nach Typ ('INT' oder 'REAL' etc.).
Es werden ausschließlich Register < 40206 gelesen.
"""
addr = int(address_excel)
words = self._word_count_for_type(rtype)
# 2) Harte Grenze prüfen: höchstes angefasstes Register muss < 40206 sein
if addr + words - 1 >= MAX_ADDR_EXCLUSIVE:
# Überspringen, da der Lesevorgang die Grenze >= 40206 berühren würde
return None
if words == 2:
regs = self._read_any(addr, 2)
if not regs or len(regs) < 2:
return None
# Deine bisherige Logik interpretiert 2 Worte als Float32:
return self._to_f32_from_two(regs[0], regs[1])
else:
regs = self._read_any(addr, 1)
if not regs:
return None
return float(self._to_i16(regs[0]))
def get_state(self) -> Dict[str, Any]:
"""
Liest ALLE Register aus self.registers und gibt dict zurück.
Achtet darauf, dass keine Adresse (inkl. Mehrwort) >= 40206 gelesen wird.
"""
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
for address, meta in sorted(self.registers.items()):
words = self._word_count_for_type(meta["type"])
# 3) Nochmals Schutz auf Ebene der Iteration:
if address + words - 1 >= MAX_ADDR_EXCLUSIVE:
continue
val = self.read_one(address, meta["type"])
if val is None:
continue
key = f"{address} - {meta['desc']}"
data[key] = val
return data

View File

@@ -1,5 +1,3 @@
pymodbus~=3.8.6
pandas
openpyxl
sshtunnel
pvlib

View File

@@ -1,65 +0,0 @@
from pymodbus.client import ModbusTcpClient
class SgReadyController():
def __init__(self, es):
self.es = es
def perform_action(self, heat_pump_name, meter_name, state):
hp = self.es.get_component_by_name(heat_pump_name)
meter_values = state[meter_name]
power_to_grid = meter_values['40206 - M_AC_Power'] * 10 ** meter_values['40210 - M_AC_Power_SF']
mode = None
if power_to_grid > 10000:
mode = 'mode2'
self.switch_sg_ready_mode(hp.ip, hp.port, mode)
elif power_to_grid < 0:
mode = 'mode1'
self.switch_sg_ready_mode(hp.ip, hp.port, mode)
return mode
def switch_sg_ready_mode(self, ip, port, mode):
"""
Register 300: 1=BUS 0= Hardware Kontakte
Register 301 & 302:
0-0= Kein Offset
0-1 Boiler und Heizung Offset
1-1 Boiler Offset + E-Einsatz Sollwert Erhöht
1-0 SG EVU Sperre
:param ip:
:param mode:
'mode1' = [True, False, False] => SG Ready deactivated
'mode2' = [True, False, True] => SG ready activated for heatpump only
'mode3' = [True, True, True] => SG ready activated for heatpump and heat rod
:return:
"""
client = ModbusTcpClient(ip, port=port)
if not client.connect():
print("Verbindung zur Wärmepumpe fehlgeschlagen.")
return
mode_code = None
if mode == 'mode1':
mode_code = [True, False, False]
elif mode == 'mode2':
mode_code = [True, False, True]
elif mode == 'mode3':
mode_code = [True, True, True]
else:
print('Uncorrect or no string for mode!')
try:
response_300 = client.write_coil(300, mode_code[0])
response_301 = client.write_coil(301, mode_code[1])
response_302 = client.write_coil(302, mode_code[2])
# Optional: Rückmeldungen prüfen
for addr, resp in zip([300, 301, 302], [response_300, response_301, response_302]):
if resp.isError():
print(f"Fehler beim Schreiben von Coil {addr}: {resp}")
else:
print(f"Coil {addr} erfolgreich geschrieben.")
finally:
client.close()

View File

@@ -1,64 +0,0 @@
import struct
from pymodbus.client import ModbusTcpClient
import pandas as pd
import time
class ShellyPro3m:
def __init__(self, device_name: str, ip_address: str, port: int=502):
self.device_name = device_name
self.ip = ip_address
self.port = port
self.client = None
self.connect_to_modbus()
self.registers = None
self.get_registers()
def connect_to_modbus(self):
port = self.port
self.client = ModbusTcpClient(self.ip, port=port)
try:
if not self.client.connect():
print("Verbindung zum Shelly-Logger fehlgeschlagen.")
exit(1)
print("Verbindung zum Shelly-Logger erfolgreich.")
except KeyboardInterrupt:
print("Beendet durch Benutzer (Ctrl+C).")
finally:
self.client.close()
def get_registers(self):
# Excel-Datei mit den Input-Registerinformationen
excel_path = "modbus_registers/shelly_pro_3m_registers.xlsx"
xls = pd.ExcelFile(excel_path)
df_input_registers = xls.parse()
# Relevante Spalten bereinigen
df_clean = df_input_registers[['MB Adresse', 'Beschreibung', 'Variabel Typ']].dropna()
df_clean['MB Adresse'] = df_clean['MB Adresse'].astype(int)
# Dictionary aus Excel erzeugen
self.registers = {
row['MB Adresse']: {
'desc': row['Beschreibung'],
'type': 'REAL' if row['Variabel Typ'] == 'REAL' else 'INT'
}
for _, row in df_clean.iterrows()
}
def get_state(self):
data = {}
data['Zeit'] = time.strftime('%Y-%m-%d %H:%M:%S')
for address, info in self.registers.items():
reg_type = info['type']
result = self.client.read_input_registers(address, count=2 if reg_type == 'REAL' else 1)
if result.isError():
print(f"Fehler beim Lesen von Adresse {address}: {result}")
continue
packed = struct.pack(">HH", result.registers[1], result.registers[0])
value = round(struct.unpack(">f", packed)[0], 2)
print(f"Adresse {address} - {info['desc']}: {value}")
data[f"{address} - {info['desc']}"] = value
return data

View File

@@ -1,210 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, Dict, List, Literal, Tuple, Union
import numpy as np
import pandas as pd
import pvlib
import matplotlib.pyplot as plt
from pvlib.location import Location
from pvlib.pvsystem import PVSystem
from pvlib.modelchain import ModelChain
SeriesOrArray = Union[pd.Series, np.ndarray]
# ----------------------------- Konfiguration -----------------------------
@dataclass
class PvWattsSubarrayConfig:
name: str
pdc0_w: float # STC-DC-Leistung [W]
tilt_deg: float # Neigung (0=horizontal)
azimuth_deg: float # Azimut (180=Süd)
gamma_pdc: float = -0.004 # Tempkoeff. [1/K]
eta_inv_nom: float = 0.96 # WR-Wirkungsgrad (nominal)
albedo: float = 0.2 # Bodenreflexion
# Pauschale Verluste (PVWatts-Losses)
dc_loss: float = 0.0
ac_loss: float = 0.0
soiling: float = 0.0
# Modell
transposition_model: Literal["perez","haydavies","isotropic","klucher","reindl"] = "perez"
# ------------------------------ Subarray ---------------------------------
class PvWattsSubarray:
"""
Ein Subarray mit pvlib.ModelChain (PVWatts).
Berechnet automatisch DNI/DHI aus GHI (ERBS-Methode)
und nutzt ein SAPM-Temperaturmodell.
"""
def __init__(self, cfg: PvWattsSubarrayConfig, location: Location):
self.cfg = cfg
self.location = location
self._mc: Optional[ModelChain] = None
# ---------------------------------------------------------------------
def _create_modelchain(self) -> ModelChain:
"""Erzeuge eine pvlib.ModelChain-Instanz mit PVWatts-Parametern."""
temp_params = pvlib.temperature.TEMPERATURE_MODEL_PARAMETERS["sapm"]["open_rack_glass_polymer"]
system = PVSystem(
surface_tilt=self.cfg.tilt_deg,
surface_azimuth=self.cfg.azimuth_deg,
module_parameters={"pdc0": self.cfg.pdc0_w, "gamma_pdc": self.cfg.gamma_pdc},
inverter_parameters={"pdc0": self.cfg.pdc0_w, "eta_inv_nom": self.cfg.eta_inv_nom},
albedo=self.cfg.albedo,
temperature_model_parameters=temp_params,
module_type="glass_polymer",
racking_model="open_rack",
)
mc = ModelChain(
system, self.location,
transposition_model=self.cfg.transposition_model,
solar_position_method="nrel_numpy",
airmass_model="kastenyoung1989",
dc_model="pvwatts",
ac_model="pvwatts",
aoi_model="physical",
spectral_model=None,
losses_model="pvwatts",
temperature_model="sapm",
)
mc.losses_parameters = {
"dc_loss": float(self.cfg.dc_loss),
"ac_loss": float(self.cfg.ac_loss),
"soiling": float(self.cfg.soiling),
}
self._mc = mc
return mc
# ---------------------------------------------------------------------
def calc_dni_and_dhi(self, weather: pd.DataFrame) -> pd.DataFrame:
"""
Berechnet DNI & DHI aus GHI über die ERBS-Methode.
Gibt ein neues DataFrame mit 'ghi', 'dni', 'dhi' zurück.
"""
if "ghi" not in weather:
raise ValueError("Wetterdaten benötigen mindestens 'ghi'.")
# Sonnenstand bestimmen
sp = self.location.get_solarposition(weather.index)
erbs = pvlib.irradiance.erbs(weather["ghi"], sp["zenith"], weather.index)
out = weather.copy()
out["dni"] = erbs["dni"].clip(lower=0)
out["dhi"] = erbs["dhi"].clip(lower=0)
return out
# ---------------------------------------------------------------------
def _prepare_weather(self, weather: pd.DataFrame) -> pd.DataFrame:
"""Sichert vollständige Spalten (ghi, dni, dhi, temp_air, wind_speed)."""
if "ghi" not in weather or "temp_air" not in weather:
raise ValueError("weather benötigt Spalten: 'ghi' und 'temp_air'.")
w = weather.copy()
# Zeitzone prüfen
if w.index.tz is None:
w.index = w.index.tz_localize(self.location.tz)
else:
if str(w.index.tz) != str(self.location.tz):
w = w.tz_convert(self.location.tz)
# Wind default
if "wind_speed" not in w:
w["wind_speed"] = 1.0
# DNI/DHI ergänzen (immer mit ERBS)
if "dni" not in w or "dhi" not in w:
w = self.calc_dni_and_dhi(w)
return w
# ---------------------------------------------------------------------
def get_power(self, weather: pd.DataFrame) -> pd.Series:
"""
Berechnet AC-Leistung aus Wetterdaten.
"""
w = self._prepare_weather(weather)
mc = self._create_modelchain()
mc.run_model(weather=w)
return mc.results.ac.rename(self.cfg.name)
# ------------------------------- Anlage ----------------------------------
class PvWattsPlant:
"""
Eine PV-Anlage mit mehreren Subarrays, die ein gemeinsames Wetter-DataFrame nutzt.
"""
def __init__(self, site: Location, subarray_cfgs: List[PvWattsSubarrayConfig]):
self.site = site
self.subs: Dict[str, PvWattsSubarray] = {c.name: PvWattsSubarray(c, site) for c in subarray_cfgs}
def get_power(
self,
weather: pd.DataFrame,
*,
return_breakdown: bool = False
) -> pd.Series | Tuple[pd.Series, Dict[str, pd.Series]]:
"""Berechne Gesamtleistung und optional Einzel-Subarrays."""
parts: Dict[str, pd.Series] = {name: sub.get_power(weather) for name, sub in self.subs.items()}
# gemeinsamen Index bilden
idx = list(parts.values())[0].index
for s in parts.values():
idx = idx.intersection(s.index)
parts = {k: v.reindex(idx).fillna(0.0) for k, v in parts.items()}
total = sum(parts.values())
total.name = "total_ac"
if return_breakdown:
return total, parts
return total
# --------------------------- Beispielnutzung -----------------------------
if __name__ == "__main__":
# Standort
site = Location(latitude=52.52, longitude=13.405, altitude=35, tz="Europe/Berlin", name="Berlin")
# Zeitachse: 1 Tag, 15-minütig
times = pd.date_range("2025-06-21 00:00", "2025-06-21 23:45", freq="15min", tz=site.tz)
# Dummy-Wetter
ghi = 1000 * np.clip(np.sin(np.linspace(0, np.pi, len(times)))**1.2, 0, None)
temp_air = 16 + 8 * np.clip(np.sin(np.linspace(-np.pi/2, np.pi/2, len(times))), 0, None)
wind = np.full(len(times), 1.0)
weather = pd.DataFrame(index=times)
weather["ghi"] = ghi
weather["temp_air"] = temp_air
weather["wind_speed"] = wind
# Zwei Subarrays
cfgs = [
PvWattsSubarrayConfig(name="Sued_30", pdc0_w=6000, tilt_deg=30, azimuth_deg=180, dc_loss=0.02, ac_loss=0.01),
PvWattsSubarrayConfig(name="West_20", pdc0_w=4000, tilt_deg=20, azimuth_deg=270, soiling=0.02),
]
plant = PvWattsPlant(site, cfgs)
# Simulation
total, parts = plant.get_power(weather, return_breakdown=True)
# Plot
plt.figure(figsize=(10, 6))
plt.plot(total.index, total / 1000, label="Gesamtleistung (AC)", linewidth=2, color="black")
for name, s in parts.items():
plt.plot(s.index, s / 1000, label=name)
plt.title("PV-Leistung (PVWatts, ERBS-Methode für DNI/DHI)")
plt.ylabel("Leistung [kW]")
plt.xlabel("Zeit")
plt.legend()
plt.grid(True, linestyle="--", alpha=0.5)
plt.tight_layout()
plt.show()

View File

@@ -1,134 +0,0 @@
import time
import struct
import pandas as pd
from typing import Dict, Any, List, Tuple, Optional
from pymodbus.client import ModbusTcpClient
EXCEL_PATH = "modbus_registers/pv_inverter_registers.xlsx"
# Obergrenze: bis EXKLUSIVE 40206 (d.h. max. 40205)
MIN_ADDR_INCLUSIVE = 40121
ADDRESS_SHIFT = 50
class SolaredgeMeter:
def __init__(self, device_name: str, ip_address: str, port: int = 502, unit: int = 1):
self.device_name = device_name
self.ip = ip_address
self.port = port
self.unit = unit
self.client: Optional[ModbusTcpClient] = None
self.registers: Dict[int, Dict[str, Any]] = {} # addr -> {"desc":..., "type":...}
self.connect_to_modbus()
self.load_registers(EXCEL_PATH)
# ---------- Verbindung ----------
def connect_to_modbus(self):
self.client = ModbusTcpClient(self.ip, port=self.port, timeout=3.0, retries=3)
if not self.client.connect():
print("❌ Verbindung zu Zähler fehlgeschlagen.")
raise SystemExit(1)
print("✅ Verbindung zu Zähler hergestellt.")
def close(self):
if self.client:
self.client.close()
self.client = None
# ---------- Register-Liste ----------
def load_registers(self, excel_path: str):
xls = pd.ExcelFile(excel_path)
df = xls.parse()
# Passe Spaltennamen hier an, falls nötig:
cols = ["MB Adresse", "Beschreibung", "Variabel Typ"]
df = df[cols].dropna()
df["MB Adresse"] = df["MB Adresse"].astype(int)
# 1) Vorab-Filter: nur Adressen < 40206 übernehmen
df = df[df["MB Adresse"] >= MIN_ADDR_INCLUSIVE]
self.registers = {
int(row["MB Adresse"]): {
"desc": str(row["Beschreibung"]).strip(),
"type": str(row["Variabel Typ"]).strip()
}
for _, row in df.iterrows()
}
# ---------- Low-Level Lesen ----------
def _try_read(self, fn_name: str, address: int, count: int) -> Optional[List[int]]:
fn = getattr(self.client, fn_name)
# pymodbus 3.8.x hat 'slave='; Fallbacks schaden nicht
shifted_addr = address + ADDRESS_SHIFT
for kwargs in (dict(address=shifted_addr, count=count, slave=self.unit),
dict(address=shifted_addr, count=count)):
try:
res = fn(**kwargs)
if res is None or (hasattr(res, "isError") and res.isError()):
continue
return res.registers
except TypeError:
continue
return None
def _read_any(self, address: int, count: int) -> Optional[List[int]]:
regs = self._try_read("read_holding_registers", address, count)
if regs is None:
regs = self._try_read("read_input_registers", address, count)
return regs
# ---------- Decoding ----------
@staticmethod
def _to_i16(u16: int) -> int:
return struct.unpack(">h", struct.pack(">H", u16))[0]
@staticmethod
def _to_f32_from_two(u16_hi: int, u16_lo: int, msw_first: bool = True) -> float:
b = struct.pack(">HH", u16_hi, u16_lo) if msw_first else struct.pack(">HH", u16_lo, u16_hi)
return struct.unpack(">f", b)[0]
# Hilfsfunktion: wie viele 16-Bit-Register braucht dieser Typ?
@staticmethod
def _word_count_for_type(rtype: str) -> int:
rt = (rtype or "").lower()
# Passe hier an deine Excel-Typen an:
if "uint32" in rt or "real" in rt or "float" in rt or "string(32)" in rt:
return 2
# Default: 1 Wort (z.B. int16/uint16)
return 1
def read_one(self, address_excel: int, rtype: str) -> Optional[float]:
"""
Liest einen Wert nach Typ ('INT' oder 'REAL' etc.).
Es werden ausschließlich Register < 40206 gelesen.
"""
addr = int(address_excel)
words = self._word_count_for_type(rtype)
if words == 2:
regs = self._read_any(addr, 2)
if not regs or len(regs) < 2:
return None
# Deine bisherige Logik interpretiert 2 Worte als Float32:
return self._to_f32_from_two(regs[0], regs[1])
else:
regs = self._read_any(addr, 1)
if not regs:
return None
return float(self._to_i16(regs[0]))
def get_state(self) -> Dict[str, Any]:
"""
Liest ALLE Register aus self.registers und gibt dict zurück.
Achtet darauf, dass keine Adresse (inkl. Mehrwort) >= 40206 gelesen wird.
"""
data = {"Zeit": time.strftime("%Y-%m-%d %H:%M:%S")}
for address, meta in sorted(self.registers.items()):
words = self._word_count_for_type(meta["type"])
val = self.read_one(address, meta["type"])
if val is None:
continue
key = f"{address} - {meta['desc']}"
data[key] = val
return data

5108945
terminal_log Normal file

File diff suppressed because it is too large Load Diff

18
test.py
View File

@@ -1,18 +0,0 @@
from pymodbus.client import ModbusTcpClient
import struct
MODBUS_IP="10.0.0.40"
client=ModbusTcpClient(MODBUS_IP, port=502)
client.connect()
try:
rr = client.read_input_registers(30, count=3, slave=1)
print("Raw 30..32:", rr.registers)
def as_int16(x):
return struct.unpack(">h", struct.pack(">H", x))[0]
for i, raw in enumerate(rr.registers, start=30):
print(i, "raw", raw, "int16", as_int16(raw), "scaled", as_int16(raw)/10.0)
finally:
client.close()