from influxdb_client import InfluxDBClient, Point, WritePrecision
from datetime import datetime
import datetime as dt

import pandas as pd


class DataBaseInflux:
    """Thin wrapper around an InfluxDB client for writing measurements.

    The constructor opens an ``InfluxDBClient`` and a write API that are
    reused by every ``store_*`` call. Call :meth:`close` (or use the
    instance as a context manager) when done so both are released.
    """

    def __init__(self, url: str, token: str, org: str, bucket: str):
        """Open the client connection and create the write API.

        Args:
            url: Address of the InfluxDB server.
            token: Authentication token passed to the client.
            org: Organization used for the client and for every write.
            bucket: Bucket that all points are written to.
        """
        self.url = url
        self.token = token
        self.org = org
        self.bucket = bucket
        self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org)
        # The write API is created with the library's default write options.
        # NOTE(review): the defaults appear to buffer writes in the
        # background, so pending points may only be sent on close() --
        # confirm against the influxdb-client documentation.
        self.write_api = self.client.write_api()

    def close(self) -> None:
        """Close the write API first, then the underlying client."""
        self.write_api.close()
        self.client.close()

    def __enter__(self):
        """Return the instance so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Release the write API and client when leaving the ``with`` block."""
        self.close()

    def store_data(self, device_name: str, data: dict) -> None:
        """Write one point, timestamped with the current UTC time.

        Args:
            device_name: Used as the measurement name of the point.
            data: Mapping of field name to field value; every pair is
                stored as a field on the point.
        """
        # The measurement is named after the device.
        point = Point(device_name)

        # Store every key/value pair as a field.
        for key, value in data.items():
            point = point.field(key, value)

        # Timestamp the point with "now" as a timezone-aware UTC value
        # (datetime.utcnow() is naive and deprecated since Python 3.12).
        point = point.time(datetime.now(dt.timezone.utc), WritePrecision.NS)

        # Write the point to InfluxDB.
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)

    def store_forecasts(self, forecast_name: str, data: pd.Series) -> None:
        """Write a forecast series, one point per numeric value.

        Every point is tagged with ``run``: the current UTC time truncated
        to the minute, so all points written by one call share the tag.

        Args:
            forecast_name: Used as the measurement name of every point.
            data: Series of forecast values. Each index label must provide
                ``to_pydatetime()`` (e.g. ``pd.Timestamp``), as it becomes
                the point's timestamp. Values that cannot be converted to
                a number are dropped.
        """
        run_tag = (
            dt.datetime.now(dt.timezone.utc)
            .replace(second=0, microsecond=0)
            .isoformat(timespec="minutes")
        )

        # Coerce non-numeric entries to NaN and drop them before writing.
        series = pd.to_numeric(data, errors="coerce").dropna()

        points = [
            Point(forecast_name)
            .tag("run", run_tag)
            .field("value", float(val))
            .time(ts.to_pydatetime(), WritePrecision.S)
            for ts, val in series.items()
        ]
        if not points:
            # Nothing usable in the series; skip the write call.
            return

        self.write_api.write(bucket=self.bucket, org=self.org, record=points)