diff --git a/data_base_operations/transform_old_db_to_new.py b/data_base_operations/transform_old_db_to_new.py
new file mode 100644
index 0000000..cf498f2
--- /dev/null
+++ b/data_base_operations/transform_old_db_to_new.py
@@ -0,0 +1,193 @@
+import os
+import re
+import math
+from datetime import datetime, timezone, timedelta
+
+import pandas as pd
+from influxdb_client import InfluxDBClient, BucketRetentionRules, Point, WritePrecision
+from influxdb_client.client.write_api import SYNCHRONOUS
+
+
+# -----------------------
+# CONFIG (edit these values; the token MUST come from the environment)
+# -----------------------
+INFLUX_URL = "http://192.168.1.146:8086"
+INFLUX_ORG = "allmende"
+INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "")  # no hard-coded fallback: secrets stay out of source
+
+SOURCE_BUCKET = "allmende_db"
+TARGET_BUCKET = "allmende_db_v3"
+
+# Which measurements to migrate?
+MEASUREMENTS = ["hp_master", "hp_slave"]  # extend if you need more
+
+# Time range
+START_DT = datetime(2025, 12, 30, tzinfo=timezone.utc)  # adjust (or go further back)
+STOP_DT = datetime.now(timezone.utc)
+
+# Query window (shrink to 6h or 1h on timeouts)
+WINDOW = timedelta(days=1)
+
+# Excel mapping: old fields "300 - Aussentemperatur" -> new fields (Tag_Name)
+EXCEL_PATH = "../modbus_registers/heat_pump_registers.xlsx"
+EXCEL_SHEET = "Register_Map"
+
+
+# -----------------------
+# Helpers
+# -----------------------
+def ensure_bucket(client: InfluxDBClient, bucket_name: str, retention_days: int | None = None):
+    buckets_api = client.buckets_api()
+    existing = buckets_api.find_bucket_by_name(bucket_name)
+    if existing:
+        print(f"Bucket existiert bereits: {bucket_name}")
+        return existing
+
+    rules = None
+    if retention_days is not None:
+        rules = BucketRetentionRules(type="expire", every_seconds=int(timedelta(days=retention_days).total_seconds()))
+
+    b = buckets_api.create_bucket(bucket_name=bucket_name, org=INFLUX_ORG, retention_rules=rules)
+    print(f"Bucket angelegt: {bucket_name}")
+    return b
+
+
+def normalize(s: str) -> str:
+    """Normalize field keys to absorb whitespace variations."""
+    if s is None:
+        return ""
+    s = str(s).strip()
+    s = re.sub(r"\s+", " ", s)
+    return s
+
+
+def to_float_or_none(v):
+    if v is None:
+        return None
+    if isinstance(v, bool):
+        return float(int(v))
+    if isinstance(v, (int, float)):
+        fv = float(v)
+        if math.isnan(fv) or math.isinf(fv):
+            return None
+        return fv
+    return None
+
+
+def is_invalid_sentinel(v: float) -> bool:
+    # Optional: typical "invalid" sentinel markers observed in this data
+    return v in (-999.9, -999.0, 30000.0, 32767.0, 65535.0)
+
+
+def build_field_mapping_from_excel(excel_path: str) -> dict[str, str]:
+    """
+    Maps OLD: "{Address} - {Description}" -> NEW: Tag_Name
+    """
+    df = pd.read_excel(excel_path, sheet_name=EXCEL_SHEET)
+    df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
+
+    df["Address"] = df["Address"].astype(int)
+    df["Description"] = df["Description"].fillna("").astype(str)
+    df["Tag_Name"] = df["Tag_Name"].fillna("").astype(str)
+
+    mapping: dict[str, str] = {}
+    for _, r in df.iterrows():
+        addr = int(r["Address"])
+        desc = normalize(r["Description"])
+        tag = normalize(r["Tag_Name"])
+
+        old_key = normalize(f"{addr} - {desc}".strip(" -"))
+        new_key = tag if tag else old_key
+
+        mapping[old_key] = new_key
+
+    return mapping
+
+
+def migrate_window(client: InfluxDBClient, query_api, write_api,
+                   measurement: str, field_map: dict[str, str],
+                   start: datetime, stop: datetime) -> int:
+    start_iso = start.isoformat()
+    stop_iso = stop.isoformat()
+
+    flux = f'''
+from(bucket: "{SOURCE_BUCKET}")
+  |> range(start: time(v: "{start_iso}"), stop: time(v: "{stop_iso}"))
+  |> filter(fn: (r) => r._measurement == "{measurement}")
+  |> keep(columns: ["_time","_measurement","_field","_value"])
+'''
+
+    tables = query_api.query(flux, org=INFLUX_ORG)
+
+    batch = []
+    written = 0
+
+    for table in tables:
+        for record in table.records:
+            t = record.get_time()
+            old_field = normalize(record.get_field())
+            value = to_float_or_none(record.get_value())
+            if value is None or is_invalid_sentinel(value):
+                continue
+
+            # OLD -> NEW field name
+            new_field = field_map.get(old_field, old_field)
+
+            batch.append(
+                Point(measurement)
+                .field(new_field, float(value))  # target bucket: consistently float
+                .time(t, WritePrecision.NS)
+            )
+
+            if len(batch) >= 5000:
+                write_api.write(bucket=TARGET_BUCKET, org=INFLUX_ORG, record=batch)
+                written += len(batch)
+                batch = []
+
+    if batch:
+        write_api.write(bucket=TARGET_BUCKET, org=INFLUX_ORG, record=batch)
+        written += len(batch)
+
+    return written
+
+
+def main():
+    if not INFLUX_TOKEN:
+        raise RuntimeError("INFLUX_TOKEN fehlt. Setze ihn z.B. als Env-Var INFLUX_TOKEN.")
+
+    # High timeout: Flux queries can take long depending on data volume
+    with InfluxDBClient(
+        url=INFLUX_URL,
+        token=INFLUX_TOKEN,
+        org=INFLUX_ORG,
+        timeout=240_000,  # ms
+    ) as client:
+
+        ensure_bucket(client, TARGET_BUCKET, retention_days=None)
+
+        field_map = build_field_mapping_from_excel(EXCEL_PATH)
+        print(f"Field-Mappings geladen: {len(field_map)}")
+
+        query_api = client.query_api()
+        write_api = client.write_api(write_options=SYNCHRONOUS)
+
+        for meas in MEASUREMENTS:
+            print(f"\n== Migration: {meas} ({SOURCE_BUCKET} -> {TARGET_BUCKET}) ==")
+
+            cur = START_DT
+            total = 0
+            while cur < STOP_DT:
+                nxt = min(cur + WINDOW, STOP_DT)
+                print(f"  Fenster: {cur.isoformat()} -> {nxt.isoformat()}")
+
+                n = migrate_window(client, query_api, write_api, meas, field_map, cur, nxt)
+                total += n
+                print(f"  geschrieben: {n} (gesamt {total})")
+
+                cur = nxt
+
+            print(f"== Fertig {meas}: {total} Punkte ==")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/heat_pump.py b/heat_pump.py
index beac351..7e6aed5 100644
--- a/heat_pump.py
+++ b/heat_pump.py
@@ -162,12 +162,8 @@ class HeatPump:
             # continue
             value = float(value)
             desc = meta.get("desc") or ""
-            label = f"{address} - {desc}".strip(" -")
-
-            data[label] = value
-            tag = meta.get("tag")
-            if tag:
-                data[tag] = value
+            field_name = f"{address} - {desc}".strip(" -")
+            data[field_name] = value
 
             print(f"Adresse {address} - {desc}: {value}")
 
diff --git a/main.py b/main.py
index 471987e..e48176b 100644
--- a/main.py
+++ b/main.py
@@ -23,7 +23,7 @@ db = DataBaseInflux(
     url="http://192.168.1.146:8086",
     token="Cw_naEZyvJ3isiAh1P4Eq3TsjcHmzzDFS7SlbKDsS6ZWL04fMEYixWqtNxGThDdG27S9aW5g7FP9eiq5z1rsGA==",
     org="allmende",
-    bucket="allmende_db"
+    bucket="allmende_db_v3"
 )
 
 hp_master = HeatPump(device_name='hp_master', ip_address='10.0.0.10', port=502)