文档文档

InfluxDB 3 的 Python 客户端库

InfluxDB 3 influxdb3-python Python 客户端库 将 InfluxDB Clustered 的写入和查询操作与 Python 脚本和应用程序集成。

InfluxDB 客户端库提供可配置的批量写入数据到 InfluxDB Clustered 的功能。客户端库可用于构建 Line Protocol 数据,将其他格式的数据转换为 Line Protocol,并将 Line Protocol 数据批量写入 InfluxDB HTTP API。

InfluxDB 3 客户端库可以使用 SQL 或 InfluxQL 查询 InfluxDB Clustered。influxdb3-python Python 客户端库将 Apache Arrow pyarrow.flight 客户端封装在一个方便的 InfluxDB 3 接口中,用于执行 SQL 和 InfluxQL 查询、请求服务器元数据,以及使用 Flight 协议和 gRPC 从 InfluxDB Clustered 检索数据。

本页面的代码示例使用了入门家庭传感器示例数据

安装

使用 pip 安装客户端库和依赖项

pip install influxdb3-python

导入模块

influxdb3-python 客户端库软件包提供了 influxdb_client_3 模块。

导入模块

import influxdb_client_3

从模块导入特定类方法

from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions

API 参考

influxdb_client_3 模块包含以下类和函数。

InfluxDBClient3 类

提供用于与 InfluxDB API 交互以进行数据写入和查询的接口。

InfluxDBClient3 构造函数初始化并返回一个客户端实例,其中包含以下内容

  • 配置为写入数据库的单例写入客户端
  • 配置为查询数据库的单例 Flight 客户端

参数

  • host (字符串):InfluxDB 实例的主机 URL。
  • database (字符串):用于写入和查询的数据库。
  • token (字符串):具有读/写权限的数据库令牌。
  • 可选 write_client_options (字典):写入 InfluxDB 时使用的选项。如果为 None,则写入是同步的
  • 可选 flight_client_options (字典):查询 InfluxDB 时使用的选项。

写入模式

写入数据时,客户端使用以下模式之一

同步写入

默认值。当在初始化 InfluxDBClient3 期间未提供 write_client_options 时,写入是同步的。在同步模式下写入数据时,客户端立即尝试将提供的数据写入 InfluxDB,不重试失败的请求,也不调用响应回调。

示例:使用同步(非批量)默认值初始化客户端

以下示例初始化一个客户端,用于在 InfluxDB Clustered 数据库中写入和查询数据。鉴于未指定 write_client_options,客户端使用默认的同步写入模式。

–>

from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                        database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)

替换以下内容

  • DATABASE_NAME:您的 InfluxDB Clustered 数据库的名称
  • DATABASE_TOKEN:InfluxDB Clustered 数据库令牌,对指定的数据库具有读/写权限

要显式指定同步模式,请使用 write_options=SYNCHRONOUS 创建客户端——例如

from influxdb_client_3 import InfluxDBClient3, write_client_options, SYNCHRONOUS

wco = write_client_options(write_options=SYNCHRONOUS)

client = InfluxDBClient3(host=f"cluster-host.com",
                        database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
,
write_client_options=wco, flight_client_options=None)

替换以下内容

  • DATABASE_NAME:您的 InfluxDB Clustered 数据库的名称
  • DATABASE_TOKEN:InfluxDB Clustered 数据库令牌,对指定的数据库具有写入权限

批量写入

批量写入对于高效的批量数据操作特别有用。选项包括设置批量大小、刷新间隔、重试间隔等等。

批量写入将多次写入分组到单个 InfluxDB 请求中。在批量模式下,客户端将记录或记录添加到批次中,然后安排批次写入 InfluxDB。客户端在达到 write_client_options.batch_sizewrite_client_options.flush_interval 后将批次写入 InfluxDB。如果写入失败,客户端将根据 write_client_options 重试选项重新安排写入。

配置写入客户端选项

使用 WriteOptionswrite_client_options 为客户端配置批量写入和响应处理

  1. 实例化 WriteOptions。要使用批量默认值,请调用不带指定参数的构造函数。
  2. 调用 write_client_options 并使用 write_options 参数指定上一步中的 WriteOptions 实例。指定回调参数(成功、错误和重试)以在成功或错误时调用函数。
  3. 实例化 InfluxDBClient3 并使用 write_client_options 参数指定上一步中的 dict 输出。
示例:使用批量默认值和回调初始化客户端

以下示例演示如何使用带有默认值的批量模式,并为响应状态(成功、错误或可重试错误)指定回调函数。

from influxdb_client_3 import(InfluxDBClient3,
                              write_client_options,
                              WriteOptions,
                              InfluxDBError)

status = None

# Define callbacks for write responses
def success(self, data: str):
    status = "Success writing batch: data: {data}"
    assert status.startswith('Success'), f"Expected {status} to be success"

def error(self, data: str, err: InfluxDBError):
    status = f"Error writing batch: config: {self}, data: {data}, error: {err}"
    assert status.startswith('Success'), f"Expected {status} to be success"


def retry(self, data: str, err: InfluxDBError):
    status = f"Retry error writing batch: config: {self}, data: {data}, error: {err}"
    assert status.startswith('Success'), f"Expected {status} to be success"

# Instantiate WriteOptions for batching
write_options = WriteOptions()
wco = write_client_options(success_callback=success,
                            error_callback=error,
                            retry_callback=retry,
                            write_options=write_options)

# Use the with...as statement to ensure the file is properly closed and resources
# are released.
with InfluxDBClient3(host=f"cluster-host.com",
                     database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
,
write_client_options=wco) as client: client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time', tag_columns=["room"], write_precision='s')

替换以下内容

  • DATABASE_NAME:您的 InfluxDB Clustered 数据库的名称
  • DATABASE_TOKEN:InfluxDB Clustered 数据库令牌,对指定的数据库具有写入权限

InfluxDBClient3 实例方法

InfluxDBClient3.write

将记录或记录列表写入 InfluxDB。

参数

  • record (record 或列表):要写入的记录或记录列表。记录可以是 Point 对象、表示点的字典、Line Protocol 字符串或 DataFrame
  • database (字符串):要写入的数据库。默认值是写入为客户端指定的数据库。
  • ****kwargs**: 其他写入选项——例如
    • write_precision (字符串):可选。默认值为 "ns"。指定 record 中时间戳的精度"ms""s""us""ns")。
    • write_client_options (字典):可选。指定批量写入模式的回调函数和选项。要生成 dict,请使用write_client_options 函数

示例:写入 Line Protocol 字符串

from influxdb_client_3 import InfluxDBClient3

point = "home,room=Living\\ Room temp=21.1,hum=35.9,co=0i 1641024000"

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
client.write(record=point, write_precision="s")

以下示例代码执行 SQL 查询以检索点

# Execute an SQL query
table = client.query(query='''SELECT room
                            FROM home
                            WHERE temp=21.1
                              AND time=from_unixtime(1641024000)''')
# table is a pyarrow.Table
room = table[0][0]
assert f"{room}" == 'Living Room', f"Expected {room} to be Living Room"

替换以下内容

  • DATABASE_NAME:您的 InfluxDB Clustered 数据库的名称
  • DATABASE_TOKEN:InfluxDB Clustered 数据库令牌,对指定的数据库具有写入权限

示例:使用点写入数据

influxdb_client_3.Point 类提供了一个接口,用于构造测量的数据点并设置点的字段、标签和时间戳。以下示例演示如何创建 Point 对象,然后将数据写入 InfluxDB。

from influxdb_client_3 import Point, InfluxDBClient3

point = Point("home").tag("room", "Kitchen").field("temp", 21.5).field("hum", .25)
client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
client.write(point)

以下示例代码执行 InfluxQL 查询以检索写入的数据

# Execute an InfluxQL query
table = client.query(query='''SELECT DISTINCT(temp) as val
                              FROM home
                              WHERE temp > 21.0
                              AND time >= now() - 10m''', language="influxql")
# table is a pyarrow.Table
df = table.to_pandas()
assert 21.5 in df['val'].values, f"Expected value in {df['val']}"

替换以下内容

  • DATABASE_NAME:您的 InfluxDB Clustered 数据库的名称
  • DATABASE_TOKEN:InfluxDB Clustered 数据库令牌,对指定的数据库具有写入权限
示例:使用字典写入数据

InfluxDBClient3 可以将字典对象序列化为 Line Protocol。如果您将 dict 传递给 InfluxDBClient3.write,则客户端期望 dict 具有以下属性

  • measurement (字符串):测量名称
  • tags (字典):标签键值对的字典
  • fields (字典):字段键值对的字典
  • time:记录的时间戳

以下示例演示如何定义表示点的 dict,然后将数据写入 InfluxDB。

from influxdb_client_3 import InfluxDBClient3

# Using point dictionary structure
points = {
          "measurement": "home",
          "tags": {"room": "Kitchen", "sensor": "K001"},
          "fields": {"temp": 72.2, "hum": 36.9, "co": 4},
          "time": 1641067200
          }

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
client.write(record=points, write_precision="s")

替换以下内容

  • DATABASE_NAME:您的 InfluxDB Clustered 数据库的名称
  • DATABASE_TOKEN:InfluxDB Clustered 数据库令牌,对指定的数据库具有写入权限

InfluxDBClient3.write_file

将文件中的数据写入 InfluxDB。执行是同步的。

参数

  • file (字符串):包含要写入 InfluxDB 的记录的文件的路径。文件名必须以以下支持的扩展名之一结尾。有关编码和格式化数据的更多信息,请参阅每种支持格式的文档

  • measurement_name (字符串):定义文件中记录的测量名称。指定的值优先于文件中的 measurementiox::measurement 列。如果未为该参数指定值,并且文件中存在 measurement 列,则 measurement 列值用于测量名称。如果未为该参数指定值,并且不存在 measurement 列,则 iox::measurement 列值用于测量名称。

  • tag_columns (列表):标签列名。未包含在列表中的列以及未被另一个参数指定的列被假定为字段。

  • timestamp_column (字符串):包含时间戳的列的名称。默认值为 'time'

  • database (str):要写入的数据库。默认值是写入为客户端指定的数据库。

  • file_parser_options (可调用对象):用于为文件解析器提供附加参数的函数。

  • **kwargs:传递给 WriteAPI 的其他选项——例如

    • write_precision (字符串):可选。默认值为 "ns"。指定 record 中时间戳的精度"ms""s""us""ns")。
    • write_client_options (字典):可选。指定批量写入模式的回调函数和选项。要生成 dict,请使用write_client_options 函数

示例:写入文件数据时使用批量选项

以下示例演示如何为批处理、重试和响应回调指定自定义写入选项,以及如何将 CSV 和 JSON 文件中的数据写入 InfluxDB

from influxdb_client_3 import(InfluxDBClient3, write_client_options,
                              WritePrecision, WriteOptions, InfluxDBError)

# Define the result object
result = {
    'config': None,
    'status': None,
    'data': None,
    'error': None
}

# Define callbacks for write responses
def success_callback(self, data: str):
    result['config'] = self
    result['status'] = 'success'
    result['data'] = data

    assert result['data'] != None, f"Expected {result['data']}"
    print("Successfully wrote data: {result['data']}")

def error_callback(self, data: str, exception: InfluxDBError):
    result['config'] = self
    result['status'] = 'error'
    result['data'] = data
    result['error'] = exception

    assert result['status'] == "success", f"Expected {result['error']} to be success for {result['config']}"

def retry_callback(self, data: str, exception: InfluxDBError):
    result['config'] = self
    result['status'] = 'retry_error'
    result['data'] = data
    result['error'] = exception

    assert result['status'] == "success", f"Expected {result['status']} to be success for {result['config']}"

write_options = WriteOptions(batch_size=500,
                            flush_interval=10_000,
                            jitter_interval=2_000,
                            retry_interval=5_000,
                            max_retries=5,
                            max_retry_delay=30_000,
                            exponential_base=2)


wco = write_client_options(success_callback=success_callback,
                          error_callback=error_callback,
                          retry_callback=retry_callback,
                          write_options=write_options)

with InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
,
write_client_options=wco) as client: client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time', tag_columns=["room"], write_precision='s') client.write_file(file='./data/home-sensor-data.json', timestamp_column='time', tag_columns=["room"], write_precision='s')

替换以下内容

  • DATABASE_NAME:您的 InfluxDB Clustered 数据库的名称
  • DATABASE_TOKEN:InfluxDB Clustered 数据库令牌,对指定的数据库具有写入权限

InfluxDBClient3.query

发送 Flight 请求以执行指定的 SQL 或 InfluxQL 查询。将查询结果中的所有数据作为 Arrow 表返回(pyarrow.Table 实例)。

参数

  • query (字符串):要执行的 SQL 或 InfluxQL。
  • language (字符串):query 参数中使用的查询语言——"sql""influxql"。默认值为 "sql"
  • mode (字符串):指定从 pyarrow.flight.FlightStreamReader 返回的输出。默认值为 "all"
    • all:读取流的全部内容,并将其作为 pyarrow.Table 返回。
    • chunk:读取下一条消息(FlightStreamChunk),并返回 dataapp_metadata。如果没有更多消息,则返回 null
    • pandas:读取流的内容,并将其作为 pandas.DataFrame 返回。
    • reader:将 FlightStreamReader 转换为 pyarrow.RecordBatchReader
    • schema:返回流中所有记录批次的模式。
  • **kwargsFlightCallOptions

示例:使用 SQL 查询

from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
table = client.query("SELECT * from home WHERE time >= now() - INTERVAL '90 days'") # Filter columns. print(table.select(['room', 'temp'])) # Use PyArrow to aggregate data. print(table.group_by('hum').aggregate([]))

在示例中,替换以下内容

  • DATABASE_NAME:您的 InfluxDB Clustered 数据库的名称
  • DATABASE_TOKEN:InfluxDB Clustered 数据库令牌,对指定的数据库具有读取权限

示例:使用 InfluxQL 查询

from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
query = "SELECT * from home WHERE time >= -90d" table = client.query(query=query, language="influxql") # Filter columns. print(table.select(['room', 'temp']))
示例:从流中读取所有数据并返回 pandas DataFrame
from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" pd = client.query(query=query, mode="pandas") # Print the pandas DataFrame formatted as a Markdown table. print(pd.to_markdown())
示例:查看流中所有批次的模式
from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
table = client.query("""SELECT * from home WHERE time >= now() - INTERVAL '90 days'""") # View the table schema. print(table.schema)
示例:检索结果模式,但不检索数据
from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" schema = client.query(query=query, mode="schema") print(schema)
指定超时

传递 timeout=<秒数> 以用于 FlightCallOptions 来使用自定义超时。

from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'" client.query(query=query, timeout=5)

InfluxDBClient3.close

将批次中所有剩余的记录发送到 InfluxDB,然后关闭底层的写入客户端和 Flight 客户端以释放资源。

示例:关闭客户端

from influxdb_client_3 import InfluxDBClient3

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
)
client.close()

Point 类

提供用于构造测量的时序数据点,并设置字段、标签和时间戳的接口。

from influxdb_client_3 import Point
point = Point("home").tag("room", "Living Room").field("temp", 72)

请参阅如何使用点写入数据

WriteOptions 类

提供用于构造自定义批量写入行为的选项的接口,例如批量大小和重试。

from influxdb_client_3 import WriteOptions

write_options = WriteOptions(batch_size=500,
                             flush_interval=10_000,
                             jitter_interval=2_000,
                             retry_interval=5_000,
                             max_retries=5,
                             max_retry_delay=30_000,
                             exponential_base=2)

请参阅如何使用批量选项写入文件数据

参数

  • batch_size:默认值为 1000
  • flush_interval:默认值为 1000
  • jitter_interval:默认值为 0
  • retry_interval:默认值为 5000
  • max_retries:默认值为 5
  • max_retry_delay:默认值为 125000
  • max_retry_time:默认值为 180000
  • exponential_base:默认值为 2
  • max_close_wait:默认值为 300000
  • write_scheduler:默认值为 ThreadPoolScheduler(max_workers=1)

函数

Function write_client_options(**kwargs)

返回包含指定写入客户端选项的 dict

参数

该函数接受以下关键字参数

  • write_options (WriteOptions):指定客户端是使用同步模式还是批量模式写入数据。如果使用批量模式,则客户端使用指定的批量选项。
  • point_settings (字典):客户端在将数据写入 InfluxDB 时将添加到每个点的默认标签。
  • success_callback (可调用对象):如果使用批量模式,则在数据成功写入 InfluxDB 后(HTTP 状态 204)要调用的函数
  • error_callback (可调用对象):如果使用批量模式,则在数据未成功写入时(响应具有非 204 HTTP 状态)要调用的函数
  • retry_callback (可调用对象):如果使用批量模式,则在请求是重试(使用批量模式)且数据未成功写入时要调用的函数

示例:实例化批量写入的选项

from influxdb_client_3 import write_client_options, WriteOptions
from influxdb_client_3.write_client.client.write_api import WriteType

def success():
  print("Success")
def error():
  print("Error")
def retry():
  print("Retry error")

write_options = WriteOptions()
wco = write_client_options(success_callback=success,
                            error_callback=error,
                            retry_callback=retry,
                            write_options=write_options)

assert wco['success_callback']
assert wco['error_callback']
assert wco['retry_callback']
assert wco['write_options'].write_type == WriteType.batching

示例:实例化同步写入的选项

from influxdb_client_3 import write_client_options, SYNCHRONOUS
from influxdb_client_3.write_client.client.write_api import WriteType

wco = write_client_options(write_options=SYNCHRONOUS)

assert wco['write_options'].write_type == WriteType.synchronous

Function flight_client_options(**kwargs)

返回包含指定的 FlightClient 参数的 dict

参数

示例:指定根证书路径

from influxdb_client_3 import InfluxDBClient3, flight_client_options
import certifi

fh = open(certifi.where(), "r")
cert = fh.read()
fh.close()

client = InfluxDBClient3(host=f"cluster-host.com",
                         database=f"
DATABASE_NAME
"
,
token=f"
DATABASE_TOKEN
"
,
flight_client_options=flight_client_options(tls_root_certs=cert))

替换以下内容

  • DATABASE_NAME:您的 InfluxDB Clustered 数据库的名称
  • DATABASE_TOKEN:InfluxDB Clustered 数据库令牌,对指定的数据库具有读取权限

常量

  • influxdb_client_3.SYNCHRONOUS:表示同步写入模式
  • influxdb_client_3.WritePrecision:表示写入精度的枚举类

异常

  • influxdb_client_3.InfluxDBError:为 InfluxDB 相关错误引发的异常类

此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像目前一样使用它,而无需对代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已发布公开 Alpha 版

InfluxDB 3 开源版本现已可用于 Alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个产品作为 Alpha 版本的一部分。

InfluxDB 3 Core 是我们新的开源产品。它是用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看