文档文档

处理引擎和 Python 插件

使用 InfluxDB 3 处理引擎在您的 InfluxDB 3 Core 数据库中直接运行 Python 代码,以自动处理数据并响应数据库事件。

处理引擎是一个嵌入式 Python VM,它在您的 InfluxDB 3 数据库内部运行,并允许您

  • 在数据写入数据库时处理数据
  • 按计划运行代码
  • 创建执行 Python 代码的 API 端点
  • 使用内存缓存维护执行之间的状态

了解如何创建、配置、运行和扩展在特定事件发生时执行的 Python 插件。

  1. 设置处理引擎
  2. 添加处理引擎插件
  3. 创建触发器以运行插件

设置处理引擎

要启用处理引擎,请使用 --plugin-dir 选项启动您的 InfluxDB 服务器

influxdb3 serve \
  --node-id node0 \
  --object-store [OBJECT_STORE_TYPE] \
  --plugin-dir /path/to/plugins

/path/to/plugins 替换为您要存储 Python 插件文件的目录。所有插件文件都必须位于此目录或其子目录中。

配置分布式环境

如果您正在运行多个 InfluxDB 3 Core 实例(分布式部署)

  1. 决定插件应在何处运行
    • 数据处理插件(例如 WAL 插件)在摄取节点上运行
    • HTTP 触发的插件在处理 API 请求的节点上运行
    • 计划插件可以在任何配置的节点上运行
  2. 在选定的实例上启用插件
  3. 在运行插件的所有实例中维护相同的插件文件
    • 使用共享存储或文件同步工具来保持插件一致

向运行插件的节点提供插件

在与运行触发器和插件的节点相同的系统上配置您的插件目录。

添加处理引擎插件

插件是一个 Python 文件,其中包含与触发器类型对应的特定函数签名。插件

  • 接收特定于插件的参数(例如写入的数据、调用时间或 HTTP 请求)
  • 可以从触发器参数接收关键字参数(作为 args
  • 可以访问 influxdb3_local 共享 API 以进行写入、查询和管理状态

开始使用示例插件或创建您自己的插件

获取示例插件

InfluxData 维护着一个贡献插件的存储库,您可以按原样使用,也可以将其用作您自己的插件的起点。

从本地文件

您可以将示例插件从 influxdb3_plugins 存储库复制到您的本地插件目录

# Clone the repository
git clone https://github.com/influxdata/influxdb3_plugins.git

# Copy example plugins to your plugin directory
cp -r influxdb3_plugins/examples/wal_plugin/* /path/to/plugins/

直接从 GitHub

您可以使用 GitHub 中的插件,而无需先下载它们,方法是在插件文件名中使用 gh: 前缀

# Use a plugin directly from GitHub
influxdb3 create trigger \
    --trigger-spec "every:1m" \
    --plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
    --database my_database \
    system_metrics

查找和贡献插件

插件存储库包含各种用例的示例

  • 数据转换:处理和转换传入数据
  • 警报:根据数据阈值发送通知
  • 聚合:计算时间序列数据的统计信息
  • 集成:连接到外部服务和 API
  • 系统监控:跟踪资源使用情况和健康指标

访问 influxdata/influxdb3_plugins 以浏览可用的插件或贡献您自己的插件。

创建插件

  1. 在您的插件目录中创建一个 .py 文件
  2. 使用以下签名之一定义函数

对于数据写入事件

def process_writes(influxdb3_local, table_batches, args=None):
    # Process data as it's written to the database
    for table_batch in table_batches:
        table_name = table_batch["table_name"]
        rows = table_batch["rows"]
        
        # Log information about the write
        influxdb3_local.info(f"Processing {len(rows)} rows from {table_name}")
        
        # Write derived data back to the database
        line = LineBuilder("processed_data")
        line.tag("source_table", table_name)
        line.int64_field("row_count", len(rows))
        influxdb3_local.write(line)

对于计划事件

def process_scheduled_call(influxdb3_local, call_time, args=None):
    # Run code on a schedule
    
    # Query recent data
    results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")
    
    # Process the results
    if results:
        influxdb3_local.info(f"Found {len(results)} recent metrics")
    else:
        influxdb3_local.warn("No recent metrics found")

对于 HTTP 请求

def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    # Handle HTTP requests to a custom endpoint
    
    # Log the request parameters
    influxdb3_local.info(f"Received request with parameters: {query_parameters}")
    
    # Process the request body
    if request_body:
        import json
        data = json.loads(request_body)
        influxdb3_local.info(f"Request data: {data}")
    
    # Return a response (automatically converted to JSON)
    return {"status": "success", "message": "Request processed"}

添加插件后,您可以安装 Python 依赖项或学习如何使用 API 功能和状态管理扩展插件

创建触发器以运行插件

触发器将您的插件连接到特定的数据库事件。插件文件中的插件函数签名决定了您可以选择哪个触发器规范来配置和激活您的插件。

使用 influxdb3 create trigger 命令创建触发器。

当指定本地插件文件时,--plugin-filename 参数相对于为服务器配置的 --plugin-dir。您无需提供绝对路径。

为数据写入创建触发器

使用 table:<TABLE_NAME>all_tables 触发器规范来配置和运行数据写入事件的插件——例如

# Trigger on writes to a specific table
# The plugin file must be in your configured plugin directory
influxdb3 create trigger \
  --trigger-spec "table:sensor_data" \
  --plugin-filename "process_sensors.py" \
  --database my_database \
  sensor_processor

# Trigger on writes to all tables
influxdb3 create trigger \
  --trigger-spec "all_tables" \
  --plugin-filename "process_all_data.py" \
  --database my_database \
  all_data_processor

当数据库将指定表的摄取数据刷新到对象存储中的预写日志 (WAL) 时(默认为每秒一次),触发器运行。

插件接收写入的数据和表信息。

为计划事件创建触发器

使用 every:<DURATION>cron:<CRONTAB_EXPRESSION> 触发器规范来配置和运行计划事件的插件——例如

# Run every 5 minutes
influxdb3 create trigger \
  --trigger-spec "every:5m" \
  --plugin-filename "hourly_check.py" \
  --database my_database \
  regular_check

# Run on a cron schedule (8am daily)
influxdb3 create trigger \
  --trigger-spec "cron:0 8 * * *" \
  --plugin-filename "daily_report.py" \
  --database my_database \
  daily_report

插件接收计划的调用时间。

为 HTTP 请求创建触发器

[对于 HTTP 请求插件],使用 path:<ENDPOINT_PATH> 触发器规范来配置和启用HTTP 请求的插件——例如

# Create an endpoint at /api/v3/engine/webhook
influxdb3 create trigger \
  --trigger-spec "path:webhook" \
  --plugin-filename "webhook_handler.py" \
  --database my_database \
  webhook_processor

触发器使您的端点在 /api/v3/engine/<ENDPOINT_PATH> 上可用。要运行插件,请向端点发送 GETPOST 请求——例如

curl http://localhost:8181/api/v3/engine/webhook

插件接收包含方法、标头和正文的 HTTP 请求对象。

使用来自 GitHub 的社区插件

您可以使用 gh: 前缀直接从 GitHub 存储库引用插件

# Create a trigger using a plugin from GitHub
influxdb3 create trigger \
  --trigger-spec "every:1m" \
  --plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
  --database my_database \
  system_metrics

将参数传递给插件

使用触发器参数将配置从触发器传递到它运行的插件。您可以将其用于

  • 监控的阈值
  • 外部服务的连接属性
  • 插件行为的配置设置
influxdb3 create trigger \
  --trigger-spec "every:1h" \
  --plugin-filename "threshold_check.py" \
  --trigger-arguments threshold=90,notify_email=admin@example.com \
  --database my_database \
  threshold_monitor

参数作为 Dict[str, str] 传递给插件,其中键是参数名称,值是参数值

def process_scheduled_call(influxdb3_local, call_time, args=None):
    if args and "threshold" in args:
        threshold = float(args["threshold"])
        email = args.get("notify_email", "default@example.com")
        
        # Use the arguments in your logic
        influxdb3_local.info(f"Checking threshold {threshold}, will notify {email}")

控制触发器执行

默认情况下,触发器同步运行——每个实例在执行之前等待之前的实例完成。

要允许多个相同触发器的实例同时运行,请将触发器配置为异步运行

# Allow multiple trigger instances to run simultaneously
influxdb3 create trigger \
  --trigger-spec "table:metrics" \
  --plugin-filename "heavy_process.py" \
  --run-asynchronous \
  --database my_database \
  async_processor

配置触发器的错误处理

要配置触发器的错误处理行为,请使用带有以下值之一的 --error-behavior <ERROR_BEHAVIOR> CLI 选项

  • log(默认):将所有插件错误记录到 stdout 和 system.processing_engine_logs 系统表。
  • retry:尝试在错误发生后立即再次运行插件。
  • disable:在发生错误时自动禁用插件(稍后可以通过 CLI 重新启用)。
# Automatically retry on error
influxdb3 create trigger \
  --trigger-spec "table:important_data" \
  --plugin-filename "critical_process.py" \
  --error-behavior retry \
  --database my_database \
  critical_processor

# Disable the trigger on error
influxdb3 create trigger \
  --trigger-spec "path:webhook" \
  --plugin-filename "webhook_handler.py" \
  --error-behavior disable \
  --database my_database \
  auto_disable_processor

使用 API 功能和状态管理扩展插件

处理引擎包括 API 功能,这些功能允许您的插件与 InfluxDB 数据交互并在执行之间维护状态。这些功能使您可以构建更复杂的插件,这些插件可以转换、分析和响应数据。

使用共享 API

所有插件都可以访问共享 API 以与数据库交互。

写入数据

使用 LineBuilder API 创建行协议数据

# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)

# Write the data to the database
influxdb3_local.write(line)

写入在插件运行时缓冲,并在插件完成时刷新。

查看 LineBuilder Python 实现

查询数据

执行 SQL 查询并获取结果

# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

# Parameterized query for safer execution
params = {"table": "metrics", "threshold": 90}
results = influxdb3_local.query("SELECT * FROM $table WHERE value > $threshold", params)

共享 API query 函数将结果作为 Dict[String, Any]List 返回,其中键是列名,值是列值。

记录信息

共享 API infowarnerror 函数接受多个参数,将它们转换为字符串,并将它们作为空格分隔的消息记录到数据库日志中,该日志输出在服务器日志中,并捕获在您可以使用 SQL 查询的系统表中。

添加日志记录以跟踪插件执行

influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")

# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)

使用内存缓存

处理引擎提供了一个内存缓存系统,使插件能够在执行之间持久保存和检索数据。

使用共享 API cache 属性访问缓存 API。

# Basic usage pattern  
influxdb3_local.cache.METHOD(PARAMETERS)
方法参数返回描述
putkey (str):要存储值的键
value (Any):要缓存的任何 Python 对象
ttl (Optional[float], default=None):过期前的秒数
use_global (bool, default=False):如果为 True,则使用全局命名空间
在缓存中存储一个值,并带有可选的生存时间
getkey (str):要检索的键
default (Any, default=None):如果未找到键,则返回的值
use_global (bool, default=False):如果为 True,则使用全局命名空间
Any从缓存中检索值,如果未找到则返回默认值
deletekey (str):要删除的键
use_global (bool, default=False):如果为 True,则使用全局命名空间
bool从缓存中删除一个值。如果已删除,则返回 True,如果未找到,则返回 False
缓存命名空间

缓存系统提供两个不同的命名空间

命名空间范围最适合
特定于触发器(默认)隔离到单个触发器特定于一个插件的插件状态、计数器、时间戳
全局在所有触发器之间共享应可供所有插件使用的配置、查找表、服务状态
存储和检索缓存的数据
# Store a value
influxdb3_local.cache.put("last_run_time", time.time())

# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)

# Delete a cached value
influxdb3_local.cache.delete("temporary_data")
存储带有过期时间的缓存数据
# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)
跨插件共享数据
# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)

# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)
跟踪执行之间的状态
# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)

# Increment counter
counter += 1

# Store the updated value
influxdb3_local.cache.put("execution_count", counter)

influxdb3_local.info(f"This plugin has run {counter} times")

内存缓存的最佳实践

使用特定于触发器的命名空间

缓存旨在支持有状态操作,同时保持不同触发器之间的隔离。对于大多数操作,请使用特定于触发器的命名空间,仅当需要在触发器之间共享数据时才使用全局命名空间。

适当使用 TTL

根据数据更改的频率设置实际的过期时间。

# Cache external API responses for 5 minutes  
influxdb3_local.cache.put("weather_data", api_response, ttl=300)
缓存计算结果

存储需要频繁使用的高成本计算的结果。

# Cache aggregated statistics  
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)
预热缓存

对于关键数据,请在启动时预热缓存。这对于多个触发器需要数据的全局命名空间数据尤其有用。

# Check if cache needs to be initialized  
if not influxdb3_local.cache.get("lookup_table"):   
    influxdb3_local.cache.put("lookup_table", load_lookup_data())
考虑缓存限制
  • 内存使用量:由于缓存内容存储在内存中,因此在缓存大型数据集时,请监控您的内存使用量。
  • 服务器重启:由于缓存会在服务器重启时清除,因此请设计您的插件来处理缓存初始化(如上所述)。
  • 并发:当多个触发器实例可能同时更新相同的缓存键时,请注意访问不准确或过时的数据。

安装 Python 依赖项

如果您的插件需要额外的 Python 包,请使用 influxdb3 install 命令

# Install a package directly
influxdb3 install package pandas
# With Docker
docker exec -it CONTAINER_NAME influxdb3 install package pandas

这会在您的插件目录中创建一个 Python 虚拟环境,其中安装了指定的包。


此页面是否有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像目前一样使用它,而无需对您的代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、最新的数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 以 Core 的基础为构建,增加了高可用性、读取副本、增强的安全性以及数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看