import re
from typing import Optional

from influxdb_client import InfluxDBClient
from langchain.tools.base import StructuredTool
from pydantic import BaseModel, Field

from tfg.utils.config import get_config_value
# --- InfluxDB connection settings ------------------------------------------
# Each value is read from the project configuration; the second argument to
# get_config_value is presumably the fallback used when the key is unset.
INFLUXDB_URL = get_config_value("INFLUXDB_URL", "default-url")
INFLUXDB_TOKEN = get_config_value("INFLUXDB_TOKEN", "dummy-token")
INFLUXDB_ORG = get_config_value("INFLUXDB_ORG", "default-org")
INFLUXDB_BUCKET = get_config_value("INFLUXDB_BUCKET", "default-bucket")

# One shared client / query API, created at import time and reused for every
# query issued from this module.
client = InfluxDBClient(
    url=INFLUXDB_URL,
    token=INFLUXDB_TOKEN,
    org=INFLUXDB_ORG,
)
query_api = client.query_api()

# --- Whitelists --------------------------------------------------------------
# Only these field names / Flux aggregate functions are allowed into a query;
# anything else is rejected before the query text is built.
VALID_METRICS = {
    "temperature",
    "humidity",
    "light",
    "motion",
    "vdd",
}
VALID_AGGREGATIONS = {"mean", "max", "min", "sum"}
# Function to construct Flux query dynamically

# Relative durations accepted for ``time_range``: one or more <integer><unit>
# groups using Flux duration units, e.g. "24h", "7d", "1h30m", "2mo".
_FLUX_DURATION_RE = re.compile(r"(?:[0-9]+(?:mo|ms|us|µs|ns|[smhdwy]))+")

# Absolute times accepted for ``start_time``/``end_time``: a full RFC3339
# timestamp ("2024-11-01T00:00:00Z", "2024-11-01T00:00:00.5+01:00") or a bare
# date ("2024-11-01"). Quotes, backslashes and whitespace can never match.
_FLUX_TIMESTAMP_RE = re.compile(
    r"[0-9]{4}-[0-9]{2}-[0-9]{2}"
    r"(?:T[0-9]{2}:[0-9]{2}:[0-9]{2}(?:\.[0-9]+)?(?:Z|[+-][0-9]{2}:[0-9]{2}))?"
)


def _normalize_name(value):
    """Return *value* stripped and lower-cased when it is a string, otherwise unchanged."""
    return value.strip().lower() if isinstance(value, str) else value


def _validated_timestamp(name: str, value) -> str:
    """Return *value* as a trimmed timestamp string; raise ValueError if it is not one."""
    text = str(value).strip()
    if not _FLUX_TIMESTAMP_RE.fullmatch(text):
        raise ValueError(
            f"❌ Invalid {name} '{value}'. Use an RFC3339 timestamp such as '2024-11-01T00:00:00Z'."
        )
    return text


def _validated_duration(value) -> str:
    """Return *value* as a trimmed, unsigned Flux duration; raise ValueError if it is not one."""
    text = str(value).strip()
    # Tolerate an already-negated window ("-24h"); the caller prepends the minus.
    if text.startswith("-"):
        text = text[1:]
    if not _FLUX_DURATION_RE.fullmatch(text):
        raise ValueError(
            f"❌ Invalid time_range '{value}'. Use a Flux duration such as '24h', '7d' or '1h30m'."
        )
    return text


def construct_flux_query(params: dict) -> str:
    """
    Constructs a Flux query based on extracted parameters.

    ``metric`` and ``aggregation`` are trimmed and lower-cased, then checked
    against VALID_METRICS / VALID_AGGREGATIONS. Time values are validated
    against strict patterns before being interpolated, so free-form text
    (e.g. produced by an LLM) cannot inject additional Flux into the query.

    Args:
        params (dict): Dictionary with keys 'metric' (default "humidity"),
            'aggregation' (default "mean"), and either 'time_range' or both
            'start_time' and 'end_time'. 'time_range' defaults to "24h" only
            when the key is absent (an explicit None does not fall back), and
            a leading '-' is tolerated. When both absolute times are truthy
            they take precedence over 'time_range'.

    Returns:
        str: A Flux query over INFLUXDB_BUCKET, filtered to the "sensor_data"
        measurement and the chosen field, aggregated in 1h windows.

    Raises:
        ValueError: If metric or aggregation is invalid, a time value is
            malformed, or no usable time parameters are given.
    """
    field = _normalize_name(params.get("metric", "humidity"))
    aggregation = _normalize_name(params.get("aggregation", "mean"))
    start_time = params.get("start_time")
    end_time = params.get("end_time")
    time_range = params.get("time_range", "24h")

    # Validate metric and aggregation; sorted() keeps the message stable because
    # set iteration order for strings varies between interpreter runs.
    if field not in VALID_METRICS:
        raise ValueError(
            f"❌ Invalid metric '{field}'. Available metrics: {', '.join(sorted(VALID_METRICS))}"
        )
    if aggregation not in VALID_AGGREGATIONS:
        raise ValueError(
            f"❌ Invalid aggregation '{aggregation}'. Available functions: {', '.join(sorted(VALID_AGGREGATIONS))}"
        )

    # Build time range: absolute bounds win over a relative window.
    if start_time and end_time:
        start = _validated_timestamp("start_time", start_time)
        stop = _validated_timestamp("end_time", end_time)
        time_clause = f'range(start: time(v: "{start}"), stop: time(v: "{stop}"))'
    elif time_range:
        time_clause = f"range(start: -{_validated_duration(time_range)})"
    else:
        raise ValueError("❌ You must provide either a relative time_range or both start_time and end_time.")

    # Construct query (field and aggregation are whitelisted above).
    flux_query = f"""
    from(bucket: "{INFLUXDB_BUCKET}")
    |> {time_clause}
    |> filter(fn: (r) => r["_measurement"] == "sensor_data")
    |> filter(fn: (r) => r["_field"] == "{field}")
    |> aggregateWindow(every: 1h, fn: {aggregation}, createEmpty: false)
    |> yield(name: "result")
    """
    return flux_query
# Entry point wrapped by the LangChain StructuredTool (takes individual arguments).
def query_influxdb(
    metric: str,
    aggregation: str,
    time_range: Optional[str] = None,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None
) -> str:
    """
    Query InfluxDB from individual keyword arguments.

    The arguments are bundled into a parameter dict and handed to
    ``_query_influxdb_internal``, which builds and runs the Flux query.

    Args:
        metric (str): Metric to query, e.g. "temperature".
        aggregation (str): Aggregation function, e.g. "mean" or "max".
        time_range (str, optional): Relative window such as "24h" or "7d";
            ignored when both start_time and end_time are provided.
        start_time (str, optional): Absolute start, e.g. "2024-11-01T00:00:00Z".
        end_time (str, optional): Absolute end, e.g. "2024-11-10T23:59:59Z".

    Returns:
        str: Formatted query results, or an error/no-data message.
    """
    # Key order matters only cosmetically: the dict is printed downstream.
    return _query_influxdb_internal(
        dict(
            metric=metric,
            time_range=time_range,
            aggregation=aggregation,
            start_time=start_time,
            end_time=end_time,
        )
    )
# Internal function that builds, runs and formats the query.
def _query_influxdb_internal(params: dict) -> str:
    """
    Build a Flux query from ``params``, execute it, and render rows as text.

    The parameters and the generated query are printed to stdout before the
    query is sent.

    Args:
        params (dict): Parameters understood by ``construct_flux_query``:
            'metric', 'aggregation', and 'time_range' or both
            'start_time' and 'end_time'.

    Returns:
        str: One "Time: ..., Value: ..." line per returned record; a
        "no data" notice when nothing comes back; the message of any
        ValueError; or "❌ Error querying InfluxDB: ..." for any other
        Exception. Errors derived from Exception are returned, not raised.
    """
    try:
        flux_query = construct_flux_query(params)
        print(f"📊 Extracted Parameters: {params}")
        print(f"🔥 Executing Flux Query:\n{flux_query}")
        tables = query_api.query(org=INFLUXDB_ORG, query=flux_query)
        lines = [
            f"Time: {rec.get_time()}, Value: {rec.get_value()}"
            for tbl in tables
            for rec in tbl.records
        ]
    except ValueError as validation_error:
        # Validation failures already carry a user-facing message.
        return str(validation_error)
    except Exception as exc:
        return f"❌ Error querying InfluxDB: {str(exc)}"

    if not lines:
        return "⚠️ No data found in the database. Verify if data exists for this time range."
    return "\n".join(lines)
class InfluxDBQueryInput(BaseModel):
    """
    Argument schema for ``influx_tool``.

    Mirrors the keyword parameters of ``query_influxdb`` one-to-one so the
    schema handed to LangChain and the wrapped function cannot disagree.
    The field descriptions are presumably surfaced to the model as
    per-argument help via the tool's JSON schema.
    """

    metric: str = Field(
        description=f"Metric to query. One of: {', '.join(sorted(VALID_METRICS))}."
    )
    aggregation: str = Field(
        description=f"Aggregation function. One of: {', '.join(sorted(VALID_AGGREGATIONS))}."
    )
    time_range: Optional[str] = Field(
        default=None,
        description=(
            "Relative time range such as '24h' or '7d'. "
            "Ignored when both start_time and end_time are given."
        ),
    )
    start_time: Optional[str] = Field(
        default=None,
        description="Absolute start time (RFC3339), e.g. '2024-11-01T00:00:00Z'.",
    )
    end_time: Optional[str] = Field(
        default=None,
        description="Absolute end time (RFC3339), e.g. '2024-11-10T23:59:59Z'.",
    )


# LangChain-compatible tool wrapping ``query_influxdb``; ``args_schema`` declares
# the accepted arguments.
# NOTE(review): some function-calling backends (e.g. OpenAI tools) only accept
# names matching ^[a-zA-Z0-9_-]+$ - confirm the spaced name below works with
# the agent this tool is bound to.
influx_tool = StructuredTool.from_function(
    name="InfluxDB Query Tool",
    description=(
        "Fetches sensor data from InfluxDB.\n"
        "- Required parameters: `metric` (e.g., 'temperature'), `aggregation` (e.g., 'mean').\n"
        "- Time range can be specified either as a relative `time_range` (e.g., '24h') or as absolute times with `start_time` and `end_time` "
        "(e.g., '2024-11-01T00:00:00Z')."
    ),
    func=query_influxdb,
    args_schema=InfluxDBQueryInput,
)