Updated the transformation of data from the old database to the new one.

Nils Reiners
2026-01-08 10:18:24 +01:00
parent fd87257f37
commit afbcd81310


@@ -1,189 +1,209 @@
import os, re, math, time
from datetime import datetime, timezone, timedelta
import pandas as pd
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.rest import ApiException
# -----------------------
# CONFIG
# -----------------------
INFLUX_URL = "http://192.168.1.146:8086"
INFLUX_ORG = "allmende"
INFLUX_TOKEN = os.environ.get("INFLUX_TOKEN", "Cw_naEZyvJ3isiAh1P4Eq3TsjcHmzzDFS7SlbKDsS6ZWL04fMEYixWqtNxGThDdG27S9aW5g7FP9eiq5z1rsGA==")

SOURCE_BUCKET = "allmende_db"
TARGET_BUCKET = "allmende_db_v2"

MEASUREMENTS = [
    "hp_master", "hp_slave", "pv_forecast", "sg_ready",
    "solaredge_master", "solaredge_meter", "solaredge_slave", "wohnung_2_6"
]

START_DT = datetime(2025, 6, 1, tzinfo=timezone.utc)
STOP_DT = datetime.now(timezone.utc)

WINDOW = timedelta(days=1)

EXCEL_PATH = "../modbus_registers/heat_pump_registers.xlsx"
EXCEL_SHEET = "Register_Map"

BATCH_SIZE = 1000
MAX_RETRIES = 8
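
# How the window config is used further down (illustrative): with WINDOW = 1 day,
# main() processes [START_DT, STOP_DT) as consecutive day slices, e.g.
# 2025-06-01T00:00Z -> 2025-06-02T00:00Z, then 2025-06-02 -> 2025-06-03, and so
# on; the last slice is clipped to STOP_DT via min(cur + WINDOW, STOP_DT).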
# -----------------------
# Helpers
# -----------------------
def normalize(s) -> str:
    s = "" if s is None else str(s).strip()
    return re.sub(r"\s+", " ", s)


def is_invalid_sentinel(v: float) -> bool:
    return v in (-999.9, -999.0, 30000.0, 32767.0, 65535.0)


def ensure_bucket(client: InfluxDBClient, name: str):
    bapi = client.buckets_api()
    if bapi.find_bucket_by_name(name):
        return
    bapi.create_bucket(bucket_name=name, org=INFLUX_ORG, retention_rules=None)
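
# Illustrative behaviour of the helpers above (example inputs only):
#   normalize("  300   -  Aussentemperatur ")  -> "300 - Aussentemperatur"
#   is_invalid_sentinel(32767.0)               -> True   (reading treated as invalid)
#   is_invalid_sentinel(21.5)                  -> False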
def build_field_type_map_from_excel(path: str) -> dict[str, str]:
    df = pd.read_excel(path, sheet_name=EXCEL_SHEET)
    df = df[df["Register_Type"].astype(str).str.upper() == "IR"].copy()
    df["Address"] = df["Address"].astype(int)
    df["Description"] = df["Description"].fillna("").astype(str)
    df["Tag_Name"] = df["Tag_Name"].fillna("").astype(str)
    df["Data_Type"] = df["Data_Type"].fillna("").astype(str)

    m: dict[str, str] = {}
    for _, r in df.iterrows():
        addr = int(r["Address"])
        desc = normalize(r["Description"])
        tag = normalize(r["Tag_Name"])
        dtp = normalize(r["Data_Type"]).upper()
        if tag:
            m[tag] = dtp
        old_key = normalize(f"{addr} - {desc}".strip(" -"))
        if old_key:
            m[old_key] = dtp
    return m
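
# Hypothetical example: a register row with Address=300, Description="Aussentemperatur",
# Tag_Name="outside_temperature", Data_Type="INT16" would add two entries,
#   {"outside_temperature": "INT16", "300 - Aussentemperatur": "INT16"}
# so both the new tag name and the old "<Address> - <Description>" field key
# resolve to the same register data type.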
def coerce_value_to_dtype(v, dtype: str):
    if v is None:
        return None
    dtp = (dtype or "").upper()

    if isinstance(v, (int, float)):
        fv = float(v)
        if math.isnan(fv) or math.isinf(fv):
            return None

    if dtp in ("BOOL", "BOOLEAN"):
        if isinstance(v, bool): return v
        if isinstance(v, (int, float)): return bool(int(v))
        return None

    if dtp.startswith("INT") or dtp.startswith("UINT"):
        if isinstance(v, bool): return int(v)
        if isinstance(v, (int, float)): return int(float(v))
        return None

    if dtp.startswith("FLOAT") or dtp in ("DOUBLE",):
        if isinstance(v, bool): return float(int(v))
        if isinstance(v, (int, float)): return float(v)
        return None

    return None
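
# Illustrative conversions (following the branches above):
#   coerce_value_to_dtype(1.0, "BOOL")              -> True
#   coerce_value_to_dtype(21.7, "INT16")            -> 21
#   coerce_value_to_dtype(True, "FLOAT32")          -> 1.0
#   coerce_value_to_dtype(float("nan"), "FLOAT32")  -> None
#   coerce_value_to_dtype(5, "")                    -> None  (unknown type -> dropped)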
def write_with_retry(write_api, batch):
    delay = 1.0
    last_msg = ""
    for _ in range(MAX_RETRIES):
        try:
            write_api.write(bucket=TARGET_BUCKET, org=INFLUX_ORG, record=batch)
            return
        except ApiException as e:
            last_msg = getattr(e, "body", "") or str(e)
            status = getattr(e, "status", None)
            if "timeout" in last_msg.lower() or status in (429, 500, 502, 503, 504):
                time.sleep(delay)
                delay = min(delay * 2, 30)
                continue
            raise
    raise RuntimeError(f"Write failed after {MAX_RETRIES} retries: {last_msg}")
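
# Retry schedule with MAX_RETRIES = 8: sleeps of 1, 2, 4, 8, 16, 30, 30 s between
# attempts (exponential backoff capped at 30 s), so a persistently failing batch
# gives up after roughly 1.5 minutes of waiting plus the time spent in the write
# calls themselves.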
def window_already_migrated(query_api, measurement: str, start: datetime, stop: datetime) -> bool:
    # Checks whether the target bucket already holds at least one point for this
    # measurement inside the window.
    flux = f'''
    from(bucket: "{TARGET_BUCKET}")
      |> range(start: time(v: "{start.isoformat()}"), stop: time(v: "{stop.isoformat()}"))
      |> filter(fn: (r) => r._measurement == "{measurement}")
      |> limit(n: 1)
    '''
    tables = query_api.query(flux, org=INFLUX_ORG)
    for t in tables:
        if t.records:
            return True
    return False
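
# Note on the skip logic: a single existing point marks the whole window as
# migrated, so a window that was only partially written before an interruption
# will be skipped on the next run; a stricter check would have to compare point
# counts between source and target.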
def migrate_window(query_api, write_api, measurement: str,
                   start: datetime, stop: datetime,
                   type_map: dict[str, str],
                   do_type_cast: bool) -> int:
    flux = f'''
    from(bucket: "{SOURCE_BUCKET}")
      |> range(start: time(v: "{start.isoformat()}"), stop: time(v: "{stop.isoformat()}"))
      |> filter(fn: (r) => r._measurement == "{measurement}")
      |> keep(columns: ["_time","_measurement","_field","_value"])
    '''
    tables = query_api.query(flux, org=INFLUX_ORG)

    batch, written = [], 0
    for table in tables:
        for rec in table.records:
            t = rec.get_time()
            field = normalize(rec.get_field())
            value = rec.get_value()
            if value is None:
                continue

            if do_type_cast:
                dtp = type_map.get(field)
                if dtp:
                    cv = coerce_value_to_dtype(value, dtp)
                    if cv is None:
                        continue
                    if isinstance(cv, (int, float)) and is_invalid_sentinel(float(cv)):
                        continue
                    value = cv
                # no mapping -> write the value unchanged

            batch.append(Point(measurement).field(field, value).time(t, WritePrecision.NS))

            if len(batch) >= BATCH_SIZE:
                write_with_retry(write_api, batch)
                written += len(batch)
                batch = []

    if batch:
        write_with_retry(write_api, batch)
        written += len(batch)
    return written
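
# Example of migrating a single day for one measurement (client and APIs set up
# as in main() below; values are illustrative):
#   n = migrate_window(query_api, write_api, "hp_master",
#                      datetime(2025, 6, 1, tzinfo=timezone.utc),
#                      datetime(2025, 6, 2, tzinfo=timezone.utc),
#                      type_map, do_type_cast=True)
#   # n is the number of points written for that window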
# -----------------------
# Main
# -----------------------
def main():
    if not INFLUX_TOKEN:
        raise RuntimeError("INFLUX_TOKEN is missing (set the INFLUX_TOKEN env var).")

    with InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG, timeout=900_000) as client:
        ensure_bucket(client, TARGET_BUCKET)
        type_map = build_field_type_map_from_excel(EXCEL_PATH)

        query_api = client.query_api()
        write_api = client.write_api(write_options=SYNCHRONOUS)

        for meas in MEASUREMENTS:
            do_cast = meas in ("hp_master", "hp_slave")
            print(f"\n== {meas} (cast={'ON' if do_cast else 'OFF'}) ==")

            cur = START_DT
            total = 0
            while cur < STOP_DT:
                nxt = min(cur + WINDOW, STOP_DT)

                if window_already_migrated(query_api, meas, cur, nxt):
                    print(f"{cur.isoformat()} -> {nxt.isoformat()} : SKIP (already migrated)")
                    cur = nxt
                    continue

                n = migrate_window(query_api, write_api, meas, cur, nxt, type_map, do_cast)
                total += n
                print(f"{cur.isoformat()} -> {nxt.isoformat()} : {n} (total {total})")
                cur = nxt

            print(f"== Done with {meas}: {total} points ==")