文档文档

在 InfluxDB 3 Core 中处理数据

InfluxDB 3 Core 处理引擎是一个嵌入式的 Python 虚拟机 (VM),它在数据库内部运行代码来处理和转换数据。您可以创建处理引擎 插件,当特定事件 触发 时运行这些插件。

处理引擎术语

插件

插件是一个 Python 函数,其签名与处理引擎的 触发器 兼容。

触发器

创建触发器时,您需要指定一个 插件、一个数据库、可选参数以及一个触发器规范,该规范定义了插件的执行时间和接收的数据。

触发器类型

InfluxDB 3 提供了以下类型的触发器,每种都有特定的规范:

  • 数据写入 (table:all_tables):当数据库将数据刷新到写前日志 (默认每秒) 时,将一批写入的数据 (针对特定表或所有表) 发送到插件。
  • 计划任务 (every:cron:):根据用户配置的计划 (使用 crontab 或持续时间) 执行插件。此触发器类型对于数据收集和故障监控非常有用。
  • HTTP 请求 (request:):将插件绑定到 /api/v3/engine/<REQUEST_PATH> 的自定义 HTTP API 端点。插件接收 HTTP 请求头和内容,可以解析、处理并将数据发送到数据库或第三方服务。

激活处理引擎

要激活处理引擎,请在启动 InfluxDB 3 Core 服务器时包含 --plugin-dir <PLUGIN_DIR> 选项。PLUGIN_DIR 是您存储处理引擎运行的 插件 文件的文件系统位置。

如果您是从 tar 存档手动安装 InfluxDB 3 Core,请确保 influxdb3 二进制文件和 python/ 目录保留在同一个父目录中。安装脚本会自动处理此问题。

influxdb3 serve \
  # ...
  --plugin-dir 
PLUGIN_DIR

PLUGIN_DIR 替换为您插件目录的路径。此路径可以是绝对路径,也可以是相对于 influxdb3 服务器当前工作目录的相对路径。

创建插件

要创建插件,请在您配置的 PLUGIN_DIR 中编写并存储一个 Python 文件。以下示例是一个数据写入插件,它在数据持久化到对象存储之前进行处理。

数据写入的示例 Python 插件
# This is the basic structure for Python plugin code that runs in the
# InfluxDB 3 Processing engine.

# When creating a trigger, you can provide runtime arguments to your plugin,
# allowing you to write generic code that uses variables such as monitoring
# thresholds, environment variables, and host names.
#
# Use the following exact signature to define a function for the data write
# trigger.
# When you create a trigger for a data write plugin, you specify the database
# and tables that the plugin receives written data from on every WAL flush
# (default is once per second).
def process_writes(influxdb3_local, table_batches, args=None):
    # here you can see logging. for now this won't do anything, but soon 
    # we'll capture this so you can query it from system tables
    if args and "arg1" in args:
        influxdb3_local.info("arg1: " + args["arg1"])

    # here we're using arguments provided at the time the trigger was set up 
    # to feed into parameters that we'll put into a query
    query_params = {"room": "Kitchen"}
    # The following example shows how to execute a parameterized query. Only SQL is supported. 
    # It queries the database that the trigger is configured for.
    query_result = influxdb3_local.query("SELECT * FROM home where room = '$room'", query_params)
    # The result is a list of Dict that have the column name as key and value as 
    # value.
    influxdb3_local.info("query result: " + str(query_result))

    # this is the data that is sent when data is written to the database and flushed to the WAL. 
    # One batch for each table (will only be one if triggered on a single table)
    for table_batch in table_batches:
        # here you can see that the table_name is available.
        influxdb3_local.info("table: " + table_batch["table_name"])

        # example to skip the table we're later writing data into
        if table_batch["table_name"] == "some_table":
            continue

        # and then the individual rows, which are Dict with keys of the column names and values
        for row in table_batch["rows"]:
            influxdb3_local.info("row: " + str(row))

    # this shows building a line of LP to write back to the database. tags must go first and 
    # their order is important and must always be the same for each individual table. Then 
    # fields and lastly an optional time, which you can see in the next example below
    line = LineBuilder("some_table")\
        .tag("tag1", "tag1_value")\
        .tag("tag2", "tag2_value")\
        .int64_field("field1", 1)\
        .float64_field("field2", 2.0)\
        .string_field("field3", "number three")
    
    # this writes it back (it actually just buffers it until the completion of this function
    # at which point it will write everything back that you put in)
    influxdb3_local.write(line)

    # here's another example, but with us setting a nanosecond timestamp at the end
    other_line = LineBuilder("other_table")
    other_line.int64_field("other_field", 1)
    other_line.float64_field("other_field2", 3.14)
    other_line.time_ns(1302)

    # and you can see that we can write to any DB in the server
    influxdb3_local.write_to_db("mytestdb", other_line)

    # just some log output as an example
    influxdb3_local.info("done")

在服务器上测试插件

使用 influxdb3 test wal_plugin CLI 命令安全地测试您的处理引擎插件,而不会影响实际数据。在插件测试期间:

  • 插件执行的查询将针对您发送请求的服务器进行。
  • 写入不会发送到服务器,而是返回给您。

测试插件

  1. 示例插件代码 保存到插件目录中的插件文件中。如果您尚未向示例中的表写入数据,请注释掉查询数据的行。
  2. 要运行测试,请使用以下选项输入以下命令:
  • --lp--file:要测试的行协议
  • 可选:--input-arguments:一个逗号分隔的 <KEY>=<VALUE> 参数列表,用于您的插件代码
influxdb3 test wal_plugin \
  --database 
DATABASE_NAME
\
--token
AUTH_TOKEN
\
--lp
INPUT_LINE_PROTOCOL
\
--input-arguments
INPUT_ARGS
\
PLUGIN_FILENAME

替换以下内容:

  • INPUT_LINE_PROTOCOL:要测试的行协议
  • 可选:INPUT_ARGS:一个逗号分隔的 <KEY>=<VALUE> 参数列表,用于您的插件代码 — 例如,arg1=hello,arg2=world
  • DATABASE_NAME:要测试的数据库名称
  • AUTH_TOKEN:您的 InfluxDB 3 Core 服务器的 管理员令牌
  • PLUGIN_FILENAME:要测试的插件文件名

示例:测试插件

# Test a plugin
# Requires:
#   - A database named `mydb` with a table named `foo`
#   - A Python plugin file named `test.py`
# Test a plugin
influxdb3 test wal_plugin \
  --lp "my_measure,tag1=asdf f1=1.0 123" \
  --token apiv3_0xxx0o0XxXxx00Xxxx000xXXxoo0== \
  --database sensors \
  --input-arguments "arg1=hello,arg2=world" \
  test.py

该命令将使用测试数据运行插件代码,将数据提供给插件代码,然后返回插件结果。您可以快速查看插件的行为、它将写入数据库的数据以及任何错误。然后,您可以编辑插件目录中的 Python 代码,并重新运行测试。服务器会在每次请求 test API 时重新加载文件。

有关更多信息,请参阅 influxdb3 test wal_plugin 或运行 influxdb3 test wal_plugin -h

创建触发器

将插件代码放在服务器插件目录中并通过了成功的测试后,您就可以创建触发器来运行插件了。使用 influxdb3 create trigger 命令 来创建触发器。

# Create a trigger that runs the plugin
influxdb3 create trigger \
  --token apiv3_0xxx0o0XxXxx00Xxxx000xXXxoo0== \
  --database sensors \
  --plugin test_plugin \
  --trigger-spec "table:foo" \
  --trigger-arguments "arg1=hello,arg2=world" \
  trigger1

启用触发器

创建插件和触发器后,输入以下命令以启用触发器,使其在您写入数据时运行插件:

influxdb3 enable trigger \
  --token 
AUTH_TOKEN
\
--database
DATABASE_NAME
\
TRIGGER_NAME

将以下占位符替换为您自己的值

  • DATABASE_NAME:要启用触发器的数据库名称
  • AUTH_TOKEN:您的 管理员令牌
  • TRIGGER_NAME:要启用的触发器名称

例如,要在 sensors 数据库中启用名为 trigger1 的触发器:

influxdb3 enable trigger \
  --token apiv3_0xxx0o0XxXxx00Xxxx000xXXxoo0== \
  --database sensors
  trigger1 

下一步

如果您已完成此 InfluxDB 3 Core 入门指南,请了解有关以下方面的工具和选项:


此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2