InfluxDB 3 的 Python 客户端库
InfluxDB 3 influxdb3-python
Python 客户端库 将 InfluxDB Clustered 的写入和查询操作与 Python 脚本和应用程序集成。
InfluxDB 客户端库提供可配置的批量写入数据到 InfluxDB Clustered 的功能。客户端库可用于构建 Line Protocol 数据,将其他格式的数据转换为 Line Protocol,并将 Line Protocol 数据批量写入 InfluxDB HTTP API。
InfluxDB 3 客户端库可以使用 SQL 或 InfluxQL 查询 InfluxDB Clustered。influxdb3-python
Python 客户端库将 Apache Arrow pyarrow.flight
客户端封装在一个方便的 InfluxDB 3 接口中,用于执行 SQL 和 InfluxQL 查询、请求服务器元数据,以及使用 Flight 协议和 gRPC 从 InfluxDB Clustered 检索数据。
本页面的代码示例使用了入门家庭传感器示例数据。
安装
使用 pip
安装客户端库和依赖项
pip install influxdb3-python
导入模块
influxdb3-python
客户端库软件包提供了 influxdb_client_3
模块。
导入模块
import influxdb_client_3
从模块导入特定类方法
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions
influxdb_client_3.InfluxDBClient3
:用于与 InfluxDB 交互的类influxdb_client_3.Point
:用于构造时间序列数据点的类influxdb_client_3.WriteOptions
:用于配置客户端写入选项的类。
API 参考
influxdb_client_3
模块包含以下类和函数。
类
InfluxDBClient3 类
提供用于与 InfluxDB API 交互以进行数据写入和查询的接口。
InfluxDBClient3
构造函数初始化并返回一个客户端实例,其中包含以下内容
- 配置为写入数据库的单例写入客户端。
- 配置为查询数据库的单例 Flight 客户端。
参数
host
(字符串):InfluxDB 实例的主机 URL。database
(字符串):用于写入和查询的数据库。token
(字符串):具有读/写权限的数据库令牌。- 可选
write_client_options
(字典):写入 InfluxDB 时使用的选项。如果为None
,则写入是同步的。 - 可选
flight_client_options
(字典):查询 InfluxDB 时使用的选项。
写入模式
写入数据时,客户端使用以下模式之一
同步写入
默认值。当在初始化 InfluxDBClient3
期间未提供 write_client_options
时,写入是同步的。在同步模式下写入数据时,客户端立即尝试将提供的数据写入 InfluxDB,不重试失败的请求,也不调用响应回调。
示例:使用同步(非批量)默认值初始化客户端
以下示例初始化一个客户端,用于在 InfluxDB Clustered 数据库中写入和查询数据。鉴于未指定 write_client_options
,客户端使用默认的同步写入模式。
–>
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
替换以下内容
要显式指定同步模式,请使用 write_options=SYNCHRONOUS
创建客户端——例如
from influxdb_client_3 import InfluxDBClient3, write_client_options, SYNCHRONOUS
wco = write_client_options(write_options=SYNCHRONOUS)
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
write_client_options=wco,
flight_client_options=None)
替换以下内容
批量写入
批量写入对于高效的批量数据操作特别有用。选项包括设置批量大小、刷新间隔、重试间隔等等。
批量写入将多次写入分组到单个 InfluxDB 请求中。在批量模式下,客户端将记录或记录添加到批次中,然后安排批次写入 InfluxDB。客户端在达到 write_client_options.batch_size
或 write_client_options.flush_interval
后将批次写入 InfluxDB。如果写入失败,客户端将根据 write_client_options
重试选项重新安排写入。
配置写入客户端选项
使用 WriteOptions
和 write_client_options
为客户端配置批量写入和响应处理
- 实例化
WriteOptions
。要使用批量默认值,请调用不带指定参数的构造函数。 - 调用
write_client_options
并使用write_options
参数指定上一步中的WriteOptions
实例。指定回调参数(成功、错误和重试)以在成功或错误时调用函数。 - 实例化
InfluxDBClient3
并使用write_client_options
参数指定上一步中的dict
输出。
示例:使用批量默认值和回调初始化客户端
以下示例演示如何使用带有默认值的批量模式,并为响应状态(成功、错误或可重试错误)指定回调函数。
from influxdb_client_3 import(InfluxDBClient3,
write_client_options,
WriteOptions,
InfluxDBError)
status = None
# Define callbacks for write responses
def success(self, data: str):
status = "Success writing batch: data: {data}"
assert status.startswith('Success'), f"Expected {status} to be success"
def error(self, data: str, err: InfluxDBError):
status = f"Error writing batch: config: {self}, data: {data}, error: {err}"
assert status.startswith('Success'), f"Expected {status} to be success"
def retry(self, data: str, err: InfluxDBError):
status = f"Retry error writing batch: config: {self}, data: {data}, error: {err}"
assert status.startswith('Success'), f"Expected {status} to be success"
# Instantiate WriteOptions for batching
write_options = WriteOptions()
wco = write_client_options(success_callback=success,
error_callback=error,
retry_callback=retry,
write_options=write_options)
# Use the with...as statement to ensure the file is properly closed and resources
# are released.
with InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
write_client_options=wco) as client:
client.write_file(file='./data/home-sensor-data.csv',
timestamp_column='time', tag_columns=["room"], write_precision='s')
替换以下内容
InfluxDBClient3 实例方法
InfluxDBClient3.write
将记录或记录列表写入 InfluxDB。
参数
record
(record 或列表):要写入的记录或记录列表。记录可以是Point
对象、表示点的字典、Line Protocol 字符串或DataFrame
。database
(字符串):要写入的数据库。默认值是写入为客户端指定的数据库。- **
**kwargs
**: 其他写入选项——例如write_precision
(字符串):可选。默认值为"ns"
。指定record
中时间戳的精度("ms"
、"s"
、"us"
、"ns"
)。write_client_options
(字典):可选。指定批量写入模式的回调函数和选项。要生成dict
,请使用write_client_options
函数。
示例:写入 Line Protocol 字符串
替换以下内容
示例:使用点写入数据
influxdb_client_3.Point
类提供了一个接口,用于构造测量的数据点并设置点的字段、标签和时间戳。以下示例演示如何创建 Point
对象,然后将数据写入 InfluxDB。
from influxdb_client_3 import Point, InfluxDBClient3
point = Point("home").tag("room", "Kitchen").field("temp", 21.5).field("hum", .25)
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
client.write(point)
以下示例代码执行 InfluxQL 查询以检索写入的数据
# Execute an InfluxQL query
table = client.query(query='''SELECT DISTINCT(temp) as val
FROM home
WHERE temp > 21.0
AND time >= now() - 10m''', language="influxql")
# table is a pyarrow.Table
df = table.to_pandas()
assert 21.5 in df['val'].values, f"Expected value in {df['val']}"
替换以下内容
示例:使用字典写入数据
InfluxDBClient3
可以将字典对象序列化为 Line Protocol。如果您将 dict
传递给 InfluxDBClient3.write
,则客户端期望 dict
具有以下点属性
- measurement (字符串):测量名称
- tags (字典):标签键值对的字典
- fields (字典):字段键值对的字典
- time:记录的时间戳
以下示例演示如何定义表示点的 dict
,然后将数据写入 InfluxDB。
替换以下内容
InfluxDBClient3.write_file
将文件中的数据写入 InfluxDB。执行是同步的。
参数
file
(字符串):包含要写入 InfluxDB 的记录的文件的路径。文件名必须以以下支持的扩展名之一结尾。有关编码和格式化数据的更多信息,请参阅每种支持格式的文档measurement_name
(字符串):定义文件中记录的测量名称。指定的值优先于文件中的measurement
和iox::measurement
列。如果未为该参数指定值,并且文件中存在measurement
列,则measurement
列值用于测量名称。如果未为该参数指定值,并且不存在measurement
列,则iox::measurement
列值用于测量名称。tag_columns
(列表):标签列名。未包含在列表中的列以及未被另一个参数指定的列被假定为字段。timestamp_column
(字符串):包含时间戳的列的名称。默认值为'time'
。database
(str
):要写入的数据库。默认值是写入为客户端指定的数据库。file_parser_options
(可调用对象):用于为文件解析器提供附加参数的函数。**kwargs
:传递给WriteAPI
的其他选项——例如write_precision
(字符串):可选。默认值为"ns"
。指定record
中时间戳的精度("ms"
、"s"
、"us"
、"ns"
)。write_client_options
(字典):可选。指定批量写入模式的回调函数和选项。要生成dict
,请使用write_client_options
函数。
示例:写入文件数据时使用批量选项
以下示例演示如何为批处理、重试和响应回调指定自定义写入选项,以及如何将 CSV 和 JSON 文件中的数据写入 InfluxDB
from influxdb_client_3 import(InfluxDBClient3, write_client_options,
WritePrecision, WriteOptions, InfluxDBError)
# Define the result object
result = {
'config': None,
'status': None,
'data': None,
'error': None
}
# Define callbacks for write responses
def success_callback(self, data: str):
result['config'] = self
result['status'] = 'success'
result['data'] = data
assert result['data'] != None, f"Expected {result['data']}"
print("Successfully wrote data: {result['data']}")
def error_callback(self, data: str, exception: InfluxDBError):
result['config'] = self
result['status'] = 'error'
result['data'] = data
result['error'] = exception
assert result['status'] == "success", f"Expected {result['error']} to be success for {result['config']}"
def retry_callback(self, data: str, exception: InfluxDBError):
result['config'] = self
result['status'] = 'retry_error'
result['data'] = data
result['error'] = exception
assert result['status'] == "success", f"Expected {result['status']} to be success for {result['config']}"
write_options = WriteOptions(batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
wco = write_client_options(success_callback=success_callback,
error_callback=error_callback,
retry_callback=retry_callback,
write_options=write_options)
with InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
write_client_options=wco) as client:
client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time',
tag_columns=["room"], write_precision='s')
client.write_file(file='./data/home-sensor-data.json', timestamp_column='time',
tag_columns=["room"], write_precision='s')
替换以下内容
InfluxDBClient3.query
发送 Flight 请求以执行指定的 SQL 或 InfluxQL 查询。将查询结果中的所有数据作为 Arrow 表返回(pyarrow.Table
实例)。
参数
query
(字符串):要执行的 SQL 或 InfluxQL。language
(字符串):query
参数中使用的查询语言——"sql"
或"influxql"
。默认值为"sql"
。mode
(字符串):指定从pyarrow.flight.FlightStreamReader
返回的输出。默认值为"all"
。all
:读取流的全部内容,并将其作为pyarrow.Table
返回。chunk
:读取下一条消息(FlightStreamChunk
),并返回data
和app_metadata
。如果没有更多消息,则返回null
。pandas
:读取流的内容,并将其作为pandas.DataFrame
返回。reader
:将FlightStreamReader
转换为pyarrow.RecordBatchReader
。schema
:返回流中所有记录批次的模式。
**kwargs
:FlightCallOptions
示例:使用 SQL 查询
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
table = client.query("SELECT * from home WHERE time >= now() - INTERVAL '90 days'")
# Filter columns.
print(table.select(['room', 'temp']))
# Use PyArrow to aggregate data.
print(table.group_by('hum').aggregate([]))
在示例中,替换以下内容
示例:使用 InfluxQL 查询
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= -90d"
table = client.query(query=query, language="influxql")
# Filter columns.
print(table.select(['room', 'temp']))
示例:从流中读取所有数据并返回 pandas DataFrame
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'"
pd = client.query(query=query, mode="pandas")
# Print the pandas DataFrame formatted as a Markdown table.
print(pd.to_markdown())
示例:查看流中所有批次的模式
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
table = client.query("""SELECT *
from home
WHERE time >= now() - INTERVAL '90 days'""")
# View the table schema.
print(table.schema)
示例:检索结果模式,但不检索数据
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'"
schema = client.query(query=query, mode="schema")
print(schema)
指定超时
传递 timeout=<秒数>
以用于 FlightCallOptions
来使用自定义超时。
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'"
client.query(query=query, timeout=5)
InfluxDBClient3.close
将批次中所有剩余的记录发送到 InfluxDB,然后关闭底层的写入客户端和 Flight 客户端以释放资源。
示例:关闭客户端
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
client.close()
Point 类
提供用于构造测量的时序数据点,并设置字段、标签和时间戳的接口。
from influxdb_client_3 import Point
point = Point("home").tag("room", "Living Room").field("temp", 72)
请参阅如何使用点写入数据。
WriteOptions 类
提供用于构造自定义批量写入行为的选项的接口,例如批量大小和重试。
from influxdb_client_3 import WriteOptions
write_options = WriteOptions(batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
请参阅如何使用批量选项写入文件数据。
参数
batch_size
:默认值为1000
。flush_interval
:默认值为1000
。jitter_interval
:默认值为0
。retry_interval
:默认值为5000
。max_retries
:默认值为5
。max_retry_delay
:默认值为125000
。max_retry_time
:默认值为180000
。exponential_base
:默认值为2
。max_close_wait
:默认值为300000
。write_scheduler
:默认值为ThreadPoolScheduler(max_workers=1)
。
函数
Function write_client_options(**kwargs)
返回包含指定写入客户端选项的 dict
。
参数
该函数接受以下关键字参数
write_options
(WriteOptions
):指定客户端是使用同步模式还是批量模式写入数据。如果使用批量模式,则客户端使用指定的批量选项。point_settings
(字典):客户端在将数据写入 InfluxDB 时将添加到每个点的默认标签。success_callback
(可调用对象):如果使用批量模式,则在数据成功写入 InfluxDB 后(HTTP 状态204
)要调用的函数error_callback
(可调用对象):如果使用批量模式,则在数据未成功写入时(响应具有非204
HTTP 状态)要调用的函数retry_callback
(可调用对象):如果使用批量模式,则在请求是重试(使用批量模式)且数据未成功写入时要调用的函数
示例:实例化批量写入的选项
from influxdb_client_3 import write_client_options, WriteOptions
from influxdb_client_3.write_client.client.write_api import WriteType
def success():
print("Success")
def error():
print("Error")
def retry():
print("Retry error")
write_options = WriteOptions()
wco = write_client_options(success_callback=success,
error_callback=error,
retry_callback=retry,
write_options=write_options)
assert wco['success_callback']
assert wco['error_callback']
assert wco['retry_callback']
assert wco['write_options'].write_type == WriteType.batching
示例:实例化同步写入的选项
from influxdb_client_3 import write_client_options, SYNCHRONOUS
from influxdb_client_3.write_client.client.write_api import WriteType
wco = write_client_options(write_options=SYNCHRONOUS)
assert wco['write_options'].write_type == WriteType.synchronous
Function flight_client_options(**kwargs)
返回包含指定的 FlightClient 参数的 dict
。
参数
kwargs
:pyarrow.flight.FlightClient
参数的关键字参数
示例:指定根证书路径
from influxdb_client_3 import InfluxDBClient3, flight_client_options
import certifi
fh = open(certifi.where(), "r")
cert = fh.read()
fh.close()
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
flight_client_options=flight_client_options(tls_root_certs=cert))
替换以下内容
常量
influxdb_client_3.SYNCHRONOUS
:表示同步写入模式influxdb_client_3.WritePrecision
:表示写入精度的枚举类
异常
influxdb_client_3.InfluxDBError
:为 InfluxDB 相关错误引发的异常类
此页面是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB Clustered 和本文档的反馈和错误报告。要获得支持,请使用以下资源
拥有年度合同或支持合同的客户 可以联系 InfluxData 支持。