处理引擎和 Python 插件
在 InfluxDB 3 Core 中使用处理引擎,通过自定义 Python 代码来扩展您的数据库。在数据写入、按计划或按需触发您的代码,以自动化工作流程、转换数据并创建 API 端点。
什么是处理引擎?
处理引擎是嵌入在 InfluxDB 3 Core 数据库内部的 Python 虚拟机。您配置触发器来运行您的 Python插件代码,以响应
- 数据写入 - 在数据进入数据库时进行处理和转换
- 计划事件 - 在指定的时间间隔或特定时间运行代码
- HTTP 请求 - 暴露执行您的自定义 API 端点
您可以使用处理引擎的内存缓存来管理执行之间的状态,并直接在数据库中构建有状态应用程序。
本指南将引导您完成设置处理引擎、创建第一个插件以及配置在特定事件上执行代码的触发器。
开始之前
确保您拥有
- 正常运行的 InfluxDB 3 Core 实例
- 命令行访问权限
- 如果您要编写自己的插件,请安装 Python
- InfluxDB CLI 的基本知识
当您具备所有先决条件后,请按照以下步骤为您的数据自动化需求实施处理引擎。
设置处理引擎
要激活处理引擎,请使用 --plugin-dir 标志启动您的 InfluxDB 3 Core 服务器。此标志告诉 InfluxDB 从何处加载您的插件文件。
将 influxdb3 二进制文件及其 python 目录一起保留
influxdb3 二进制文件需要相邻的 python/ 目录才能正常工作。如果您手动从 tar.gz 提取,请将它们保留在同一个父目录中
your-install-location/
├── influxdb3
└── python/将父目录添加到您的 PATH;请勿将二进制文件移出此目录。
influxdb3 serve \
--NODE_ID \
--object-store OBJECT_STORE_TYPE \
--plugin-dir PLUGIN_DIR在上面的示例中,替换以下内容
NODE_ID:实例的唯一标识符OBJECT_STORE_TYPE:对象存储的类型(例如,file 或 s3)PLUGIN_DIR:插件文件存储目录的绝对路径。将所有插件文件存储在此目录或其子目录中。
使用自定义插件存储库
默认情况下,带有 gh: 前缀引用的插件将从官方 influxdata/influxdb3_plugins 存储库中获取。要使用自定义存储库,请在启动服务器时添加 --plugin-repo 标志。有关详细信息,请参阅 使用自定义插件存储库。
配置分布式环境
在分布式环境中运行 InfluxDB 3 Core 时,请按照以下步骤配置处理引擎
- 决定每个插件应在何处运行
- 数据处理插件,例如 WAL 插件,在摄取节点上运行
- HTTP 触发的插件在处理 API 请求的节点上运行
- 计划插件可以在任何已配置的节点上运行
- 在正确的实例上启用插件
- 在插件运行的所有实例上保持相同的插件文件
- 使用共享存储或文件同步工具来保持插件的一致性
为运行它们的节点提供插件
在运行触发器和插件的节点相同的系统上配置您的插件目录。
添加处理引擎插件
插件是一个 Python 脚本,它为触发器定义了一个特定的函数签名(触发器规范)。当指定事件发生时,InfluxDB 会运行该插件。
选择插件策略
您有两种主要选项可将插件添加到您的 InfluxDB 实例
使用示例插件
InfluxData 维护着一个官方和社区插件存储库,您可以立即在您的处理引擎设置中使用。
浏览 插件库,查找示例和 InfluxData 官方插件,用于
- 数据转换:处理和转换传入数据
- 警报:根据数据阈值发送通知
- 聚合:计算时间序列数据的统计信息
- 集成:连接到外部服务和 API
- 系统监控:跟踪资源使用情况和健康指标
有关社区贡献,请参阅 GitHub 上的 influxdb3_plugins 存储库。
添加示例插件
您有两种选择可以使用存储库中的插件
选项 1:本地复制插件
克隆 influxdata/influxdb3_plugins 存储库并将插件复制到您配置的插件目录
# Clone the repository
git clone https://github.com/influxdata/influxdb3_plugins.git
# Copy a plugin to your configured plugin directory
cp influxdb3_plugins/influxdata/system_metrics/system_metrics.py /path/to/plugins/选项 2:直接从 GitHub 引用插件
通过使用 gh: 前缀直接从 GitHub 引用插件,跳过下载插件
# Create a trigger using a plugin from GitHub
influxdb3 create trigger \
--trigger-spec "every:1m" \
--plugin-filename "gh:influxdata/system_metrics/system_metrics.py" \
--database my_database \
system_metrics此方法
- 确保您使用的是最新版本
- 简化更新和维护
- 减少本地存储需求
选项 3:使用自定义插件存储库
对于维护自己的插件存储库或需要使用私有/内部插件的组织,请配置自定义插件存储库 URL
# Start the server with a custom plugin repository
influxdb3 serve \
--node-id node0 \
--object-store file \
--data-dir ~/.influxdb3 \
--plugin-dir ~/.plugins \
--plugin-repo "https://internal.company.com/influxdb-plugins/"然后使用 gh: 前缀从您的自定义存储库中引用插件
# Fetches from: https://internal.company.com/influxdb-plugins/myorg/custom_plugin.py
influxdb3 create trigger \
--trigger-spec "every:5m" \
--plugin-filename "gh:myorg/custom_plugin.py" \
--database my_database \
custom_trigger自定义存储库的用例
- 私有插件:托管不适合公开存储库的专有插件
- 隔离环境:在外部互联网访问受限时使用内部镜像
- 开发和暂存:在生产部署前测试开发分支中的插件
- 合规性要求:满足要求内部托管的数据治理策略
--plugin-repo 选项接受任何提供原始插件文件的 HTTP/HTTPS URL。有关更多详细信息,请参阅 plugin-repo 配置选项。
插件具有各种功能,例如
- 接收插件特定参数(例如写入的数据、调用时间或 HTTP 请求)
- 访问从触发器参数配置传递的关键字参数(作为
args) - 访问
influxdb3_local共享 API 来写入数据、查询数据以及管理执行之间的状态
有关可用函数、参数以及插件如何与 InfluxDB 交互的更多信息,请参阅如何 扩展插件。
创建自定义插件
要构建自定义功能,您可以创建自己的处理引擎插件。
前提条件
开始之前,请确保
- 您的 InfluxDB 3 Core 实例已启用处理引擎。
- 您已配置了存储插件文件的
--plugin-dir。 - 您有权访问该插件目录。
创建插件的步骤
选择您的插件类型
根据您的自动化目标选择插件类型
| 插件类型 | 最适合 |
|---|---|
| 数据写入 | 处理传入数据 |
| 计划 | 在特定时间间隔或时间运行代码 |
| HTTP 请求 | 通过 API 端点按需运行代码 |
创建您的插件文件
插件现在同时支持单文件和多文件架构
单文件插件
- 在您的插件目录中创建一个
.py文件 - 根据您选择的插件类型添加适当的函数签名
- 在函数中编写您的处理逻辑
多文件插件
- 在您的插件目录中创建一个目录
- 添加一个
__init__.py文件作为入口点(必需) - 将支持模块组织在其他
.py文件中 - 在您的插件代码中导入和使用模块
多文件插件示例结构
my_plugin/
├── __init__.py # Required - entry point with trigger function
├── utils.py # Supporting module
├── processors.py # Data processing functions
└── config.py # Configuration helpers__init__.py 文件必须包含您的触发器函数
# my_plugin/__init__.py
from .processors import process_data
from .config import get_settings
def process_writes(influxdb3_local, table_batches, args=None):
settings = get_settings()
for table_batch in table_batches:
process_data(influxdb3_local, table_batch, settings)支持模块可以包含辅助函数
# my_plugin/processors.py
def process_data(influxdb3_local, table_batch, settings):
# Processing logic here
pass编写插件后,创建触发器以将其连接到数据库事件并定义其运行时间。
创建数据写入插件
使用数据写入插件来处理数据写入数据库时的过程。这些插件使用 table: 或 all_tables: 触发器规范。理想的用例包括
- 数据转换和丰富
- 对传入值进行警报
- 创建派生指标
def process_writes(influxdb3_local, table_batches, args=None):
# Process data as it's written to the database
for table_batch in table_batches:
table_name = table_batch["table_name"]
rows = table_batch["rows"]
# Log information about the write
influxdb3_local.info(f"Processing {len(rows)} rows from {table_name}")
# Write derived data back to the database
line = LineBuilder("processed_data")
line.tag("source_table", table_name)
line.int64_field("row_count", len(rows))
influxdb3_local.write(line)创建计划插件
计划插件在定义的间隔使用 every: 或 cron: 触发器规范运行。将它们用于
- 定期数据聚合
- 报告生成
- 系统健康检查
def process_scheduled_call(influxdb3_local, call_time, args=None):
# Run code on a schedule
# Query recent data
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")
# Process the results
if results:
influxdb3_local.info(f"Found {len(results)} recent metrics")
else:
influxdb3_local.warn("No recent metrics found")创建 HTTP 请求插件
HTTP 请求插件响应 API 调用,使用 request: 触发器规范。将它们用于
- 创建自定义 API 端点
- 用于外部集成的 Webhook
- 用于数据交互的用户界面
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
# Handle HTTP requests to a custom endpoint
# Log the request parameters
influxdb3_local.info(f"Received request with parameters: {query_parameters}")
# Process the request body
if request_body:
import json
data = json.loads(request_body)
influxdb3_local.info(f"Request data: {data}")
# Return a response (automatically converted to JSON)
return {"status": "success", "message": "Request processed"}下一步
编写插件后
- 创建触发器以将您的插件连接到数据库事件
- 安装插件所需的任何 Python 依赖项
- 了解如何 使用 API 扩展插件
从本地计算机上传插件
对于本地开发和测试,您可以在创建触发器时直接从您的计算机上传插件文件。这消除了手动将文件复制到服务器插件目录的需要。
将 --upload 标志与 --path 一起使用来传输本地文件或目录
# Upload single-file plugin
influxdb3 create trigger \
--trigger-spec "every:10s" \
--path "/local/path/to/plugin.py" \
--upload \
--database metrics \
my_trigger
# Upload multifile plugin directory
influxdb3 create trigger \
--trigger-spec "every:30s" \
--path "/local/path/to/plugin-dir" \
--upload \
--database metrics \
complex_trigger需要管理员权限
插件上传需要管理员令牌。此安全措施可防止在服务器上未经授权的代码执行。
何时使用插件上传
- 本地插件开发和测试
- 部署插件而无需 SSH 访问服务器
- 插件代码的快速迭代
- 在 CI/CD 管道中自动化插件部署
有关更多信息,请参阅 influxdb3 create trigger CLI 参考。
更新现有插件
修改运行触发器的插件代码,而无需重新创建它们。这允许您迭代插件开发,同时保留触发器配置和历史记录。
使用 influxdb3 update trigger 命令
# Update single-file plugin
influxdb3 update trigger \
--database metrics \
--trigger-name my_trigger \
--path "/path/to/updated/plugin.py"
# Update multifile plugin
influxdb3 update trigger \
--database metrics \
--trigger-name complex_trigger \
--path "/path/to/updated/plugin-dir"更新操作
- 立即替换插件文件
- 保留触发器配置(规范、计划、参数)
- 出于安全原因需要管理员令牌
- 适用于本地路径和上传文件
有关完整参考,请参阅 influxdb3 update trigger。
查看已加载的插件
监控系统中加载了哪些插件,以实现操作可见性和故障排除。
选项 1:使用 CLI 命令
# List all plugins
influxdb3 show plugins --token $ADMIN_TOKEN
# JSON format for programmatic access
influxdb3 show plugins --format json --token $ADMIN_TOKEN选项 2:查询系统表
_internal 数据库中的 system.plugin_files 表提供了详细的插件文件信息
influxdb3 query \
-d _internal \
"SELECT * FROM system.plugin_files ORDER BY plugin_name" \
--token $ADMIN_TOKEN可用列
plugin_name(String):触发器名称file_name(String):插件文件名file_path(String):服务器的完整路径size_bytes(Int64):文件大小last_modified(Int64):修改时间戳(毫秒)
示例查询
-- Find plugins by name
SELECT * FROM system.plugin_files WHERE plugin_name = 'my_trigger';
-- Find large plugins
SELECT plugin_name, size_bytes
FROM system.plugin_files
WHERE size_bytes > 10000;
-- Check modification times
SELECT plugin_name, file_name, last_modified
FROM system.plugin_files
ORDER BY last_modified DESC;有关更多信息,请参阅 influxdb3 show plugins 参考 和 查询系统数据。
设置触发器
了解触发器类型
| 插件类型 | 触发器规范 | 插件运行时间 |
|---|---|---|
| 数据写入 | table:<TABLE_NAME> 或 all_tables | 写入表时 |
| 计划 | every:<DURATION> 或 cron:<EXPRESSION> | 在指定的时间间隔 |
| HTTP 请求 | request:<REQUEST_PATH> | 收到 HTTP 请求时 |
使用 create trigger 命令
使用 influxdb3 create trigger 命令,并附带适当的触发器规范
influxdb3 create trigger \
--trigger-spec SPECIFICATION \
--plugin-filename PLUGIN_FILE \
--database DATABASE_NAME \
TRIGGER_NAME在上面的示例中,替换以下内容
SPECIFICATION:触发器规范PLUGIN_FILE:相对于您的已配置插件目录的插件文件名DATABASE_NAME:数据库名称TRIGGER_NAME:新触发器的名称
当指定本地插件文件时,--plugin-filename 参数相对于服务器配置的 --plugin-dir。您无需提供绝对路径。
触发器规范示例
触发数据写入
# Trigger on writes to a specific table
# The plugin file must be in your configured plugin directory
influxdb3 create trigger \
--trigger-spec "table:sensor_data" \
--plugin-filename "process_sensors.py" \
--database my_database \
sensor_processor
# Trigger on writes to all tables
influxdb3 create trigger \
--trigger-spec "all_tables" \
--plugin-filename "process_all_data.py" \
--database my_database \
all_data_processor当数据库将摄入的数据刷新到对象存储的写前日志 (WAL) 时(默认为每秒一次),触发器运行。
插件接收写入的数据和表信息。
触发数据写入(排除表)
如果您想为所有表使用一个触发器,但排除特定表,您可以使用触发器参数和您的插件代码来过滤掉不需要的表——例如
influxdb3 create trigger \
--database DATABASE_NAME \
--token AUTH_TOKEN \
--plugin-filename processor.py \
--trigger-spec "all_tables" \
--trigger-arguments "exclude_tables=temp_data,debug_info,system_logs" \
data_processor替换以下内容:
- DATABASE_NAME:数据库名称
- AUTH_TOKEN:您的 令牌
然后在您的插件中
# processor.py
def on_write(self, database, table_name, batch):
# Get excluded tables from trigger arguments
excluded_tables = set(self.args.get('exclude_tables', '').split(','))
if table_name in excluded_tables:
return
# Process allowed tables
self.process_data(database, table_name, batch)建议
- 尽早返回:在插件中尽早检查排除项。
- 高效查找:使用集合来获得 O(1) 的查找性能,对于大型排除列表。
- 性能:记录被跳过的表用于调试,但在生产中避免过度日志记录。
- 多个触发器:对于少量表,请考虑创建单独的表特定触发器,而不是在插件代码中进行过滤。请参阅 HTTP API 处理引擎端点以管理触发器。
按计划触发
# Run every 5 minutes
influxdb3 create trigger \
--trigger-spec "every:5m" \
--plugin-filename "periodic_check.py" \
--database my_database \
regular_check
# Run on a cron schedule (8am daily)
# Supports extended cron format with seconds
influxdb3 create trigger \
--trigger-spec "cron:0 0 8 * * *" \
--plugin-filename "daily_report.py" \
--database my_database \
daily_report插件接收计划的调用时间。
触发 HTTP 请求
# Create an endpoint at /api/v3/engine/webhook
influxdb3 create trigger \
--trigger-spec "request:webhook" \
--plugin-filename "webhook_handler.py" \
--database my_database \
webhook_processor通过 /api/v3/engine/{REQUEST_PATH}(在本例中为 /api/v3/engine/webhook)访问您的端点。该触发器默认启用,并在收到指定路径上的 HTTP 请求时运行。
要运行插件,请向端点发送 GET 或 POST 请求——例如
curl https://:8181/api/v3/engine/webhook插件接收包含方法、标头和正文的 HTTP 请求对象。
要查看与数据库关联的触发器,请使用 influxdb3 show summary 命令
influxdb3 show summary --database my_database --token AUTH_TOKEN将参数传递给插件
使用触发器参数将配置从触发器传递给它运行的插件。您可以使用此来
- 监控的阈值
- 外部服务的连接属性
- 插件行为的配置设置
influxdb3 create trigger \
--trigger-spec "every:1h" \
--plugin-filename "threshold_check.py" \
--trigger-arguments threshold=90,notify_email=admin@example.com \
--database my_database \
threshold_monitor参数作为 Dict[str, str] 传递给插件,其中键是参数名称,值是参数值
def process_scheduled_call(influxdb3_local, call_time, args=None):
if args and "threshold" in args:
threshold = float(args["threshold"])
email = args.get("notify_email", "default@example.com")
# Use the arguments in your logic
influxdb3_local.info(f"Checking threshold {threshold}, will notify {email}")控制触发器执行
默认情况下,触发器是同步运行的——每个实例在执行前等待前一个实例完成。
要允许同一触发器的多个实例同时运行,请将触发器配置为异步运行
# Allow multiple trigger instances to run simultaneously
influxdb3 create trigger \
--trigger-spec "table:metrics" \
--plugin-filename "heavy_process.py" \
--run-asynchronous \
--database my_database \
async_processor配置触发器的错误处理
要配置触发器的错误处理行为,请使用 --error-behavior <ERROR_BEHAVIOR> CLI 选项,并附带以下值之一
log(默认):将所有插件错误日志记录到 stdout 和system.processing_engine_logs系统表。retry:在发生错误后立即尝试再次运行插件。disable:发生错误时自动禁用插件(稍后可通过 CLI 重新启用)。
# Automatically retry on error
influxdb3 create trigger \
--trigger-spec "table:important_data" \
--plugin-filename "critical_process.py" \
--error-behavior retry \
--database my_database \
critical_processor
# Disable the trigger on error
influxdb3 create trigger \
--trigger-spec "request:webhook" \
--plugin-filename "webhook_handler.py" \
--error-behavior disable \
--database my_database \
auto_disable_processor管理插件依赖项
使用 influxdb3 install package 命令将第三方库(如 pandas、requests 或 influxdb3-python)添加到您的插件环境。
这会将包安装到处理引擎的嵌入式 Python 环境中,以确保与您的 InfluxDB 实例兼容。
# Use the CLI to install a Python package
influxdb3 install package pandas# Use the CLI to install a Python package in a Docker container
docker exec -it CONTAINER_NAME influxdb3 install package pandas这些示例将指定的 Python 包(例如,pandas)安装到处理引擎的嵌入式虚拟环境中。
- 在直接在系统上运行 InfluxDB 时使用 CLI 命令。
- 如果您在容器化环境中运行 InfluxDB,请使用 Docker 变体。
为插件使用捆绑的 Python
当您使用 --plugin-dir 选项启动服务器时,InfluxDB 3 会在您的插件目录中创建一个 Python 虚拟环境(<PLUGIN_DIR>/venv)。如果您需要创建自定义虚拟环境,请使用 InfluxDB 3 捆绑的 Python 解释器。不要使用系统 Python。使用系统 Python 创建虚拟环境(例如,使用 python -m venv)可能导致运行时错误和插件失败。
有关更多信息,请参阅 处理引擎 README。
InfluxDB 会在您的插件目录中创建一个 Python 虚拟环境,并安装指定的包。
为安全环境禁用包安装
对于隔离部署或安全要求严格的环境,您可以在保持处理引擎功能正常的同时禁用 Python 包安装。
使用 --package-manager disabled 启动服务器
influxdb3 serve \
--node-id node0 \
--object-store file \
--data-dir ~/.influxdb3 \
--plugin-dir ~/.plugins \
--package-manager disabled当禁用包安装时
- 处理引擎继续正常运行触发器
- 插件代码在不受限制的情况下执行
- 包安装命令被阻止
- 虚拟环境中的预装依赖项仍然可用
预装必需的依赖项
在禁用包管理器之前,请安装所有必需的 Python 包
# Install packages first
influxdb3 install package pandas requests numpy
# Then start with disabled package manager
influxdb3 serve \
--plugin-dir ~/.plugins \
--package-manager disabled禁用包管理的用例
- 没有互联网访问的隔离环境
- 禁止运行时包安装的合规性要求
- 集中管理的依赖项环境
- 仅要求预先批准包的安全策略
有关更多配置选项,请参阅 –package-manager。
插件安全
处理引擎包含安全功能,以保护您的 InfluxDB 3 Core 实例免受未经授权的代码执行和文件系统攻击。
插件路径验证
所有插件文件路径都会经过验证,以防止目录遍历攻击。系统会阻止
- 带有父目录引用的相对路径(
../、../../) - 绝对路径(
/etc/passwd、/usr/bin/script.py) - 逃离插件目录的符号链接
创建或更新触发器时,插件路径必须解析到配置的 --plugin-dir 内部。
被阻止路径示例
# These will be rejected
influxdb3 create trigger \
--path "../../../etc/passwd" \ # Blocked: parent directory traversal
...
influxdb3 create trigger \
--path "/tmp/malicious.py" \ # Blocked: absolute path
...有效插件路径
# These are allowed
influxdb3 create trigger \
--path "myapp/plugin.py" \ # Relative to plugin-dir
...
influxdb3 create trigger \
--path "transforms/data.py" \ # Subdirectory in plugin-dir
...上传和更新权限
插件上传和更新操作需要管理员令牌,以防止未经授权的代码部署
--upload标志需要管理员权限update trigger命令需要管理员令牌- 标准资源令牌无法上传或修改插件代码
此安全模型确保只有管理员才能在您的数据库中引入或修改可执行代码。
最佳实践
对于开发
- 在开发过程中使用
--upload标志来部署插件 - 首先在非生产环境中测试插件
- 在部署前审查插件代码
对于生产
- 通过安全文件传输预先将插件部署到服务器的插件目录
- 使用自定义插件存储库来存放经过审核、批准的插件
- 在锁定环境中禁用包安装(
--package-manager disabled) - 使用
system.plugin_files表审计插件文件 - 实施插件更新的变更控制流程
有关更多安全配置选项,请参阅 配置选项。
此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 InfluxDB 3 Core 和本文档提供反馈和错误报告。要获得支持,请使用以下资源
具有年度合同或支持合同的客户可以 联系 InfluxData 支持。