在 InfluxDB 3 Core 中处理数据
InfluxDB 3 Core 处理引擎是一个嵌入式的 Python 虚拟机 (VM),它在数据库内部运行代码来处理和转换数据。您可以创建处理引擎 插件,当特定事件 触发 时运行这些插件。
处理引擎术语
插件
插件是一个 Python 函数,其签名与处理引擎的 触发器 兼容。
触发器
创建触发器时,您需要指定一个 插件、一个数据库、可选参数以及一个触发器规范,该规范定义了插件的执行时间和接收的数据。
触发器类型
InfluxDB 3 提供了以下类型的触发器,每种都有特定的规范:
- 数据写入 (
table:或all_tables):当数据库将数据刷新到写前日志 (默认每秒) 时,将一批写入的数据 (针对特定表或所有表) 发送到插件。 - 计划任务 (
every:或cron:):根据用户配置的计划 (使用 crontab 或持续时间) 执行插件。此触发器类型对于数据收集和故障监控非常有用。 - HTTP 请求 (
request:):将插件绑定到/api/v3/engine/<REQUEST_PATH>的自定义 HTTP API 端点。插件接收 HTTP 请求头和内容,可以解析、处理并将数据发送到数据库或第三方服务。
激活处理引擎
要激活处理引擎,请在启动 InfluxDB 3 Core 服务器时包含 --plugin-dir <PLUGIN_DIR> 选项。PLUGIN_DIR 是您存储处理引擎运行的 插件 文件的文件系统位置。
如果您是从 tar 存档手动安装 InfluxDB 3 Core,请确保 influxdb3 二进制文件和 python/ 目录保留在同一个父目录中。安装脚本会自动处理此问题。
influxdb3 serve \
# ...
--plugin-dir PLUGIN_DIR将 PLUGIN_DIR 替换为您插件目录的路径。此路径可以是绝对路径,也可以是相对于 influxdb3 服务器当前工作目录的相对路径。
创建插件
要创建插件,请在您配置的 PLUGIN_DIR 中编写并存储一个 Python 文件。以下示例是一个数据写入插件,它在数据持久化到对象存储之前进行处理。
数据写入的示例 Python 插件
# This is the basic structure for Python plugin code that runs in the
# InfluxDB 3 Processing engine.
# When creating a trigger, you can provide runtime arguments to your plugin,
# allowing you to write generic code that uses variables such as monitoring
# thresholds, environment variables, and host names.
#
# Use the following exact signature to define a function for the data write
# trigger.
# When you create a trigger for a data write plugin, you specify the database
# and tables that the plugin receives written data from on every WAL flush
# (default is once per second).
def process_writes(influxdb3_local, table_batches, args=None):
# here you can see logging. for now this won't do anything, but soon
# we'll capture this so you can query it from system tables
if args and "arg1" in args:
influxdb3_local.info("arg1: " + args["arg1"])
# here we're using arguments provided at the time the trigger was set up
# to feed into parameters that we'll put into a query
query_params = {"room": "Kitchen"}
# The following example shows how to execute a parameterized query. Only SQL is supported.
# It queries the database that the trigger is configured for.
query_result = influxdb3_local.query("SELECT * FROM home where room = '$room'", query_params)
# The result is a list of Dict that have the column name as key and value as
# value.
influxdb3_local.info("query result: " + str(query_result))
# this is the data that is sent when data is written to the database and flushed to the WAL.
# One batch for each table (will only be one if triggered on a single table)
for table_batch in table_batches:
# here you can see that the table_name is available.
influxdb3_local.info("table: " + table_batch["table_name"])
# example to skip the table we're later writing data into
if table_batch["table_name"] == "some_table":
continue
# and then the individual rows, which are Dict with keys of the column names and values
for row in table_batch["rows"]:
influxdb3_local.info("row: " + str(row))
# this shows building a line of LP to write back to the database. tags must go first and
# their order is important and must always be the same for each individual table. Then
# fields and lastly an optional time, which you can see in the next example below
line = LineBuilder("some_table")\
.tag("tag1", "tag1_value")\
.tag("tag2", "tag2_value")\
.int64_field("field1", 1)\
.float64_field("field2", 2.0)\
.string_field("field3", "number three")
# this writes it back (it actually just buffers it until the completion of this function
# at which point it will write everything back that you put in)
influxdb3_local.write(line)
# here's another example, but with us setting a nanosecond timestamp at the end
other_line = LineBuilder("other_table")
other_line.int64_field("other_field", 1)
other_line.float64_field("other_field2", 3.14)
other_line.time_ns(1302)
# and you can see that we can write to any DB in the server
influxdb3_local.write_to_db("mytestdb", other_line)
# just some log output as an example
influxdb3_local.info("done")在服务器上测试插件
使用 influxdb3 test wal_plugin CLI 命令安全地测试您的处理引擎插件,而不会影响实际数据。在插件测试期间:
- 插件执行的查询将针对您发送请求的服务器进行。
- 写入不会发送到服务器,而是返回给您。
测试插件
- 将 示例插件代码 保存到插件目录中的插件文件中。如果您尚未向示例中的表写入数据,请注释掉查询数据的行。
- 要运行测试,请使用以下选项输入以下命令:
--lp或--file:要测试的行协议- 可选:
--input-arguments:一个逗号分隔的<KEY>=<VALUE>参数列表,用于您的插件代码
influxdb3 test wal_plugin \
--database DATABASE_NAME \
--token AUTH_TOKEN \
--lp INPUT_LINE_PROTOCOL \
--input-arguments INPUT_ARGS \
PLUGIN_FILENAME替换以下内容:
INPUT_LINE_PROTOCOL:要测试的行协议- 可选:
INPUT_ARGS:一个逗号分隔的<KEY>=<VALUE>参数列表,用于您的插件代码 — 例如,arg1=hello,arg2=world DATABASE_NAME:要测试的数据库名称AUTH_TOKEN:您的 InfluxDB 3 Core 服务器的 管理员令牌PLUGIN_FILENAME:要测试的插件文件名
示例:测试插件
# Test a plugin
# Requires:
# - A database named `mydb` with a table named `foo`
# - A Python plugin file named `test.py`
# Test a plugin
influxdb3 test wal_plugin \
--lp "my_measure,tag1=asdf f1=1.0 123" \
--token apiv3_0xxx0o0XxXxx00Xxxx000xXXxoo0== \
--database sensors \
--input-arguments "arg1=hello,arg2=world" \
test.py该命令将使用测试数据运行插件代码,将数据提供给插件代码,然后返回插件结果。您可以快速查看插件的行为、它将写入数据库的数据以及任何错误。然后,您可以编辑插件目录中的 Python 代码,并重新运行测试。服务器会在每次请求 test API 时重新加载文件。
有关更多信息,请参阅 influxdb3 test wal_plugin 或运行 influxdb3 test wal_plugin -h。
创建触发器
将插件代码放在服务器插件目录中并通过了成功的测试后,您就可以创建触发器来运行插件了。使用 influxdb3 create trigger 命令 来创建触发器。
# Create a trigger that runs the plugin
influxdb3 create trigger \
--token apiv3_0xxx0o0XxXxx00Xxxx000xXXxoo0== \
--database sensors \
--plugin test_plugin \
--trigger-spec "table:foo" \
--trigger-arguments "arg1=hello,arg2=world" \
trigger1启用触发器
创建插件和触发器后,输入以下命令以启用触发器,使其在您写入数据时运行插件:
influxdb3 enable trigger \
--token AUTH_TOKEN \
--database DATABASE_NAME \
TRIGGER_NAME将以下占位符替换为您自己的值
DATABASE_NAME:要启用触发器的数据库名称AUTH_TOKEN:您的 管理员令牌TRIGGER_NAME:要启用的触发器名称
例如,要在 sensors 数据库中启用名为 trigger1 的触发器:
influxdb3 enable trigger \
--token apiv3_0xxx0o0XxXxx00Xxxx000xXXxoo0== \
--database sensors
trigger1 下一步
如果您已完成此 InfluxDB 3 Core 入门指南,请了解有关以下方面的工具和选项:
此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 InfluxDB 3 Core 和本文档提供反馈和错误报告。要获得支持,请使用以下资源
具有年度合同或支持合同的客户可以 联系 InfluxData 支持。