Python 插件和处理引擎
InfluxDB 3 Core 处于公开 Alpha 阶段
InfluxDB 3 Core 处于公开 alpha 阶段,可用于测试和反馈,但不适用于生产环境。 产品和本文档都在不断完善中。 我们欢迎并鼓励您提供关于 alpha 体验的意见,并邀请您加入我们的公共渠道以获取更新和分享反馈。
处理引擎仅适用于 Docker
处理引擎目前仅在 Docker x86 环境中受支持。 非 Docker 支持即将推出。 引擎、API 和开发者体验都在积极发展,可能会发生变化。 加入我们的 Discord 以获取更新和反馈。
使用 InfluxDB 3 Core 处理引擎来运行代码并执行不同数据库事件的任务。
InfluxDB 3 Core 提供了 InfluxDB 3 处理引擎,这是一个嵌入式 Python VM,可以动态加载和触发 Python 插件以响应数据库中的事件。
插件
贡献和使用社区插件
influxdata/influxdb3_plugins 是 GitHub 上的一个公共仓库,您可以在其中找到和贡献示例插件。 您可以直接在触发器配置中引用仓库中的插件。
处理引擎插件是您提供的 Python 代码,用于运行任务,例如数据降采样、监控、创建警报或调用外部服务。
触发器
触发器是您创建的 InfluxDB 3 资源,用于将数据库事件(例如,WAL 刷新)与应运行的插件关联起来。 当事件发生时,触发器会将配置、可选参数和事件数据传递给插件。
处理引擎提供四种类型的插件和触发器——每种类型都对应于一种事件类型,并具有特定于事件的配置,使您可以使用有针对性的逻辑来处理事件。
- WAL 刷新:当预写日志 (WAL) 刷新到对象存储时触发(默认情况下为每秒一次)
- Parquet 持久化(即将推出):当 InfluxDB 3 将数据持久化到对象存储 Parquet 文件时触发
- 计划任务:按照您使用 cron 语法指定的计划触发
- 按需请求:绑定到 HTTP API
/api/v3/engine/<CUSTOM_PATH>
端点,并通过对端点的 GET 或 POST 请求触发。
激活处理引擎
要启用处理引擎,请使用 --plugin-dir
选项和插件目录的路径(目录尚无需存在)启动 InfluxDB 3 Core 服务器——例如
influxdb3 serve --node-id node0 --plugin-dir /path/to/plugins
共享 API
所有插件类型都提供 InfluxDB 3 共享 API,用于与数据库交互。 共享 API 提供对以下内容的访问:
LineBuilder
:用于创建 Line Protocol 行以写入数据库query
:用于从任何数据库查询数据info
、warn
和error
:用于将消息记录到数据库日志,该日志在服务器日志中输出,并捕获在可通过 SQL 查询的系统表中
Line builder (行构建器)
LineBuilder
是一个简单的 API,用于构建 Line Protocol 行以写入数据库。 写入在插件运行时缓冲,并在插件完成时刷新。 所有插件类型都提供 LineBuilder
API。
以下示例演示如何使用 LineBuilder
API
# Build line protocol incrementally
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)
influxdb3_local.write(line)
# Output line protocol as a string ("weather,location=us-midwest temperature=82.5 1627680000000000000")
line_str = line.build()
以下是 LineBuilder
API 的 Python 实现
from typing import Optional
from collections import OrderedDict
class InfluxDBError(Exception):
"""Base exception for InfluxDB-related errors"""
pass
class InvalidMeasurementError(InfluxDBError):
"""Raised when measurement name is invalid"""
pass
class InvalidKeyError(InfluxDBError):
"""Raised when a tag or field key is invalid"""
pass
class InvalidLineError(InfluxDBError):
"""Raised when a line protocol string is invalid"""
pass
class LineBuilder:
def __init__(self, measurement: str):
if ' ' in measurement:
raise InvalidMeasurementError("Measurement name cannot contain spaces")
self.measurement = measurement
self.tags: OrderedDict[str, str] = OrderedDict()
self.fields: OrderedDict[str, str] = OrderedDict()
self._timestamp_ns: Optional[int] = None
def _validate_key(self, key: str, key_type: str) -> None:
"""Validate that a key does not contain spaces, commas, or equals signs."""
if not key:
raise InvalidKeyError(f"{key_type} key cannot be empty")
if ' ' in key:
raise InvalidKeyError(f"{key_type} key '{key}' cannot contain spaces")
if ',' in key:
raise InvalidKeyError(f"{key_type} key '{key}' cannot contain commas")
if '=' in key:
raise InvalidKeyError(f"{key_type} key '{key}' cannot contain equals signs")
def tag(self, key: str, value: str) -> 'LineBuilder':
"""Add a tag to the line protocol."""
self._validate_key(key, "tag")
self.tags[key] = str(value)
return self
def uint64_field(self, key: str, value: int) -> 'LineBuilder':
"""Add an unsigned integer field to the line protocol."""
self._validate_key(key, "field")
if value < 0:
raise ValueError(f"uint64 field '{key}' cannot be negative")
self.fields[key] = f"{value}u"
return self
def int64_field(self, key: str, value: int) -> 'LineBuilder':
"""Add an integer field to the line protocol."""
self._validate_key(key, "field")
self.fields[key] = f"{value}i"
return self
def float64_field(self, key: str, value: float) -> 'LineBuilder':
"""Add a float field to the line protocol."""
self._validate_key(key, "field")
# Check if value has no decimal component
self.fields[key] = f"{int(value)}.0" if value % 1 == 0 else str(value)
return self
def string_field(self, key: str, value: str) -> 'LineBuilder':
"""Add a string field to the line protocol."""
self._validate_key(key, "field")
# Escape quotes and backslashes in string values
escaped_value = value.replace('"', '\\"').replace('\\', '\\\\')
self.fields[key] = f'"{escaped_value}"'
return self
def bool_field(self, key: str, value: bool) -> 'LineBuilder':
"""Add a boolean field to the line protocol."""
self._validate_key(key, "field")
self.fields[key] = 't' if value else 'f'
return self
def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
"""Set the timestamp in nanoseconds."""
self._timestamp_ns = timestamp_ns
return self
def build(self) -> str:
"""Build the line protocol string."""
# Start with measurement name (escape commas only)
line = self.measurement.replace(',', '\\,')
# Add tags if present
if self.tags:
tags_str = ','.join(
f"{k}={v}" for k, v in self.tags.items()
)
line += f",{tags_str}"
# Add fields (required)
if not self.fields:
raise InvalidLineError(f"At least one field is required: {line}")
fields_str = ','.join(
f"{k}={v}" for k, v in self.fields.items()
)
line += f" {fields_str}"
# Add timestamp if present
if self._timestamp_ns is not None:
line += f" {self._timestamp_ns}"
return line
Query (查询)
共享 API query
函数使用可选参数(参数化查询)执行 SQL 查询,并将结果作为 Dict[String, Any]
的 List
返回,其中键是列名,值是列值。 所有插件类型都提供 query
函数。
以下示例演示如何使用 query
函数
influxdb3_local.query("SELECT * from foo where bar = 'baz' and time > now() - 'interval 1 hour'")
# Or using parameterized queries
args = {"bar": "baz"}
influxdb3_local.query("SELECT * from foo where bar = $bar and time > now() - 'interval 1 hour'", args)
Logging (日志记录)
共享 API info
、warn
和 error
函数将消息记录到数据库日志,该日志在服务器日志中输出,并捕获在可通过 SQL 查询的系统表中。 所有插件类型都提供 info
、warn
和 error
函数。 这些函数接受任意数量的参数,将它们转换为字符串,然后将它们连接成一个以空格分隔的单条消息。
以下示例演示如何使用 info
、warn
和 error
日志记录函数
ifluxdb3_local.info("This is an info message")
influxdb3_local.warn("This is a warning message")
influxdb3_local.error("This is an error message")
# Log a message that contains a data object
obj_to_log = {"hello": "world"}
influxdb3_local.info("This is an info message with an object", obj_to_log)
触发器参数
每种插件类型都可以从运行它的触发器的配置中接收参数。 您可以使用它来提供运行时配置并驱动插件的行为——例如:
- 用于监控的阈值
- 用于连接到第三方服务的连接属性
参数以 Dict[str, str]
的形式传递,其中键是参数名称,值是参数值。
以下示例演示如何在 WAL 插件中使用参数
def process_writes(influxdb3_local, table_batches, args=None):
if args and "threshold" in args:
threshold = int(args["threshold"])
influxdb3_local.info(f"Threshold is {threshold}")
else:
influxdb3_local.warn("No threshold provided")
args
参数是可选的,如果插件不需要使用参数,则可以从触发器定义中省略。
导入插件依赖项
使用 influxdb3 install
命令下载并安装您的插件依赖的 Python 包。
influxdb3 install package <PACKAGE_NAME>
将 influxdb3 install
与 Docker 结合使用
启动服务器
docker run \ --name CONTAINER_NAME \ -v /path/to/.influxdb3/data:/data \ -v /path/to/.influxdb3/plugins:/plugins \ quay.io/influxdb/influxdb3-core :latest \ serve --node-id=node0 \ --object-store=file \ --data-dir=/data \ --http-bind=localhost:8183 \ --plugin-dir=/plugins
使用
docker exec
运行influxdb3 install
命令docker exec -it CONTAINER_NAME influxdb3 install package pandas
结果是一个活动的 Python 虚拟环境,包安装在 <PLUGINS_DIR>/.venv
中。 您可以传递其他选项以使用 requirements.txt
文件或自定义虚拟环境路径。 有关更多信息,请参阅 influxdb3
CLI 帮助
influxdb3 install package --help
WAL 刷新插件
当 WAL 刷新插件被触发时,插件会收到一个 table_batches
列表,该列表按触发器配置(数据库中的所有表或特定表)进行过滤。
以下示例演示了一个简单的 WAL 刷新插件
def process_writes(influxdb3_local, table_batches, args=None):
for table_batch in table_batches:
# Skip the batch if table_name is write_reports
if table_batch["table_name"] == "write_reports":
continue
row_count = len(table_batch["rows"])
# Double the row count if table name matches args table_name
if args and "double_count_table" in args and table_batch["table_name"] == args["double_count_table"]:
row_count *= 2
# Use the LineBuilder API to write data
line = LineBuilder("write_reports")\
.tag("table_name", table_batch["table_name"])\
.int64_field("row_count", row_count)
influxdb3_local.write(line)
influxdb3_local.info("wal_plugin.py done")
WAL 刷新触发器配置
创建触发器时,您需要将其与数据库关联,并提供特定于触发器类型的配置。
对于 WAL 刷新触发器,您需要指定 trigger-spec
,它决定插件何时被触发(以及接收哪些表数据)
all-tables
:在每次写入关联数据库时触发插件table:<table_name>
:仅针对写入指定表的操作触发插件函数。
以下示例为 gh:examples/wal_plugin/wal_plugin.py
插件创建了一个 WAL 刷新触发器。
influxdb3 create trigger \
--trigger-spec "table:TABLE_NAME" \
--plugin-filename "gh:examples/wal_plugin/wal_plugin.py" \
--database DATABASE_NAME TRIGGER_NAME
gh:
前缀允许您直接从 GitHub 上的 influxdata/influxdb3_plugins 仓库中获取插件文件。 如果没有此前缀,服务器将在插件目录中查找该文件。
要为您的插件提供其他配置,请在 --trigger-arguments
选项中传递键值对列表,并在您的插件中使用 args
参数来接收参数。 有关触发器参数的更多信息,请参阅 CLI 帮助
influxdb3 create trigger help
计划插件
计划插件按照 cron 语法指定的计划运行。 插件将接收本地 API、触发器的时间以及在触发器定义中传递的任何参数。 以下是一个简单的计划插件示例
# see if a table has been written to in the last 5 minutes
def process_scheduled_call(influxdb3_local, time, args=None):
if args and "table_name" in args:
table_name = args["table_name"]
result = influxdb3_local.query(f"SELECT * FROM {table_name} WHERE time > now() - 'interval 5m'")
# write an error log if the result is empty
if not result:
influxdb3_local.error(f"No data in {table_name} in the last 5 minutes")
else:
influxdb3_local.error("No table_name provided for schedule plugin")
计划触发器配置
计划插件使用 schedule:<cron_expression>
或 every:<duration>
的 trigger-spec
设置。 args
参数可用于将配置传递给插件。 例如,如果我们想使用 Github 仓库中的 system-metrics 示例,并使其每 10 秒收集一次,我们可以使用以下触发器定义
influxdb3 create trigger \
--trigger-spec "every:10s" \
--plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
--database mydb system-metrics
按需请求插件
按需请求插件由对 /api/v3/engine
下特定端点的请求触发。 插件将接收本地 API、查询参数 Dict[str, str]
、请求头 Dict[str, str]
、请求体(以字节为单位)以及在触发器定义中传递的任何参数。 以下是一个简单的按需请求插件示例
import json
def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
for k, v in query_parameters.items():
influxdb3_local.info(f"query_parameters: {k}={v}")
for k, v in request_headers.items():
influxdb3_local.info(f"request_headers: {k}={v}")
request_data = json.loads(request_body)
influxdb3_local.info("parsed JSON request body:", request_data)
# write the data to the database
line = LineBuilder("request_data").tag("tag1", "tag1_value").int64_field("field1", 1)
# get a string of the line to return as the body
line_str = line.build()
influxdb3_local.write(line)
return 200, {"Content-Type": "application/json"}, json.dumps({"status": "ok", "line": line_str})
按需请求触发器配置
按需请求插件使用 request:<endpoint>
的 trigger-spec
设置。 args
参数可用于将配置传递给插件。 例如,如果我们希望上述插件在端点 /api/v3/engine/my_plugin
上运行,我们将使用 request:my_plugin
作为 trigger-spec
。
触发器规范在所有配置的插件中必须是唯一的,无论它们与哪个数据库关联,只要路径相同即可。 以下示例演示如何创建绑定到 “hello-world” 路径的请求触发器,该触发器使用插件目录中的插件
influxdb3 create trigger \
--trigger-spec "request:hello-world" \
--plugin-filename "hellp/hello_world.py" \
--database mydb hello-world
此页面是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子! 我们欢迎并鼓励您提供关于 InfluxDB 3 Core 和本文档的反馈和错误报告。 要寻求支持,请使用以下资源:
拥有年度合同或支持合同的客户可以联系 InfluxData 支持。