from pymodbus.client import ModbusTcpClient class SgReadyController(): def __init__(self, es): self.es = es def perform_action(self, heat_pump_name, meter_name, state): hp = self.es.get_component_by_name(heat_pump_name) meter_values = state[meter_name] power_to_grid = meter_values['40206 - M_AC_Power'] * 10 ** meter_values['40210 - M_AC_Power_SF'] mode = None if power_to_grid > 10000: mode = 'mode2' self.switch_sg_ready_mode(hp.ip, hp.port, mode) elif power_to_grid < 0: mode = 'mode1' self.switch_sg_ready_mode(hp.ip, hp.port, mode) return mode def switch_sg_ready_mode(self, ip, port, mode): """ Register 300: 1=BUS 0= Hardware Kontakte Register 301 & 302: 0-0= Kein Offset 0-1 Boiler und Heizung Offset 1-1 Boiler Offset + E-Einsatz Sollwert Erhöht 1-0 SG EVU Sperre :param ip: :param mode: 'mode1' = [True, False, False] => SG Ready deactivated 'mode2' = [True, False, True] => SG ready activated for heatpump only 'mode3' = [True, True, True] => SG ready activated for heatpump and heat rod :return: """ client = ModbusTcpClient(ip, port=port) if not client.connect(): print("Verbindung zur Wärmepumpe fehlgeschlagen.") return mode_code = None if mode == 'mode1': mode_code = [True, False, False] elif mode == 'mode2': mode_code = [True, False, True] elif mode == 'mode3': mode_code = [True, True, True] else: print('Uncorrect or no string for mode!') try: response_300 = client.write_coil(300, mode_code[0]) response_301 = client.write_coil(301, mode_code[1]) response_302 = client.write_coil(302, mode_code[2]) # Optional: Rückmeldungen prüfen for addr, resp in zip([300, 301, 302], [response_300, response_301, response_302]): if resp.isError(): print(f"Fehler beim Schreiben von Coil {addr}: {resp}") else: print(f"Coil {addr} erfolgreich geschrieben.") finally: client.close()