文档文档

处理引擎和 Python 插件

在 InfluxDB 3 Core 中使用处理引擎,通过自定义 Python 代码来扩展您的数据库。在数据写入、按计划或按需触发您的代码,以自动化工作流程、转换数据并创建 API 端点。

什么是处理引擎?

处理引擎是嵌入在 InfluxDB 3 Core 数据库内部的 Python 虚拟机。您配置触发器来运行您的 Python插件代码,以响应

  • 数据写入 - 在数据进入数据库时进行处理和转换
  • 计划事件 - 在指定的时间间隔或特定时间运行代码
  • HTTP 请求 - 暴露执行您的自定义 API 端点

您可以使用处理引擎的内存缓存来管理执行之间的状态,并直接在数据库中构建有状态应用程序。

本指南将引导您完成设置处理引擎、创建第一个插件以及配置在特定事件上执行代码的触发器。

开始之前

确保您拥有

  • 正常运行的 InfluxDB 3 Core 实例
  • 命令行访问权限
  • 如果您要编写自己的插件,请安装 Python
  • InfluxDB CLI 的基本知识

当您具备所有先决条件后,请按照以下步骤为您的数据自动化需求实施处理引擎。

设置处理引擎

要激活处理引擎,请使用 --plugin-dir 标志启动您的 InfluxDB 3 Core 服务器。此标志告诉 InfluxDB 从何处加载您的插件文件。

将 influxdb3 二进制文件及其 python 目录一起保留

influxdb3 二进制文件需要相邻的 python/ 目录才能正常工作。如果您手动从 tar.gz 提取,请将它们保留在同一个父目录中

your-install-location/
├── influxdb3
└── python/

将父目录添加到您的 PATH;请勿将二进制文件移出此目录。

influxdb3 serve \
  --
NODE_ID
\
--object-store
OBJECT_STORE_TYPE
\
--plugin-dir
PLUGIN_DIR

在上面的示例中,替换以下内容

  • NODE_ID:实例的唯一标识符
  • OBJECT_STORE_TYPE:对象存储的类型(例如,file 或 s3)
  • PLUGIN_DIR:插件文件存储目录的绝对路径。将所有插件文件存储在此目录或其子目录中。

使用自定义插件存储库

默认情况下,带有 gh: 前缀引用的插件将从官方 influxdata/influxdb3_plugins 存储库中获取。要使用自定义存储库,请在启动服务器时添加 --plugin-repo 标志。有关详细信息,请参阅 使用自定义插件存储库

配置分布式环境

在分布式环境中运行 InfluxDB 3 Core 时,请按照以下步骤配置处理引擎

  1. 决定每个插件应在何处运行
    • 数据处理插件,例如 WAL 插件,在摄取节点上运行
    • HTTP 触发的插件在处理 API 请求的节点上运行
    • 计划插件可以在任何已配置的节点上运行
  2. 在正确的实例上启用插件
  3. 在插件运行的所有实例上保持相同的插件文件
    • 使用共享存储或文件同步工具来保持插件的一致性

为运行它们的节点提供插件

在运行触发器和插件的节点相同的系统上配置您的插件目录。

添加处理引擎插件

插件是一个 Python 脚本,它为触发器定义了一个特定的函数签名(触发器规范)。当指定事件发生时,InfluxDB 会运行该插件。

选择插件策略

您有两种主要选项可将插件添加到您的 InfluxDB 实例

使用示例插件

InfluxData 维护着一个官方和社区插件存储库,您可以立即在您的处理引擎设置中使用。

浏览 插件库,查找示例和 InfluxData 官方插件,用于

  • 数据转换:处理和转换传入数据
  • 警报:根据数据阈值发送通知
  • 聚合:计算时间序列数据的统计信息
  • 集成:连接到外部服务和 API
  • 系统监控:跟踪资源使用情况和健康指标

有关社区贡献,请参阅 GitHub 上的 influxdb3_plugins 存储库

添加示例插件

您有两种选择可以使用存储库中的插件

选项 1:本地复制插件

克隆 influxdata/influxdb3_plugins 存储库并将插件复制到您配置的插件目录

# Clone the repository
git clone https://github.com/influxdata/influxdb3_plugins.git
   
# Copy a plugin to your configured plugin directory
cp influxdb3_plugins/influxdata/system_metrics/system_metrics.py /path/to/plugins/
选项 2:直接从 GitHub 引用插件

通过使用 gh: 前缀直接从 GitHub 引用插件,跳过下载插件

# Create a trigger using a plugin from GitHub
influxdb3 create trigger \
  --trigger-spec "every:1m" \
  --plugin-filename "gh:influxdata/system_metrics/system_metrics.py" \
  --database my_database \
  system_metrics

此方法

  • 确保您使用的是最新版本
  • 简化更新和维护
  • 减少本地存储需求
选项 3:使用自定义插件存储库

对于维护自己的插件存储库或需要使用私有/内部插件的组织,请配置自定义插件存储库 URL

# Start the server with a custom plugin repository
influxdb3 serve \
  --node-id node0 \
  --object-store file \
  --data-dir ~/.influxdb3 \
  --plugin-dir ~/.plugins \
  --plugin-repo "https://internal.company.com/influxdb-plugins/"

然后使用 gh: 前缀从您的自定义存储库中引用插件

# Fetches from: https://internal.company.com/influxdb-plugins/myorg/custom_plugin.py
influxdb3 create trigger \
  --trigger-spec "every:5m" \
  --plugin-filename "gh:myorg/custom_plugin.py" \
  --database my_database \
  custom_trigger

自定义存储库的用例

  • 私有插件:托管不适合公开存储库的专有插件
  • 隔离环境:在外部互联网访问受限时使用内部镜像
  • 开发和暂存:在生产部署前测试开发分支中的插件
  • 合规性要求:满足要求内部托管的数据治理策略

--plugin-repo 选项接受任何提供原始插件文件的 HTTP/HTTPS URL。有关更多详细信息,请参阅 plugin-repo 配置选项

插件具有各种功能,例如

  • 接收插件特定参数(例如写入的数据、调用时间或 HTTP 请求)
  • 访问从触发器参数配置传递的关键字参数(作为 args
  • 访问 influxdb3_local 共享 API 来写入数据、查询数据以及管理执行之间的状态

有关可用函数、参数以及插件如何与 InfluxDB 交互的更多信息,请参阅如何 扩展插件

创建自定义插件

要构建自定义功能,您可以创建自己的处理引擎插件。

前提条件

开始之前,请确保

  • 您的 InfluxDB 3 Core 实例已启用处理引擎。
  • 您已配置了存储插件文件的 --plugin-dir
  • 您有权访问该插件目录。

创建插件的步骤

选择您的插件类型

根据您的自动化目标选择插件类型

插件类型最适合
数据写入处理传入数据
计划在特定时间间隔或时间运行代码
HTTP 请求通过 API 端点按需运行代码

创建您的插件文件

插件现在同时支持单文件和多文件架构

单文件插件

  • 在您的插件目录中创建一个 .py 文件
  • 根据您选择的插件类型添加适当的函数签名
  • 在函数中编写您的处理逻辑

多文件插件

  • 在您的插件目录中创建一个目录
  • 添加一个 __init__.py 文件作为入口点(必需)
  • 将支持模块组织在其他 .py 文件中
  • 在您的插件代码中导入和使用模块
多文件插件示例结构
my_plugin/
├── __init__.py       # Required - entry point with trigger function
├── utils.py          # Supporting module
├── processors.py     # Data processing functions
└── config.py         # Configuration helpers

__init__.py 文件必须包含您的触发器函数

# my_plugin/__init__.py
from .processors import process_data
from .config import get_settings

def process_writes(influxdb3_local, table_batches, args=None):
    settings = get_settings()
    for table_batch in table_batches:
        process_data(influxdb3_local, table_batch, settings)

支持模块可以包含辅助函数

# my_plugin/processors.py
def process_data(influxdb3_local, table_batch, settings):
    # Processing logic here
    pass

编写插件后,创建触发器以将其连接到数据库事件并定义其运行时间。

创建数据写入插件

使用数据写入插件来处理数据写入数据库时的过程。这些插件使用 table:all_tables: 触发器规范。理想的用例包括

  • 数据转换和丰富
  • 对传入值进行警报
  • 创建派生指标
def process_writes(influxdb3_local, table_batches, args=None):
    # Process data as it's written to the database
    for table_batch in table_batches:
        table_name = table_batch["table_name"]
        rows = table_batch["rows"]
        
        # Log information about the write
        influxdb3_local.info(f"Processing {len(rows)} rows from {table_name}")
        
        # Write derived data back to the database
        line = LineBuilder("processed_data")
        line.tag("source_table", table_name)
        line.int64_field("row_count", len(rows))
        influxdb3_local.write(line)

创建计划插件

计划插件在定义的间隔使用 every:cron: 触发器规范运行。将它们用于

  • 定期数据聚合
  • 报告生成
  • 系统健康检查
def process_scheduled_call(influxdb3_local, call_time, args=None):
    # Run code on a schedule
    
    # Query recent data
    results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")
    
    # Process the results
    if results:
        influxdb3_local.info(f"Found {len(results)} recent metrics")
    else:
        influxdb3_local.warn("No recent metrics found")

创建 HTTP 请求插件

HTTP 请求插件响应 API 调用,使用 request: 触发器规范。将它们用于

  • 创建自定义 API 端点
  • 用于外部集成的 Webhook
  • 用于数据交互的用户界面
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    # Handle HTTP requests to a custom endpoint
    
    # Log the request parameters
    influxdb3_local.info(f"Received request with parameters: {query_parameters}")
    
    # Process the request body
    if request_body:
        import json
        data = json.loads(request_body)
        influxdb3_local.info(f"Request data: {data}")
    
    # Return a response (automatically converted to JSON)
    return {"status": "success", "message": "Request processed"}

下一步

编写插件后

从本地计算机上传插件

对于本地开发和测试,您可以在创建触发器时直接从您的计算机上传插件文件。这消除了手动将文件复制到服务器插件目录的需要。

--upload 标志与 --path 一起使用来传输本地文件或目录

# Upload single-file plugin
influxdb3 create trigger \
  --trigger-spec "every:10s" \
  --path "/local/path/to/plugin.py" \
  --upload \
  --database metrics \
  my_trigger

# Upload multifile plugin directory
influxdb3 create trigger \
  --trigger-spec "every:30s" \
  --path "/local/path/to/plugin-dir" \
  --upload \
  --database metrics \
  complex_trigger

需要管理员权限

插件上传需要管理员令牌。此安全措施可防止在服务器上未经授权的代码执行。

何时使用插件上传

  • 本地插件开发和测试
  • 部署插件而无需 SSH 访问服务器
  • 插件代码的快速迭代
  • 在 CI/CD 管道中自动化插件部署

有关更多信息,请参阅 influxdb3 create trigger CLI 参考

更新现有插件

修改运行触发器的插件代码,而无需重新创建它们。这允许您迭代插件开发,同时保留触发器配置和历史记录。

使用 influxdb3 update trigger 命令

# Update single-file plugin
influxdb3 update trigger \
  --database metrics \
  --trigger-name my_trigger \
  --path "/path/to/updated/plugin.py"

# Update multifile plugin
influxdb3 update trigger \
  --database metrics \
  --trigger-name complex_trigger \
  --path "/path/to/updated/plugin-dir"

更新操作

  • 立即替换插件文件
  • 保留触发器配置(规范、计划、参数)
  • 出于安全原因需要管理员令牌
  • 适用于本地路径和上传文件

有关完整参考,请参阅 influxdb3 update trigger

查看已加载的插件

监控系统中加载了哪些插件,以实现操作可见性和故障排除。

选项 1:使用 CLI 命令

# List all plugins
influxdb3 show plugins --token $ADMIN_TOKEN

# JSON format for programmatic access
influxdb3 show plugins --format json --token $ADMIN_TOKEN

选项 2:查询系统表

_internal 数据库中的 system.plugin_files 表提供了详细的插件文件信息

influxdb3 query \
  -d _internal \
  "SELECT * FROM system.plugin_files ORDER BY plugin_name" \
  --token $ADMIN_TOKEN

可用列

  • plugin_name (String):触发器名称
  • file_name (String):插件文件名
  • file_path (String):服务器的完整路径
  • size_bytes (Int64):文件大小
  • last_modified (Int64):修改时间戳(毫秒)

示例查询

-- Find plugins by name
SELECT * FROM system.plugin_files WHERE plugin_name = 'my_trigger';

-- Find large plugins
SELECT plugin_name, size_bytes
FROM system.plugin_files
WHERE size_bytes > 10000;

-- Check modification times
SELECT plugin_name, file_name, last_modified
FROM system.plugin_files
ORDER BY last_modified DESC;

有关更多信息,请参阅 influxdb3 show plugins 参考查询系统数据

设置触发器

了解触发器类型

插件类型触发器规范插件运行时间
数据写入table:<TABLE_NAME>all_tables写入表时
计划every:<DURATION>cron:<EXPRESSION>在指定的时间间隔
HTTP 请求request:<REQUEST_PATH>收到 HTTP 请求时

使用 create trigger 命令

使用 influxdb3 create trigger 命令,并附带适当的触发器规范

influxdb3 create trigger \
  --trigger-spec 
SPECIFICATION
\
--plugin-filename
PLUGIN_FILE
\
--database
DATABASE_NAME
\
TRIGGER_NAME

在上面的示例中,替换以下内容

  • SPECIFICATION:触发器规范
  • PLUGIN_FILE:相对于您的已配置插件目录的插件文件名
  • DATABASE_NAME:数据库名称
  • TRIGGER_NAME:新触发器的名称

当指定本地插件文件时,--plugin-filename 参数相对于服务器配置的 --plugin-dir。您无需提供绝对路径。

触发器规范示例

触发数据写入

# Trigger on writes to a specific table
# The plugin file must be in your configured plugin directory
influxdb3 create trigger \
  --trigger-spec "table:sensor_data" \
  --plugin-filename "process_sensors.py" \
  --database my_database \
  sensor_processor

# Trigger on writes to all tables
influxdb3 create trigger \
  --trigger-spec "all_tables" \
  --plugin-filename "process_all_data.py" \
  --database my_database \
  all_data_processor

当数据库将摄入的数据刷新到对象存储的写前日志 (WAL) 时(默认为每秒一次),触发器运行。

插件接收写入的数据和表信息。

触发数据写入(排除表)

如果您想为所有表使用一个触发器,但排除特定表,您可以使用触发器参数和您的插件代码来过滤掉不需要的表——例如

influxdb3 create trigger \
  --database 
DATABASE_NAME
\
--token
AUTH_TOKEN
\
--plugin-filename processor.py \ --trigger-spec "all_tables" \ --trigger-arguments "exclude_tables=temp_data,debug_info,system_logs" \ data_processor

替换以下内容:

  • DATABASE_NAME:数据库名称
  • AUTH_TOKEN:您的 令牌

然后在您的插件中

# processor.py
def on_write(self, database, table_name, batch):
    # Get excluded tables from trigger arguments
    excluded_tables = set(self.args.get('exclude_tables', '').split(','))

    if table_name in excluded_tables:
        return

    # Process allowed tables
    self.process_data(database, table_name, batch)
建议
  • 尽早返回:在插件中尽早检查排除项。
  • 高效查找:使用集合来获得 O(1) 的查找性能,对于大型排除列表。
  • 性能:记录被跳过的表用于调试,但在生产中避免过度日志记录。
  • 多个触发器:对于少量表,请考虑创建单独的表特定触发器,而不是在插件代码中进行过滤。请参阅 HTTP API 处理引擎端点以管理触发器。

按计划触发

# Run every 5 minutes
influxdb3 create trigger \
  --trigger-spec "every:5m" \
  --plugin-filename "periodic_check.py" \
  --database my_database \
  regular_check

# Run on a cron schedule (8am daily)
# Supports extended cron format with seconds
influxdb3 create trigger \
  --trigger-spec "cron:0 0 8 * * *" \
  --plugin-filename "daily_report.py" \
  --database my_database \
  daily_report

插件接收计划的调用时间。

触发 HTTP 请求

# Create an endpoint at /api/v3/engine/webhook
influxdb3 create trigger \
  --trigger-spec "request:webhook" \
  --plugin-filename "webhook_handler.py" \
  --database my_database \
  webhook_processor

通过 /api/v3/engine/{REQUEST_PATH}(在本例中为 /api/v3/engine/webhook)访问您的端点。该触发器默认启用,并在收到指定路径上的 HTTP 请求时运行。

要运行插件,请向端点发送 GETPOST 请求——例如

curl https://:8181/api/v3/engine/webhook

插件接收包含方法、标头和正文的 HTTP 请求对象。

要查看与数据库关联的触发器,请使用 influxdb3 show summary 命令

influxdb3 show summary --database my_database --token AUTH_TOKEN

将参数传递给插件

使用触发器参数将配置从触发器传递给它运行的插件。您可以使用此来

  • 监控的阈值
  • 外部服务的连接属性
  • 插件行为的配置设置
influxdb3 create trigger \
  --trigger-spec "every:1h" \
  --plugin-filename "threshold_check.py" \
  --trigger-arguments threshold=90,notify_email=admin@example.com \
  --database my_database \
  threshold_monitor

参数作为 Dict[str, str] 传递给插件,其中键是参数名称,值是参数值

def process_scheduled_call(influxdb3_local, call_time, args=None):
    if args and "threshold" in args:
        threshold = float(args["threshold"])
        email = args.get("notify_email", "default@example.com")
        
        # Use the arguments in your logic
        influxdb3_local.info(f"Checking threshold {threshold}, will notify {email}")

控制触发器执行

默认情况下,触发器是同步运行的——每个实例在执行前等待前一个实例完成。

要允许同一触发器的多个实例同时运行,请将触发器配置为异步运行

# Allow multiple trigger instances to run simultaneously
influxdb3 create trigger \
  --trigger-spec "table:metrics" \
  --plugin-filename "heavy_process.py" \
  --run-asynchronous \
  --database my_database \
  async_processor

配置触发器的错误处理

要配置触发器的错误处理行为,请使用 --error-behavior <ERROR_BEHAVIOR> CLI 选项,并附带以下值之一

  • log(默认):将所有插件错误日志记录到 stdout 和 system.processing_engine_logs 系统表。
  • retry:在发生错误后立即尝试再次运行插件。
  • disable:发生错误时自动禁用插件(稍后可通过 CLI 重新启用)。
# Automatically retry on error
influxdb3 create trigger \
  --trigger-spec "table:important_data" \
  --plugin-filename "critical_process.py" \
  --error-behavior retry \
  --database my_database \
  critical_processor

# Disable the trigger on error
influxdb3 create trigger \
  --trigger-spec "request:webhook" \
  --plugin-filename "webhook_handler.py" \
  --error-behavior disable \
  --database my_database \
  auto_disable_processor

管理插件依赖项

使用 influxdb3 install package 命令将第三方库(如 pandasrequestsinfluxdb3-python)添加到您的插件环境。
这会将包安装到处理引擎的嵌入式 Python 环境中,以确保与您的 InfluxDB 实例兼容。

# Use the CLI to install a Python package
influxdb3 install package pandas
# Use the CLI to install a Python package in a Docker container
docker exec -it 
CONTAINER_NAME
influxdb3 install package pandas

这些示例将指定的 Python 包(例如,pandas)安装到处理引擎的嵌入式虚拟环境中。

  • 在直接在系统上运行 InfluxDB 时使用 CLI 命令。
  • 如果您在容器化环境中运行 InfluxDB,请使用 Docker 变体。

为插件使用捆绑的 Python

当您使用 --plugin-dir 选项启动服务器时,InfluxDB 3 会在您的插件目录中创建一个 Python 虚拟环境(<PLUGIN_DIR>/venv)。如果您需要创建自定义虚拟环境,请使用 InfluxDB 3 捆绑的 Python 解释器。不要使用系统 Python。使用系统 Python 创建虚拟环境(例如,使用 python -m venv)可能导致运行时错误和插件失败。

有关更多信息,请参阅 处理引擎 README

InfluxDB 会在您的插件目录中创建一个 Python 虚拟环境,并安装指定的包。

为安全环境禁用包安装

对于隔离部署或安全要求严格的环境,您可以在保持处理引擎功能正常的同时禁用 Python 包安装。

使用 --package-manager disabled 启动服务器

influxdb3 serve \
  --node-id node0 \
  --object-store file \
  --data-dir ~/.influxdb3 \
  --plugin-dir ~/.plugins \
  --package-manager disabled

当禁用包安装时

  • 处理引擎继续正常运行触发器
  • 插件代码在不受限制的情况下执行
  • 包安装命令被阻止
  • 虚拟环境中的预装依赖项仍然可用

预装必需的依赖项

在禁用包管理器之前,请安装所有必需的 Python 包

# Install packages first
influxdb3 install package pandas requests numpy

# Then start with disabled package manager
influxdb3 serve \
  --plugin-dir ~/.plugins \
  --package-manager disabled

禁用包管理的用例

  • 没有互联网访问的隔离环境
  • 禁止运行时包安装的合规性要求
  • 集中管理的依赖项环境
  • 仅要求预先批准包的安全策略

有关更多配置选项,请参阅 –package-manager

插件安全

处理引擎包含安全功能,以保护您的 InfluxDB 3 Core 实例免受未经授权的代码执行和文件系统攻击。

插件路径验证

所有插件文件路径都会经过验证,以防止目录遍历攻击。系统会阻止

  • 带有父目录引用的相对路径../../../
  • 绝对路径/etc/passwd/usr/bin/script.py
  • 逃离插件目录的符号链接

创建或更新触发器时,插件路径必须解析到配置的 --plugin-dir 内部。

被阻止路径示例

# These will be rejected
influxdb3 create trigger \
  --path "../../../etc/passwd" \  # Blocked: parent directory traversal
  ...

influxdb3 create trigger \
  --path "/tmp/malicious.py" \    # Blocked: absolute path
  ...

有效插件路径

# These are allowed
influxdb3 create trigger \
  --path "myapp/plugin.py" \      # Relative to plugin-dir
  ...

influxdb3 create trigger \
  --path "transforms/data.py" \    # Subdirectory in plugin-dir
  ...

上传和更新权限

插件上传和更新操作需要管理员令牌,以防止未经授权的代码部署

  • --upload 标志需要管理员权限
  • update trigger 命令需要管理员令牌
  • 标准资源令牌无法上传或修改插件代码

此安全模型确保只有管理员才能在您的数据库中引入或修改可执行代码。

最佳实践

对于开发

  • 在开发过程中使用 --upload 标志来部署插件
  • 首先在非生产环境中测试插件
  • 在部署前审查插件代码

对于生产

  • 通过安全文件传输预先将插件部署到服务器的插件目录
  • 使用自定义插件存储库来存放经过审核、批准的插件
  • 在锁定环境中禁用包安装(--package-manager disabled
  • 使用 system.plugin_files审计插件文件
  • 实施插件更新的变更控制流程

有关更多安全配置选项,请参阅 配置选项


此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2