使用 API 功能和状态管理扩展插件
处理引擎包含一个共享 API,您的插件可以使用它来与数据交互、以行协议格式写入新记录以及在执行之间维护状态。这些功能使您能够构建插件,在时间序列数据流经数据库时对其进行转换、分析和响应。
插件 API 允许您
开始使用共享 API
每个插件都可以通过 influxdb3_local 对象自动访问共享 API。您无需导入任何库。API 在插件运行后即可使用。
如果您的插件需要第三方 Python 包(如 pandas、requests 或 influxdb3-python),请参阅 管理插件依赖项 以获取安装说明。
写入数据
要将数据写入数据库,请使用 LineBuilder API 创建行协议数据
# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)
# Write the data to the database
influxdb3_local.write(line)InfluxDB 3 会在插件运行时缓冲您的写入,并在插件完成时刷新它们。
查询数据
您的插件可以直接执行 SQL 查询并处理结果
# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")
# Parameterized query for safer execution
params = {"table": "metrics", "threshold": 90}
results = influxdb3_local.query("SELECT * FROM $table WHERE value > $threshold", params)查询结果是 Dict[String, Any] 的 List,其中每个字典代表一行。列名是键,列值是相应的值。
记录消息以进行监控和调试
使用共享 API 的 info、warn 和 error 函数从您的插件记录消息。每个函数接受一个或多个参数,将它们转换为字符串,并将它们记录为以空格分隔的消息。
添加日志以监控插件执行并协助调试
influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")
# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)系统将所有日志消息写入服务器日志,并将其存储在 系统表中,您可以在其中使用 SQL 进行查询。
使用内存缓存维护状态
处理引擎提供了一个内存缓存,使您的插件能够在执行之间持久化和检索数据。
使用共享 API 的 cache 属性访问缓存
# Basic usage pattern
influxdb3_local.cache.METHOD(PARAMETERS)cache 提供以下方法来检索和管理缓存值
| 方法 | 参数 | 返回值 | 描述 |
|---|---|---|---|
put | key (str):用于存储值的键value (Any):要缓存的任何 Python 对象ttl (Optional[float],默认值=None):过期时间(秒)use_global (bool,默认值=False):如果为 True,则使用全局命名空间 | None | 使用可选的生存时间将值存储在缓存中 |
get | key (str):要检索的键default (Any,默认值=None):如果找不到键,则返回的值use_global (bool,默认值=False):如果为 True,则使用全局命名空间 | Any | 从缓存中检索值,如果找不到则返回默认值 |
delete | key (str):要删除的键use_global (bool,默认值=False):如果为 True,则使用全局命名空间 | bool | 从缓存中删除值。如果删除成功则返回 True,如果未找到则返回 False |
理解缓存命名空间
缓存系统提供两个不同的命名空间
| 命名空间 | 范围 | 最佳用途 |
|---|---|---|
| 触发器特定(默认) | 隔离到单个触发器 | 特定于单个插件的插件状态、计数器、时间戳 |
| 全局 | 在所有触发器之间共享 | 应可供所有插件使用的配置、查找表、服务状态 |
常用缓存操作
存储和检索缓存的数据
# Store a value
influxdb3_local.cache.put("last_run_time", time.time())
# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)
# Delete a cached value
influxdb3_local.cache.delete("temporary_data")存储带过期的缓存数据
# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)跨插件共享数据
# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)
# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)构建计数器
您可以跟踪插件运行的次数
# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)
# Increment counter
counter += 1
# Store the updated value
influxdb3_local.cache.put("execution_count", counter)
influxdb3_local.info(f"This plugin has run {counter} times")内存缓存指南
要充分利用内存缓存,请遵循以下指南
使用触发器特定的命名空间
处理引擎提供了一个缓存,该缓存支持有状态操作,同时保持不同触发器之间的隔离。对于大多数用例,请使用触发器特定的命名空间来隔离插件状态。仅在需要跨触发器共享数据时使用全局命名空间。
适当地使用 TTL
根据数据更改的频率设置适当的过期时间
# Cache external API responses for 5 minutes
influxdb3_local.cache.put("weather_data", api_response, ttl=300)缓存计算结果
存储您经常使用的昂贵计算的结果
# Cache aggregated statistics
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)预热缓存
对于关键数据,在启动时预加载缓存。这对于需要数据的多个触发器的全局命名空间数据尤其有用
# Check if cache needs to be initialized
if not influxdb3_local.cache.get("lookup_table"):
influxdb3_local.cache.put("lookup_table", load_lookup_data())考虑缓存限制
- 内存使用:由于系统在内存中存储缓存内容,因此在缓存大型数据集时请监控您的内存使用情况。
- 服务器重启:由于服务器会在重启时清除缓存,因此请设计您的插件以处理缓存初始化(如上所述)。
- 并发:当多个触发器实例可能同时更新同一缓存键时,要小心访问不准确或过时的数据。
后续步骤
通过理解 InfluxDB 3 共享插件 API,您可以开始构建数据工作流,以转换、分析和响应您的时间序列数据。
有关您可以扩展的官方插件和示例,请参阅 插件库。
此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 InfluxDB 3 Core 和本文档提供反馈和错误报告。要获得支持,请使用以下资源
具有年度合同或支持合同的客户可以 联系 InfluxData 支持。