文档说明

InfluxDB v3的Python客户端库

InfluxDB v3 influxdb3-python Python客户端库 将InfluxDB Clustered的写和查询操作与Python脚本和应用程序集成。

InfluxDB客户端库提供可配置的批量将数据写入InfluxDB Clustered。客户端库可用于构造行协议数据,将其他格式数据转换为行协议,并将行协议数据批量写入InfluxDB HTTP API。

InfluxDB v3客户端库可以使用SQL或InfluxQL查询InfluxDB Clustered。Python客户端库 influxdb3-python 将Apache Arrow的 pyarrow.flight 客户端封装在方便的InfluxDB v3接口中,用于执行SQL和InfluxQL查询,请求服务器元数据,以及使用gRPC通过Flight协议从InfluxDB Clustered检索数据。

此页面的代码示例使用入门家庭传感器示例数据

安装

使用pip安装客户端库和依赖项

pip install influxdb3-python

导入模块

influxdb3-python客户端库包提供influxdb_client_3模块。

导入模块

import influxdb_client_3

从模块导入特定类方法

from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions

API参考

influxdb_client_3模块包含以下类和函数。

类InfluxDBClient3

提供与InfluxDB API交互的接口,用于写入和查询数据。

InfluxDBClient3 构造函数初始化并返回一个客户端实例,具有以下功能

  • 一个单例 写入客户端,用于向数据库写入。
  • 一个单例 航班客户端,用于查询数据库。

参数

  • host(字符串):InfluxDB 实例的主机 URL。
  • database(字符串):用于写入和查询的数据库。
  • token(字符串):具有读写权限的数据库令牌。
  • 可选 write_client_options(字典):用于写入 InfluxDB 的选项。如果为 None,则写入为同步
  • 可选 flight_client_options(字典):用于查询 InfluxDB 的选项。

写入模式

写入数据时,客户端使用以下模式之一

同步写入

默认。在 InfluxDBClient3 初始化期间未提供 write_client_options 时,写入为同步。在同步模式下写入数据时,客户端立即尝试将提供的数据写入 InfluxDB,不会重试失败的请求,也不会调用响应回调。

示例:使用同步(非批量)默认值初始化客户端

以下示例初始化一个客户端,用于在 InfluxDB 集群数据库中写入和查询数据。由于未指定 write_client_options,客户端使用默认的同步写入模式。

–>

from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                        database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)

替换以下内容

  • DATABASE_NAME:您的 InfluxDB 集群数据库数据库的名称
  • DATABASE_TOKEN:具有在指定数据库上读写权限的 InfluxDB 集群数据库令牌

要显式指定同步模式,请使用 write_options=SYNCHRONOUS 创建客户端–例如

from influxdb_client_3 import InfluxDBClient3, write_client_options, SYNCHRONOUS

wco = write_client_options(write_options=SYNCHRONOUS)

client = InfluxDBClient3(host=f"cluster-host.com",
                        database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
,
write_client_options=wco, flight_client_options=None)

替换以下内容

  • DATABASE_NAME:您的 InfluxDB 集群数据库数据库的名称
  • DATABASE_TOKEN:具有在指定数据库上写入权限的 InfluxDB 集群数据库令牌

批量写入

批量写入对于高效的批量数据操作特别有用。选项包括设置批量大小、刷新间隔、重试间隔等。

批量写入将多个写入操作组合成一个请求发送到 InfluxDB。在批量模式下,客户端将记录或记录添加到批处理中,然后安排批处理写入 InfluxDB。当达到 write_client_options.batch_sizewrite_client_options.flush_interval 时,客户端将批处理写入 InfluxDB。如果写入失败,客户端将根据 write_client_options 重试选项重新安排写入。

配置写入客户端选项

使用 WriteOptionswrite_client_options 配置客户端的批量写入和响应处理

  1. 实例化 WriteOptions。要使用批量默认值,请在构造函数中不指定参数。
  2. 调用 write_client_options 并使用 write_options 参数指定前一步骤中创建的 WriteOptions 实例。指定回调参数(成功、错误和重试)以在成功或错误时调用函数。
  3. 实例化 InfluxDBClient3 并使用 write_client_options 参数指定前一步骤中创建的 dict 输出。
示例:使用批量默认值和回调函数初始化客户端

以下示例展示了如何使用默认值批量模式和指定响应状态(成功、错误或可重试错误)的回调函数。

from influxdb_client_3 import(InfluxDBClient3,
                              write_client_options,
                              WriteOptions,
                              InfluxDBError)

status = None

# Define callbacks for write responses
def success(self, data: str):
    status = "Success writing batch: data: {data}"
    assert status.startswith('Success'), f"Expected {status} to be success"

def error(self, data: str, err: InfluxDBError):
    status = f"Error writing batch: config: {self}, data: {data}, error: {err}"
    assert status.startswith('Success'), f"Expected {status} to be success"


def retry(self, data: str, err: InfluxDBError):
    status = f"Retry error writing batch: config: {self}, data: {data}, error: {err}"
    assert status.startswith('Success'), f"Expected {status} to be success"

# Instantiate WriteOptions for batching
write_options = WriteOptions()
wco = write_client_options(success_callback=success,
                            error_callback=error,
                            retry_callback=retry,
                            write_options=write_options)

# Use the with...as statement to ensure the file is properly closed and resources
# are released.
with InfluxDBClient3(host=f"cluster-host.com",
                     database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
,
write_client_options=wco) as client: client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time', tag_columns=["room"], write_precision='s')

替换以下内容

  • DATABASE_NAME:您的 InfluxDB 集群数据库数据库的名称
  • DATABASE_TOKEN:具有在指定数据库上写入权限的 InfluxDB 集群数据库令牌

InfluxDBClient3实例方法

InfluxDBClient3.write

将记录或记录列表写入 InfluxDB。

参数

  • record(记录或列表):要写入的记录或记录列表。记录可以是 Point 对象、表示点的字典、行协议字符串或 DataFrame
  • database(字符串):要写入的数据库。默认是将数据写入客户端指定的数据库。
  • ****kwargs**: 其他写入选项 – 例如
    • write_precision (字符串):可选。默认是 "ns"。指定 record 中时间戳的 精度"ms""s""us""ns")。
    • write_client_options (字典):可选。指定 批量写入 模式下的回调函数和选项。要生成 dict,请使用 write_client_options 函数

示例:写入行协议字符串

from influxdb_client_3 import InfluxDBClient3

point = "home,room=Living\\ Room temp=21.1,hum=35.9,co=0i 1641024000"

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
client.write(record=point, write_precision="s")

以下示例代码执行 SQL 查询以检索点

# Execute an SQL query
table = client.query(query='''SELECT room
                            FROM home
                            WHERE temp=21.1
                              AND time=from_unixtime(1641024000)''')
# table is a pyarrow.Table
room = table[0][0]
assert f"{room}" == 'Living Room', f"Expected {room} to be Living Room"

替换以下内容

  • DATABASE_NAME:您的 InfluxDB 集群数据库数据库的名称
  • DATABASE_TOKEN:具有在指定数据库上写入权限的 InfluxDB 集群数据库令牌

示例:使用 points 写入数据

influxdb_client_3.Point 类提供了为测量构建数据点并设置字段、标签和点时间戳的接口。以下示例显示了如何创建 Point 对象,然后将数据写入 InfluxDB。

from influxdb_client_3 import Point, InfluxDBClient3

point = Point("home").tag("room", "Kitchen").field("temp", 21.5).field("hum", .25)
client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
client.write(point)

以下示例代码执行 InfluxQL 查询以检索写入的数据

# Execute an InfluxQL query
table = client.query(query='''SELECT DISTINCT(temp) as val
                              FROM home
                              WHERE temp > 21.0
                              AND time >= now() - 10m''', language="influxql")
# table is a pyarrow.Table
df = table.to_pandas()
assert 21.5 in df['val'].values, f"Expected value in {df['val']}"

替换以下内容

  • DATABASE_NAME:您的 InfluxDB 集群数据库数据库的名称
  • DATABASE_TOKEN:具有在指定数据库上写入权限的 InfluxDB 集群数据库令牌
示例:使用字典写入数据

InfluxDBClient3 可以将字典对象序列化为行协议。如果您将 dict 传递给 InfluxDBClient3.write,客户端期望该 dict 具有以下 属性

  • measurement (字符串):测量名称
  • tags (字典):标签键值对的字典
  • fields (字典):字段键值对的字典
  • time:记录的 时间戳

以下示例显示了如何定义表示点的 dict,然后将其数据写入 InfluxDB。

from influxdb_client_3 import InfluxDBClient3

# Using point dictionary structure
points = {
          "measurement": "home",
          "tags": {"room": "Kitchen", "sensor": "K001"},
          "fields": {"temp": 72.2, "hum": 36.9, "co": 4},
          "time": 1641067200
          }

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
client.write(record=points, write_precision="s")

替换以下内容

  • DATABASE_NAME:您的 InfluxDB 集群数据库数据库的名称
  • DATABASE_TOKEN:具有在指定数据库上写入权限的 InfluxDB 集群数据库令牌

InfluxDBClient3.write_file

将文件中的数据写入 InfluxDB。执行是同步的。

参数

  • file (字符串):包含要写入 InfluxDB 的记录的文件的路径。文件名必须以以下支持的扩展名之一结尾。有关编码和格式化数据的更多信息,请参阅每个支持格式的文档

  • measurement_name (字符串):定义文件中记录的测量名称。指定的值优先于文件中的 measurementiox::measurement 列。如果未指定参数值,并且文件中存在 measurement 列,则使用 measurement 列值作为测量名称。如果未指定参数值,并且不存在 measurement 列,则使用 iox::measurement 列值作为测量名称。

  • tag_columns (列表):标签列名称。列表中未包含的列以及未由其他参数指定的列被视为字段。

  • timestamp_column (字符串):包含时间戳的列的名称。默认为 'time'

  • database (str):要写入的数据库。默认为写入客户端指定的数据库。

  • file_parser_options (可调用对象):一个函数,用于向文件解析器提供额外的参数。

  • **kwargs:传递给 WriteAPI 的额外选项 – 例如

    • write_precision (字符串):可选。默认是 "ns"。指定 record 中时间戳的 精度"ms""s""us""ns")。
    • write_client_options (字典):可选。指定 批量写入 模式下的回调函数和选项。要生成 dict,请使用 write_client_options 函数

示例:在写入文件数据时使用批处理选项

以下示例显示了如何指定自定义的批量、重试和响应回调写入选项,以及如何从 CSV 和 JSON 文件中写入数据到 InfluxDB。

from influxdb_client_3 import(InfluxDBClient3, write_client_options,
                              WritePrecision, WriteOptions, InfluxDBError)

# Define the result object
result = {
    'config': None,
    'status': None,
    'data': None,
    'error': None
}

# Define callbacks for write responses
def success_callback(self, data: str):
    result['config'] = self
    result['status'] = 'success'
    result['data'] = data

    assert result['data'] != None, f"Expected {result['data']}"
    print("Successfully wrote data: {result['data']}")

def error_callback(self, data: str, exception: InfluxDBError):
    result['config'] = self
    result['status'] = 'error'
    result['data'] = data
    result['error'] = exception

    assert result['status'] == "success", f"Expected {result['error']} to be success for {result['config']}"

def retry_callback(self, data: str, exception: InfluxDBError):
    result['config'] = self
    result['status'] = 'retry_error'
    result['data'] = data
    result['error'] = exception

    assert result['status'] == "success", f"Expected {result['status']} to be success for {result['config']}"

write_options = WriteOptions(batch_size=500,
                            flush_interval=10_000,
                            jitter_interval=2_000,
                            retry_interval=5_000,
                            max_retries=5,
                            max_retry_delay=30_000,
                            exponential_base=2)


wco = write_client_options(success_callback=success_callback,
                          error_callback=error_callback,
                          retry_callback=retry_callback,
                          write_options=write_options)

with InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
,
write_client_options=wco) as client: client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time', tag_columns=["room"], write_precision='s') client.write_file(file='./data/home-sensor-data.json', timestamp_column='time', tag_columns=["room"], write_precision='s')

替换以下内容

  • DATABASE_NAME:您的 InfluxDB 集群数据库数据库的名称
  • DATABASE_TOKEN:具有在指定数据库上写入权限的 InfluxDB 集群数据库令牌

InfluxDBClient3.query

发送一个 Flight 请求以执行指定的 SQL 或 InfluxQL 查询。返回查询结果中的所有数据,作为 Arrow 表(pyarrow.Table 实例)。

参数

  • query (字符串): 要执行的 SQL 或 InfluxQL。
  • language (字符串): query 参数中使用的查询语言 - "sql""influxql"。默认为 "sql"
  • mode (字符串): 指定从 pyarrow.flight.FlightStreamReader 返回的输出。默认为 "all"
    • all: 读取流中的全部内容,并作为一个 pyarrow.Table 返回。
    • chunk: 读取下一个消息(一个 FlightStreamChunk)并返回 dataapp_metadata。如果没有更多消息,则返回 null
    • pandas: 读取流中的内容,并作为一个 pandas.DataFrame 返回。
    • reader: 将 FlightStreamReader 转换为 pyarrow.RecordBatchReader
    • schema: 返回流中所有记录批次的模式。
  • **kwargs: FlightCallOptions

示例:使用 SQL 查询

from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
table = client.query("SELECT * from home WHERE time >= now() - INTERVAL '90 days'") # Filter columns. print(table.select(['room', 'temp'])) # Use PyArrow to aggregate data. print(table.group_by('hum').aggregate([]))

在示例中,替换以下内容

  • DATABASE_NAME:您的 InfluxDB 集群数据库数据库的名称
  • DATABASE_TOKEN:具有读取指定数据库权限的 InfluxDB 集群 数据库令牌

示例:使用 InfluxQL 查询

from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
query = "SELECT * from home WHERE time >= -90d" table = client.query(query=query, language="influxql") # Filter columns. print(table.select(['room', 'temp']))
示例:读取流中的所有数据并返回一个 pandas DataFrame
from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" pd = client.query(query=query, mode="pandas") # Print the pandas DataFrame formatted as a Markdown table. print(pd.to_markdown())
示例:查看流中所有批次的模式
from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
table = client.query("""SELECT * from home WHERE time >= now() - INTERVAL '90 days'""") # View the table schema. print(table.schema)
示例:检索结果模式且没有数据
from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" schema = client.query(query=query, mode="schema") print(schema)
指定超时时间

FlightCallOptions 传递 timeout=<秒数> 以使用自定义超时。

from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" client.query(query=query, timeout=5)

InfluxDBClient3.close

将批次中的所有剩余记录发送到 InfluxDB,然后关闭底层的写入客户端和 Flight 客户端以释放资源。

示例:关闭客户端

from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
client.close()

类Point

提供了一个接口来构建一个测量值的时间序列数据点,并设置字段、标签和时间戳。

from influxdb_client_3 import Point
point = Point("home").tag("room", "Living Room").field("temp", 72)

查看如何 使用点写入数据

类WriteOptions

提供了一个接口来构建自定义批量写入行为的选项,例如批量大小和重试。

from influxdb_client_3 import WriteOptions

write_options = WriteOptions(batch_size=500,
                             flush_interval=10_000,
                             jitter_interval=2_000,
                             retry_interval=5_000,
                             max_retries=5,
                             max_retry_delay=30_000,
                             exponential_base=2)

查看如何 在写入数据时使用批量选项

参数

  • batch_size:默认为 1000
  • flush_interval:默认为 1000
  • jitter_interval:默认为 0
  • retry_interval:默认为 5000
  • max_retries:默认为 5
  • max_retry_delay:默认为 125000
  • max_retry_time:默认为 180000
  • exponential_base:默认为 2
  • max_close_wait:默认为 300000
  • write_scheduler:默认为 ThreadPoolScheduler(max_workers=1)

函数

函数write_client_options(**kwargs)

返回一个包含指定写入客户端选项的 dict

参数

该函数接受以下关键字参数

  • write_options (WriteOptions):指定客户端是否使用同步模式或批量模式写入数据。如果使用批量模式,客户端将使用指定的批量选项。
  • point_settings (dict):客户端在将数据写入 InfluxDB 时将添加到每个点的默认标签。
  • success_callback (可调用对象):如果使用批量模式,则在数据成功写入 InfluxDB(HTTP 状态 204)后调用的函数
  • error_callback (可调用对象):如果使用批处理模式,当数据写入不成功(响应HTTP状态非204)时调用的函数
  • retry_callback (可调用对象):如果使用批处理模式,当请求是重试(使用批处理模式)且数据写入不成功时调用的函数

示例:实例化批处理写入的选项

from influxdb_client_3 import write_client_options, WriteOptions
from influxdb_client_3.write_client.client.write_api import WriteType

def success():
  print("Success")
def error():
  print("Error")
def retry():
  print("Retry error")

write_options = WriteOptions()
wco = write_client_options(success_callback=success,
                            error_callback=error,
                            retry_callback=retry,
                            write_options=write_options)

assert wco['success_callback']
assert wco['error_callback']
assert wco['retry_callback']
assert wco['write_options'].write_type == WriteType.batching

示例:实例化同步写入的选项

from influxdb_client_3 import write_client_options, SYNCHRONOUS
from influxdb_client_3.write_client.client.write_api import WriteType

wco = write_client_options(write_options=SYNCHRONOUS)

assert wco['write_options'].write_type == WriteType.synchronous

函数flight_client_options(**kwargs)

返回一个包含指定 FlightClient 参数的 dict

参数

示例:指定根证书路径

from influxdb_client_3 import InfluxDBClient3, flight_client_options
import certifi

fh = open(certifi.where(), "r")
cert = fh.read()
fh.close()

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
,
fco=flight_client_options(tls_root_certs=cert))

替换以下内容

  • DATABASE_NAME:您的 InfluxDB 集群数据库数据库的名称
  • DATABASE_TOKEN:具有读取指定数据库权限的 InfluxDB 集群 数据库令牌

常量

  • influxdb_client_3.SYNCHRONOUS:表示同步写入模式
  • influxdb_client_3.WritePrecision:表示写入精度的枚举类

异常

  • influxdb_client_3.InfluxDBError:引发与 InfluxDB 相关错误的异常类

这个页面有用吗?

感谢您的反馈!


Flux 的未来

Flux 将进入维护模式。您可以在不更改代码的情况下继续像现在一样使用它。

了解更多信息

InfluxDB v3 增强功能和 InfluxDB Clustered 现已正式发布

新功能包括更快的查询性能和管理工具,这些功能推动了 InfluxDB v3 产品线的发展。InfluxDB Clustered 现已正式发布。

InfluxDB v3 性能和功能

InfluxDB v3 产品线在查询性能方面取得了重大提升,并提供了新的管理工具。这些增强包括一个操作仪表板来监控您 InfluxDB 集群的健康状况,InfluxDB Cloud Dedicated 中的单点登录(SSO)支持,以及针对令牌和数据库的新管理 API。

了解新的 v3 增强功能


InfluxDB Clustered 正式发布

InfluxDB Clustered 现已正式发布,为您在自管理的堆栈中提供了 InfluxDB v3 的功能。

与我们谈谈 InfluxDB Clustered