diff --git a/data_base_operations/transform_old_db_to_new.py b/data_base_operations/transform_old_db_to_new.py
index cf498f2..3068c17 100644
--- a/data_base_operations/transform_old_db_to_new.py
+++ b/data_base_operations/transform_old_db_to_new.py
@@ -1,189 +1,209 @@
-import os
-import re
-import math
+import os, re, math, time
 from datetime import datetime, timezone, timedelta
 import pandas as pd
-from influxdb_client import InfluxDBClient, BucketRetentionRules, Point, WritePrecision
+from influxdb_client import InfluxDBClient, Point, WritePrecision
 from influxdb_client.client.write_api import SYNCHRONOUS
+from influxdb_client.rest import ApiException
 
 # -----------------------
-# CONFIG (your "real input values")
+# CONFIG
 # -----------------------
 INFLUX_URL = "http://192.168.1.146:8086"
 INFLUX_ORG = "allmende"
 INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "Cw_naEZyvJ3isiAh1P4Eq3TsjcHmzzDFS7SlbKDsS6ZWL04fMEYixWqtNxGThDdG27S9aW5g7FP9eiq5z1rsGA==")
 
 SOURCE_BUCKET = "allmende_db"
-TARGET_BUCKET = "allmende_db_v3"
+TARGET_BUCKET = "allmende_db_v2"
 
-# Which measurements should be migrated?
-MEASUREMENTS = ["hp_master", "hp_slave"]  # add more if needed
+MEASUREMENTS = [
+    "hp_master", "hp_slave", "pv_forecast", "sg_ready",
+    "solaredge_master", "solaredge_meter", "solaredge_slave", "wohnung_2_6"
+]
 
-# Time range
-START_DT = datetime(2025, 12, 30, tzinfo=timezone.utc)  # adjust (or go further back)
+START_DT = datetime(2025, 6, 1, tzinfo=timezone.utc)
 STOP_DT = datetime.now(timezone.utc)
-
-# Query window (shrink to 6h or 1h on timeouts)
 WINDOW = timedelta(days=1)
 
-# Excel mapping: old fields like "300 - Aussentemperatur" -> new fields (Tag_Name)
 EXCEL_PATH = "../modbus_registers/heat_pump_registers.xlsx"
 EXCEL_SHEET = "Register_Map"
 
+BATCH_SIZE = 1000
+MAX_RETRIES = 8
+
 # -----------------------
 # Helpers
 # -----------------------
-def ensure_bucket(client: InfluxDBClient, bucket_name: str, retention_days: int | None = None):
-    buckets_api = client.buckets_api()
-    existing = buckets_api.find_bucket_by_name(bucket_name)
-    if existing:
-        print(f"Bucket already exists: {bucket_name}")
-        return existing
+def normalize(s) -> str:
+    s = "" if s is None else str(s).strip()
+    return re.sub(r"\s+", " ", s)
 
-    rules = None
-    if retention_days is not None:
-        rules = BucketRetentionRules(type="expire", every_seconds=int(timedelta(days=retention_days).total_seconds()))
+def is_invalid_sentinel(v: float) -> bool:
+    return v in (-999.9, -999.0, 30000.0, 32767.0, 65535.0)
 
-    b = buckets_api.create_bucket(bucket_name=bucket_name, org=INFLUX_ORG, retention_rules=rules)
-    print(f"Bucket created: {bucket_name}")
-    return b
+def ensure_bucket(client: InfluxDBClient, name: str):
+    bapi = client.buckets_api()
+    if bapi.find_bucket_by_name(name):
+        return
+    bapi.create_bucket(bucket_name=name, org=INFLUX_ORG, retention_rules=None)
 
+def build_field_type_map_from_excel(path: str) -> dict[str, str]:
+    df = pd.read_excel(path, sheet_name=EXCEL_SHEET)
+    df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
+    df["Address"] = df["Address"].astype(int)
+    df["Description"] = df["Description"].fillna("").astype(str)
+    df["Tag_Name"] = df["Tag_Name"].fillna("").astype(str)
+    df["Data_Type"] = df["Data_Type"].fillna("").astype(str)
 
-def normalize(s: str) -> str:
-    """Normalizes field keys to absorb whitespace differences."""
-    if s is None:
-        return ""
-    s = str(s).strip()
-    s = re.sub(r"\s+", " ", s)
-    return s
+    m: dict[str, str] = {}
+    for _, r in df.iterrows():
+        addr = int(r["Address"])
int(r["Address"]) + desc = normalize(r["Description"]) + tag = normalize(r["Tag_Name"]) + dtp = normalize(r["Data_Type"]).upper() + if tag: + m[tag] = dtp + old_key = normalize(f"{addr} - {desc}".strip(" -")) + if old_key: + m[old_key] = dtp + return m - -def to_float_or_none(v): +def coerce_value_to_dtype(v, dtype: str): if v is None: return None - if isinstance(v, bool): - return float(int(v)) + dtp = (dtype or "").upper() + if isinstance(v, (int, float)): fv = float(v) if math.isnan(fv) or math.isinf(fv): return None - return fv + + if dtp in ("BOOL", "BOOLEAN"): + if isinstance(v, bool): return v + if isinstance(v, (int, float)): return bool(int(v)) + return None + + if dtp.startswith("INT") or dtp.startswith("UINT"): + if isinstance(v, bool): return int(v) + if isinstance(v, (int, float)): return int(float(v)) + return None + + if dtp.startswith("FLOAT") or dtp in ("DOUBLE",): + if isinstance(v, bool): return float(int(v)) + if isinstance(v, (int, float)): return float(v) + return None + return None +def write_with_retry(write_api, batch): + delay = 1.0 + last_msg = "" + for _ in range(MAX_RETRIES): + try: + write_api.write(bucket=TARGET_BUCKET, org=INFLUX_ORG, record=batch) + return + except ApiException as e: + last_msg = getattr(e, "body", "") or str(e) + status = getattr(e, "status", None) + if "timeout" in last_msg.lower() or status in (429, 500, 502, 503, 504): + time.sleep(delay) + delay = min(delay * 2, 30) + continue + raise + raise RuntimeError(f"Write failed after {MAX_RETRIES} retries: {last_msg}") -def is_invalid_sentinel(v: float) -> bool: - # Optional: typische "ungültig" Marker (bei dir sichtbar) - return v in (-999.9, -999.0, 30000.0, 32767.0, 65535.0) - - -def build_field_mapping_from_excel(excel_path: str) -> dict[str, str]: - """ - Mappt ALT: "{Address} - {Description}" -> NEU: Tag_Name - """ - df = pd.read_excel(excel_path, sheet_name=EXCEL_SHEET) - df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy() - - df["Address"] = df["Address"].astype(int) - df["Description"] = df["Description"].fillna("").astype(str) - df["Tag_Name"] = df["Tag_Name"].fillna("").astype(str) - - mapping: dict[str, str] = {} - for _, r in df.iterrows(): - addr = int(r["Address"]) - desc = normalize(r["Description"]) - tag = normalize(r["Tag_Name"]) - - old_key = normalize(f"{addr} - {desc}".strip(" -")) - new_key = tag if tag else old_key - - mapping[old_key] = new_key - - return mapping - - -def migrate_window(client: InfluxDBClient, query_api, write_api, - measurement: str, field_map: dict[str, str], - start: datetime, stop: datetime) -> int: - start_iso = start.isoformat() - stop_iso = stop.isoformat() +def window_already_migrated(query_api, measurement: str, start: datetime, stop: datetime) -> bool: + # Prüft: gibt es im Zielbucket im Fenster mindestens 1 Punkt? 
+    flux = f'''
+from(bucket: "{TARGET_BUCKET}")
+  |> range(start: time(v: "{start.isoformat()}"), stop: time(v: "{stop.isoformat()}"))
+  |> filter(fn: (r) => r._measurement == "{measurement}")
+  |> limit(n: 1)
+'''
+    tables = query_api.query(flux, org=INFLUX_ORG)
+    for t in tables:
+        if t.records:
+            return True
+    return False
 
+def migrate_window(query_api, write_api, measurement: str,
+                   start: datetime, stop: datetime,
+                   type_map: dict[str, str],
+                   do_type_cast: bool) -> int:
     flux = f'''
 from(bucket: "{SOURCE_BUCKET}")
-  |> range(start: time(v: "{start_iso}"), stop: time(v: "{stop_iso}"))
+  |> range(start: time(v: "{start.isoformat()}"), stop: time(v: "{stop.isoformat()}"))
   |> filter(fn: (r) => r._measurement == "{measurement}")
   |> keep(columns: ["_time","_measurement","_field","_value"])
 '''
-
     tables = query_api.query(flux, org=INFLUX_ORG)
-    batch = []
-    written = 0
-
+    batch, written = [], 0
     for table in tables:
-        for record in table.records:
-            t = record.get_time()
-            old_field = normalize(record.get_field())
-            value = to_float_or_none(record.get_value())
-            if value is None or is_invalid_sentinel(value):
+        for rec in table.records:
+            t = rec.get_time()
+            field = normalize(rec.get_field())
+            value = rec.get_value()
+            if value is None:
                 continue
 
-            # OLD -> NEW field name
-            new_field = field_map.get(old_field, old_field)
+            if do_type_cast:
+                dtp = type_map.get(field)
+                if dtp:
+                    cv = coerce_value_to_dtype(value, dtp)
+                    if cv is None:
+                        continue
+                    if isinstance(cv, (int, float)) and is_invalid_sentinel(float(cv)):
+                        continue
+                    value = cv
+                # no mapping -> write the value unchanged
 
-            batch.append(
-                Point(measurement)
-                .field(new_field, float(value))  # target bucket: consistently float
-                .time(t, WritePrecision.NS)
-            )
+            batch.append(Point(measurement).field(field, value).time(t, WritePrecision.NS))
 
-            if len(batch) >= 5000:
-                write_api.write(bucket=TARGET_BUCKET, org=INFLUX_ORG, record=batch)
+            if len(batch) >= BATCH_SIZE:
+                write_with_retry(write_api, batch)
                 written += len(batch)
                 batch = []
 
     if batch:
-        write_api.write(bucket=TARGET_BUCKET, org=INFLUX_ORG, record=batch)
+        write_with_retry(write_api, batch)
         written += len(batch)
 
     return written
 
+# -----------------------
+# Main
+# -----------------------
 def main():
     if not INFLUX_TOKEN:
-        raise RuntimeError("INFLUX_TOKEN missing. Set it e.g. as env var INFLUX_TOKEN.")
als Env-Var INFLUX_TOKEN.") + raise RuntimeError("INFLUX_TOKEN fehlt (Env-Var INFLUX_TOKEN setzen).") - # Timeout hoch, da Flux-Queries je nach Datenmenge länger dauern - with InfluxDBClient( - url=INFLUX_URL, - token=INFLUX_TOKEN, - org=INFLUX_ORG, - timeout=240_000, # ms - ) as client: - - ensure_bucket(client, TARGET_BUCKET, retention_days=None) - - field_map = build_field_mapping_from_excel(EXCEL_PATH) - print(f"Field-Mappings geladen: {len(field_map)}") + with InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG, timeout=900_000) as client: + ensure_bucket(client, TARGET_BUCKET) + type_map = build_field_type_map_from_excel(EXCEL_PATH) query_api = client.query_api() write_api = client.write_api(write_options=SYNCHRONOUS) for meas in MEASUREMENTS: - print(f"\n== Migration: {meas} ({SOURCE_BUCKET} -> {TARGET_BUCKET}) ==") + do_cast = meas in ("hp_master", "hp_slave") + cur, total = START_DT, 0 + print(f"\n== {meas} (cast={'ON' if do_cast else 'OFF'}) ==") - cur = START_DT - total = 0 while cur < STOP_DT: nxt = min(cur + WINDOW, STOP_DT) - print(f" Fenster: {cur.isoformat()} -> {nxt.isoformat()}") - n = migrate_window(client, query_api, write_api, meas, field_map, cur, nxt) + if window_already_migrated(query_api, meas, cur, nxt): + print(f"{cur.isoformat()} -> {nxt.isoformat()} : SKIP (existiert schon)") + cur = nxt + continue + + n = migrate_window(query_api, write_api, meas, cur, nxt, type_map, do_cast) total += n - print(f" geschrieben: {n} (gesamt {total})") - + print(f"{cur.isoformat()} -> {nxt.isoformat()} : {n} (gesamt {total})") cur = nxt print(f"== Fertig {meas}: {total} Punkte ==")