文档文档 (Documentation)

Python 插件和处理引擎

InfluxDB 3 Core 处于公开 Alpha 阶段

InfluxDB 3 Core 处于公开 alpha 阶段,可用于测试和反馈,但不适用于生产环境。 产品和本文档都在不断完善中。 我们欢迎并鼓励您提供关于 alpha 体验的意见,并邀请您加入我们的公共渠道以获取更新和分享反馈。

Alpha 阶段的预期和建议

处理引擎仅适用于 Docker

处理引擎目前仅在 Docker x86 环境中受支持。 非 Docker 支持即将推出。 引擎、API 和开发者体验都在积极发展,可能会发生变化。 加入我们的 Discord 以获取更新和反馈。

使用 InfluxDB 3 Core 处理引擎来运行代码并执行不同数据库事件的任务。

InfluxDB 3 Core 提供了 InfluxDB 3 处理引擎,这是一个嵌入式 Python VM,可以动态加载和触发 Python 插件以响应数据库中的事件。

插件

贡献和使用社区插件

influxdata/influxdb3_plugins 是 GitHub 上的一个公共仓库,您可以在其中找到和贡献示例插件。 您可以直接在触发器配置中引用仓库中的插件。

处理引擎插件是您提供的 Python 代码,用于运行任务,例如数据降采样、监控、创建警报或调用外部服务。

触发器

触发器是您创建的 InfluxDB 3 资源,用于将数据库事件(例如,WAL 刷新)与应运行的插件关联起来。 当事件发生时,触发器会将配置、可选参数和事件数据传递给插件。

处理引擎提供四种类型的插件和触发器——每种类型都对应于一种事件类型,并具有特定于事件的配置,使您可以使用有针对性的逻辑来处理事件。

  • WAL 刷新:当预写日志 (WAL) 刷新到对象存储时触发(默认情况下为每秒一次)
  • Parquet 持久化(即将推出):当 InfluxDB 3 将数据持久化到对象存储 Parquet 文件时触发
  • 计划任务:按照您使用 cron 语法指定的计划触发
  • 按需请求:绑定到 HTTP API /api/v3/engine/<CUSTOM_PATH> 端点,并通过对端点的 GET 或 POST 请求触发。

激活处理引擎

要启用处理引擎,请使用 --plugin-dir 选项和插件目录的路径(目录尚无需存在)启动 InfluxDB 3 Core 服务器——例如

influxdb3 serve --node-id node0 --plugin-dir /path/to/plugins

共享 API

所有插件类型都提供 InfluxDB 3 共享 API,用于与数据库交互。 共享 API 提供对以下内容的访问:

  • LineBuilder:用于创建 Line Protocol 行以写入数据库
  • query:用于从任何数据库查询数据
  • infowarnerror:用于将消息记录到数据库日志,该日志在服务器日志中输出,并捕获在可通过 SQL 查询的系统表中

Line builder (行构建器)

LineBuilder 是一个简单的 API,用于构建 Line Protocol 行以写入数据库。 写入在插件运行时缓冲,并在插件完成时刷新。 所有插件类型都提供 LineBuilder API。

以下示例演示如何使用 LineBuilder API

# Build line protocol incrementally
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)
influxdb3_local.write(line)

# Output line protocol as a string ("weather,location=us-midwest temperature=82.5 1627680000000000000")
line_str = line.build()

以下是 LineBuilder API 的 Python 实现

from typing import Optional
from collections import OrderedDict

class InfluxDBError(Exception):
    """Base exception for InfluxDB-related errors"""
    pass

class InvalidMeasurementError(InfluxDBError):
    """Raised when measurement name is invalid"""
    pass

class InvalidKeyError(InfluxDBError):
    """Raised when a tag or field key is invalid"""
    pass

class InvalidLineError(InfluxDBError):
    """Raised when a line protocol string is invalid"""
    pass

class LineBuilder:
    def __init__(self, measurement: str):
        if ' ' in measurement:
            raise InvalidMeasurementError("Measurement name cannot contain spaces")
        self.measurement = measurement
        self.tags: OrderedDict[str, str] = OrderedDict()
        self.fields: OrderedDict[str, str] = OrderedDict()
        self._timestamp_ns: Optional[int] = None

    def _validate_key(self, key: str, key_type: str) -> None:
        """Validate that a key does not contain spaces, commas, or equals signs."""
        if not key:
            raise InvalidKeyError(f"{key_type} key cannot be empty")
        if ' ' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain spaces")
        if ',' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain commas")
        if '=' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain equals signs")

    def tag(self, key: str, value: str) -> 'LineBuilder':
        """Add a tag to the line protocol."""
        self._validate_key(key, "tag")
        self.tags[key] = str(value)
        return self

    def uint64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an unsigned integer field to the line protocol."""
        self._validate_key(key, "field")
        if value < 0:
            raise ValueError(f"uint64 field '{key}' cannot be negative")
        self.fields[key] = f"{value}u"
        return self

    def int64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an integer field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = f"{value}i"
        return self

    def float64_field(self, key: str, value: float) -> 'LineBuilder':
        """Add a float field to the line protocol."""
        self._validate_key(key, "field")
        # Check if value has no decimal component
        self.fields[key] = f"{int(value)}.0" if value % 1 == 0 else str(value)
        return self

    def string_field(self, key: str, value: str) -> 'LineBuilder':
        """Add a string field to the line protocol."""
        self._validate_key(key, "field")
        # Escape quotes and backslashes in string values
        escaped_value = value.replace('"', '\\"').replace('\\', '\\\\')
        self.fields[key] = f'"{escaped_value}"'
        return self

    def bool_field(self, key: str, value: bool) -> 'LineBuilder':
        """Add a boolean field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = 't' if value else 'f'
        return self

    def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
        """Set the timestamp in nanoseconds."""
        self._timestamp_ns = timestamp_ns
        return self

    def build(self) -> str:
        """Build the line protocol string."""
        # Start with measurement name (escape commas only)
        line = self.measurement.replace(',', '\\,')

        # Add tags if present
        if self.tags:
            tags_str = ','.join(
                f"{k}={v}" for k, v in self.tags.items()
            )
            line += f",{tags_str}"

        # Add fields (required)
        if not self.fields:
            raise InvalidLineError(f"At least one field is required: {line}")

        fields_str = ','.join(
            f"{k}={v}" for k, v in self.fields.items()
        )
        line += f" {fields_str}"

        # Add timestamp if present
        if self._timestamp_ns is not None:
            line += f" {self._timestamp_ns}"

        return line

Query (查询)

共享 API query 函数使用可选参数(参数化查询)执行 SQL 查询,并将结果作为 Dict[String, Any]List 返回,其中键是列名,值是列值。 所有插件类型都提供 query 函数。

以下示例演示如何使用 query 函数

influxdb3_local.query("SELECT * from foo where bar = 'baz' and time > now() - 'interval 1 hour'")

# Or using parameterized queries
args = {"bar": "baz"}
influxdb3_local.query("SELECT * from foo where bar = $bar and time > now() - 'interval 1 hour'", args)

Logging (日志记录)

共享 API infowarnerror 函数将消息记录到数据库日志,该日志在服务器日志中输出,并捕获在可通过 SQL 查询的系统表中。 所有插件类型都提供 infowarnerror 函数。 这些函数接受任意数量的参数,将它们转换为字符串,然后将它们连接成一个以空格分隔的单条消息。

以下示例演示如何使用 infowarnerror 日志记录函数

ifluxdb3_local.info("This is an info message")
influxdb3_local.warn("This is a warning message")
influxdb3_local.error("This is an error message")

# Log a message that contains a data object
obj_to_log = {"hello": "world"}
influxdb3_local.info("This is an info message with an object", obj_to_log)

触发器参数

每种插件类型都可以从运行它的触发器的配置中接收参数。 您可以使用它来提供运行时配置并驱动插件的行为——例如:

  • 用于监控的阈值
  • 用于连接到第三方服务的连接属性

参数以 Dict[str, str] 的形式传递,其中键是参数名称,值是参数值。

以下示例演示如何在 WAL 插件中使用参数

def process_writes(influxdb3_local, table_batches, args=None):
    if args and "threshold" in args:
        threshold = int(args["threshold"])
        influxdb3_local.info(f"Threshold is {threshold}")
    else:
        influxdb3_local.warn("No threshold provided")

args 参数是可选的,如果插件不需要使用参数,则可以从触发器定义中省略。

导入插件依赖项

使用 influxdb3 install 命令下载并安装您的插件依赖的 Python 包。

influxdb3 install package <PACKAGE_NAME>

influxdb3 install 与 Docker 结合使用

  1. 启动服务器

    docker run \
    --name CONTAINER_NAME \
    -v /path/to/.influxdb3/data:/data \
    -v /path/to/.influxdb3/plugins:/plugins \
    quay.io/influxdb/influxdb3-core
    :latest \
    serve --node-id=node0 \
    --object-store=file \
    --data-dir=/data \
    --http-bind=localhost:8183 \
    --plugin-dir=/plugins
    
  2. 使用 docker exec 运行 influxdb3 install 命令

    docker exec -it CONTAINER_NAME influxdb3 install package pandas
    

结果是一个活动的 Python 虚拟环境,包安装在 <PLUGINS_DIR>/.venv 中。 您可以传递其他选项以使用 requirements.txt 文件或自定义虚拟环境路径。 有关更多信息,请参阅 influxdb3 CLI 帮助

influxdb3 install package --help

WAL 刷新插件

当 WAL 刷新插件被触发时,插件会收到一个 table_batches 列表,该列表按触发器配置(数据库中的所有表或特定表)进行过滤。

以下示例演示了一个简单的 WAL 刷新插件

def process_writes(influxdb3_local, table_batches, args=None):
    for table_batch in table_batches:
        # Skip the batch if table_name is write_reports
        if table_batch["table_name"] == "write_reports":
            continue

        row_count = len(table_batch["rows"])

        # Double the row count if table name matches args table_name
        if args and "double_count_table" in args and table_batch["table_name"] == args["double_count_table"]:
            row_count *= 2

        # Use the LineBuilder API to write data
        line = LineBuilder("write_reports")\
            .tag("table_name", table_batch["table_name"])\
            .int64_field("row_count", row_count)
        influxdb3_local.write(line)

    influxdb3_local.info("wal_plugin.py done")

WAL 刷新触发器配置

创建触发器时,您需要将其与数据库关联,并提供特定于触发器类型的配置。

对于 WAL 刷新触发器,您需要指定 trigger-spec,它决定插件何时被触发(以及接收哪些表数据)

  • all-tables:在每次写入关联数据库时触发插件
  • table:<table_name>:仅针对写入指定表的操作触发插件函数。

以下示例为 gh:examples/wal_plugin/wal_plugin.py 插件创建了一个 WAL 刷新触发器。

influxdb3 create trigger \
  --trigger-spec "table:TABLE_NAME" \
  --plugin-filename "gh:examples/wal_plugin/wal_plugin.py" \
  --database DATABASE_NAME TRIGGER_NAME

gh: 前缀允许您直接从 GitHub 上的 influxdata/influxdb3_plugins 仓库中获取插件文件。 如果没有此前缀,服务器将在插件目录中查找该文件。

要为您的插件提供其他配置,请在 --trigger-arguments 选项中传递键值对列表,并在您的插件中使用 args 参数来接收参数。 有关触发器参数的更多信息,请参阅 CLI 帮助

influxdb3 create trigger help

计划插件

计划插件按照 cron 语法指定的计划运行。 插件将接收本地 API、触发器的时间以及在触发器定义中传递的任何参数。 以下是一个简单的计划插件示例

# see if a table has been written to in the last 5 minutes
def process_scheduled_call(influxdb3_local, time, args=None):
    if args and "table_name" in args:
        table_name = args["table_name"]
        result = influxdb3_local.query(f"SELECT * FROM {table_name} WHERE time > now() - 'interval 5m'")
        # write an error log if the result is empty
        if not result:
            influxdb3_local.error(f"No data in {table_name} in the last 5 minutes")
    else:
        influxdb3_local.error("No table_name provided for schedule plugin")

计划触发器配置

计划插件使用 schedule:<cron_expression>every:<duration>trigger-spec 设置。 args 参数可用于将配置传递给插件。 例如,如果我们想使用 Github 仓库中的 system-metrics 示例,并使其每 10 秒收集一次,我们可以使用以下触发器定义

influxdb3 create trigger \
  --trigger-spec "every:10s" \
  --plugin-filename "gh:examples/schedule/system_metrics/system_metrics.py" \
  --database mydb system-metrics

按需请求插件

按需请求插件由对 /api/v3/engine 下特定端点的请求触发。 插件将接收本地 API、查询参数 Dict[str, str]、请求头 Dict[str, str]、请求体(以字节为单位)以及在触发器定义中传递的任何参数。 以下是一个简单的按需请求插件示例

import json

def process_request(influxdb3_local, query_parameters, request_headers, request_body, args=None):
    for k, v in query_parameters.items():
        influxdb3_local.info(f"query_parameters: {k}={v}")
    for k, v in request_headers.items():
        influxdb3_local.info(f"request_headers: {k}={v}")

    request_data = json.loads(request_body)

    influxdb3_local.info("parsed JSON request body:", request_data)

    # write the data to the database
    line = LineBuilder("request_data").tag("tag1", "tag1_value").int64_field("field1", 1)
    # get a string of the line to return as the body
    line_str = line.build()

    influxdb3_local.write(line)

    return 200, {"Content-Type": "application/json"}, json.dumps({"status": "ok", "line": line_str})

按需请求触发器配置

按需请求插件使用 request:<endpoint>trigger-spec 设置。 args 参数可用于将配置传递给插件。 例如,如果我们希望上述插件在端点 /api/v3/engine/my_plugin 上运行,我们将使用 request:my_plugin 作为 trigger-spec

触发器规范在所有配置的插件中必须是唯一的,无论它们与哪个数据库关联,只要路径相同即可。 以下示例演示如何创建绑定到 “hello-world” 路径的请求触发器,该触发器使用插件目录中的插件

influxdb3 create trigger \
  --trigger-spec "request:hello-world" \
  --plugin-filename "hellp/hello_world.py" \
  --database mydb hello-world

此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。 您可以继续像当前一样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已发布公开 Alpha 版

InfluxDB 3 开源版本现已可用于 alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个产品作为 alpha 版本的一部分。

InfluxDB 3 Core 是我们新的开源产品。 它是用于时间序列和事件数据的最新数据引擎。 InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询能力、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看