文档文档

使用 API 功能和状态管理扩展插件

处理引擎包含一个共享 API,您的插件可以使用它来与数据交互、以行协议格式写入新记录以及在执行之间维护状态。这些功能使您能够构建插件,在时间序列数据流经数据库时对其进行转换、分析和响应。

插件 API 允许您

开始使用共享 API

每个插件都可以通过 influxdb3_local 对象自动访问共享 API。您无需导入任何库。API 在插件运行后即可使用。

如果您的插件需要第三方 Python 包(如 pandasrequestsinfluxdb3-python),请参阅 管理插件依赖项 以获取安装说明。

写入数据

要将数据写入数据库,请使用 LineBuilder API 创建行协议数据

# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)

# Write the data to the database
influxdb3_local.write(line)

InfluxDB 3 会在插件运行时缓冲您的写入,并在插件完成时刷新它们。

查看 LineBuilder Python 实现

查询数据

您的插件可以直接执行 SQL 查询并处理结果

# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

# Parameterized query for safer execution
params = {"table": "metrics", "threshold": 90}
results = influxdb3_local.query("SELECT * FROM $table WHERE value > $threshold", params)

查询结果是 Dict[String, Any]List,其中每个字典代表一行。列名是键,列值是相应的值。

记录消息以进行监控和调试

使用共享 API 的 infowarnerror 函数从您的插件记录消息。每个函数接受一个或多个参数,将它们转换为字符串,并将它们记录为以空格分隔的消息。

添加日志以监控插件执行并协助调试

influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")

# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)

系统将所有日志消息写入服务器日志,并将其存储在 系统表中,您可以在其中使用 SQL 进行查询。

使用内存缓存维护状态

处理引擎提供了一个内存缓存,使您的插件能够在执行之间持久化和检索数据。

使用共享 API 的 cache 属性访问缓存

# Basic usage pattern  
influxdb3_local.cache.METHOD(PARAMETERS)

cache 提供以下方法来检索和管理缓存值

方法参数返回值描述
putkey (str):用于存储值的键
value (Any):要缓存的任何 Python 对象
ttl (Optional[float],默认值=None):过期时间(秒)
use_global (bool,默认值=False):如果为 True,则使用全局命名空间
None使用可选的生存时间将值存储在缓存中
getkey (str):要检索的键
default (Any,默认值=None):如果找不到键,则返回的值
use_global (bool,默认值=False):如果为 True,则使用全局命名空间
Any从缓存中检索值,如果找不到则返回默认值
deletekey (str):要删除的键
use_global (bool,默认值=False):如果为 True,则使用全局命名空间
bool从缓存中删除值。如果删除成功则返回 True,如果未找到则返回 False

理解缓存命名空间

缓存系统提供两个不同的命名空间

命名空间范围最佳用途
触发器特定(默认)隔离到单个触发器特定于单个插件的插件状态、计数器、时间戳
全局在所有触发器之间共享应可供所有插件使用的配置、查找表、服务状态

常用缓存操作

存储和检索缓存的数据

# Store a value
influxdb3_local.cache.put("last_run_time", time.time())

# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)

# Delete a cached value
influxdb3_local.cache.delete("temporary_data")

存储带过期的缓存数据

# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)

跨插件共享数据

# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)

# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)

构建计数器

您可以跟踪插件运行的次数

# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)

# Increment counter
counter += 1

# Store the updated value
influxdb3_local.cache.put("execution_count", counter)

influxdb3_local.info(f"This plugin has run {counter} times")

内存缓存指南

要充分利用内存缓存,请遵循以下指南

使用触发器特定的命名空间

处理引擎提供了一个缓存,该缓存支持有状态操作,同时保持不同触发器之间的隔离。对于大多数用例,请使用触发器特定的命名空间来隔离插件状态。仅在需要跨触发器共享数据时使用全局命名空间。

适当地使用 TTL

根据数据更改的频率设置适当的过期时间

# Cache external API responses for 5 minutes  
influxdb3_local.cache.put("weather_data", api_response, ttl=300)

缓存计算结果

存储您经常使用的昂贵计算的结果

# Cache aggregated statistics  
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)

预热缓存

对于关键数据,在启动时预加载缓存。这对于需要数据的多个触发器的全局命名空间数据尤其有用

# Check if cache needs to be initialized  
if not influxdb3_local.cache.get("lookup_table"):   
    influxdb3_local.cache.put("lookup_table", load_lookup_data())

考虑缓存限制

  • 内存使用:由于系统在内存中存储缓存内容,因此在缓存大型数据集时请监控您的内存使用情况。
  • 服务器重启:由于服务器会在重启时清除缓存,因此请设计您的插件以处理缓存初始化(如上所述)。
  • 并发:当多个触发器实例可能同时更新同一缓存键时,要小心访问不准确或过时的数据。

后续步骤

通过理解 InfluxDB 3 共享插件 API,您可以开始构建数据工作流,以转换、分析和响应您的时间序列数据。

有关您可以扩展的官方插件和示例,请参阅 插件库


此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2