InfluxDB v3的Python客户端库
InfluxDB v3 influxdb3-python
Python客户端库 将InfluxDB Clustered的写和查询操作与Python脚本和应用程序集成。
InfluxDB客户端库提供可配置的批量将数据写入InfluxDB Clustered。客户端库可用于构造行协议数据,将其他格式数据转换为行协议,并将行协议数据批量写入InfluxDB HTTP API。
InfluxDB v3客户端库可以使用SQL或InfluxQL查询InfluxDB Clustered。Python客户端库 influxdb3-python
将Apache Arrow的 pyarrow.flight
客户端封装在方便的InfluxDB v3接口中,用于执行SQL和InfluxQL查询,请求服务器元数据,以及使用gRPC通过Flight协议从InfluxDB Clustered检索数据。
此页面的代码示例使用入门家庭传感器示例数据。
安装
使用pip
安装客户端库和依赖项
pip install influxdb3-python
导入模块
influxdb3-python
客户端库包提供influxdb_client_3
模块。
导入模块
import influxdb_client_3
从模块导入特定类方法
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions
influxdb_client_3.InfluxDBClient3
:用于与InfluxDB交互的类influxdb_client_3.Point
:用于构造时间序列数据点的类influxdb_client_3.WriteOptions
:用于配置客户端写入选项的类。
API参考
influxdb_client_3
模块包含以下类和函数。
类
类InfluxDBClient3
提供与InfluxDB API交互的接口,用于写入和查询数据。
InfluxDBClient3
构造函数初始化并返回一个客户端实例,具有以下功能
- 一个单例 写入客户端,用于向数据库写入。
- 一个单例 航班客户端,用于查询数据库。
参数
host
(字符串):InfluxDB 实例的主机 URL。database
(字符串):用于写入和查询的数据库。token
(字符串):具有读写权限的数据库令牌。- 可选
write_client_options
(字典):用于写入 InfluxDB 的选项。如果为None
,则写入为同步。 - 可选
flight_client_options
(字典):用于查询 InfluxDB 的选项。
写入模式
写入数据时,客户端使用以下模式之一
同步写入
默认。在 InfluxDBClient3
初始化期间未提供 write_client_options
时,写入为同步。在同步模式下写入数据时,客户端立即尝试将提供的数据写入 InfluxDB,不会重试失败的请求,也不会调用响应回调。
示例:使用同步(非批量)默认值初始化客户端
以下示例初始化一个客户端,用于在 InfluxDB 集群数据库中写入和查询数据。由于未指定 write_client_options
,客户端使用默认的同步写入模式。
–>
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
替换以下内容
要显式指定同步模式,请使用 write_options=SYNCHRONOUS
创建客户端–例如
from influxdb_client_3 import InfluxDBClient3, write_client_options, SYNCHRONOUS
wco = write_client_options(write_options=SYNCHRONOUS)
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
write_client_options=wco,
flight_client_options=None)
替换以下内容
批量写入
批量写入对于高效的批量数据操作特别有用。选项包括设置批量大小、刷新间隔、重试间隔等。
批量写入将多个写入操作组合成一个请求发送到 InfluxDB。在批量模式下,客户端将记录或记录添加到批处理中,然后安排批处理写入 InfluxDB。当达到 write_client_options.batch_size
或 write_client_options.flush_interval
时,客户端将批处理写入 InfluxDB。如果写入失败,客户端将根据 write_client_options
重试选项重新安排写入。
配置写入客户端选项
使用 WriteOptions
和 write_client_options
配置客户端的批量写入和响应处理
- 实例化
WriteOptions
。要使用批量默认值,请在构造函数中不指定参数。 - 调用
write_client_options
并使用write_options
参数指定前一步骤中创建的WriteOptions
实例。指定回调参数(成功、错误和重试)以在成功或错误时调用函数。 - 实例化
InfluxDBClient3
并使用write_client_options
参数指定前一步骤中创建的dict
输出。
示例:使用批量默认值和回调函数初始化客户端
以下示例展示了如何使用默认值批量模式和指定响应状态(成功、错误或可重试错误)的回调函数。
from influxdb_client_3 import(InfluxDBClient3,
write_client_options,
WriteOptions,
InfluxDBError)
status = None
# Define callbacks for write responses
def success(self, data: str):
status = "Success writing batch: data: {data}"
assert status.startswith('Success'), f"Expected {status} to be success"
def error(self, data: str, err: InfluxDBError):
status = f"Error writing batch: config: {self}, data: {data}, error: {err}"
assert status.startswith('Success'), f"Expected {status} to be success"
def retry(self, data: str, err: InfluxDBError):
status = f"Retry error writing batch: config: {self}, data: {data}, error: {err}"
assert status.startswith('Success'), f"Expected {status} to be success"
# Instantiate WriteOptions for batching
write_options = WriteOptions()
wco = write_client_options(success_callback=success,
error_callback=error,
retry_callback=retry,
write_options=write_options)
# Use the with...as statement to ensure the file is properly closed and resources
# are released.
with InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
write_client_options=wco) as client:
client.write_file(file='./data/home-sensor-data.csv',
timestamp_column='time', tag_columns=["room"], write_precision='s')
替换以下内容
InfluxDBClient3实例方法
InfluxDBClient3.write
将记录或记录列表写入 InfluxDB。
参数
record
(记录或列表):要写入的记录或记录列表。记录可以是Point
对象、表示点的字典、行协议字符串或DataFrame
。database
(字符串):要写入的数据库。默认是将数据写入客户端指定的数据库。- **
**kwargs
**: 其他写入选项 – 例如write_precision
(字符串):可选。默认是"ns"
。指定record
中时间戳的 精度("ms"
、"s"
、"us"
、"ns"
)。write_client_options
(字典):可选。指定 批量写入 模式下的回调函数和选项。要生成dict
,请使用write_client_options
函数。
示例:写入行协议字符串
替换以下内容
示例:使用 points 写入数据
influxdb_client_3.Point
类提供了为测量构建数据点并设置字段、标签和点时间戳的接口。以下示例显示了如何创建 Point
对象,然后将数据写入 InfluxDB。
from influxdb_client_3 import Point, InfluxDBClient3
point = Point("home").tag("room", "Kitchen").field("temp", 21.5).field("hum", .25)
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
client.write(point)
以下示例代码执行 InfluxQL 查询以检索写入的数据
# Execute an InfluxQL query
table = client.query(query='''SELECT DISTINCT(temp) as val
FROM home
WHERE temp > 21.0
AND time >= now() - 10m''', language="influxql")
# table is a pyarrow.Table
df = table.to_pandas()
assert 21.5 in df['val'].values, f"Expected value in {df['val']}"
替换以下内容
示例:使用字典写入数据
InfluxDBClient3
可以将字典对象序列化为行协议。如果您将 dict
传递给 InfluxDBClient3.write
,客户端期望该 dict
具有以下 点 属性
- measurement (字符串):测量名称
- tags (字典):标签键值对的字典
- fields (字典):字段键值对的字典
- time:记录的 时间戳
以下示例显示了如何定义表示点的 dict
,然后将其数据写入 InfluxDB。
替换以下内容
InfluxDBClient3.write_file
将文件中的数据写入 InfluxDB。执行是同步的。
参数
file
(字符串):包含要写入 InfluxDB 的记录的文件的路径。文件名必须以以下支持的扩展名之一结尾。有关编码和格式化数据的更多信息,请参阅每个支持格式的文档measurement_name
(字符串):定义文件中记录的测量名称。指定的值优先于文件中的measurement
和iox::measurement
列。如果未指定参数值,并且文件中存在measurement
列,则使用measurement
列值作为测量名称。如果未指定参数值,并且不存在measurement
列,则使用iox::measurement
列值作为测量名称。tag_columns
(列表):标签列名称。列表中未包含的列以及未由其他参数指定的列被视为字段。timestamp_column
(字符串):包含时间戳的列的名称。默认为'time'
。database
(str
):要写入的数据库。默认为写入客户端指定的数据库。file_parser_options
(可调用对象):一个函数,用于向文件解析器提供额外的参数。**kwargs
:传递给WriteAPI
的额外选项 – 例如write_precision
(字符串):可选。默认是"ns"
。指定record
中时间戳的 精度("ms"
、"s"
、"us"
、"ns"
)。write_client_options
(字典):可选。指定 批量写入 模式下的回调函数和选项。要生成dict
,请使用write_client_options
函数。
示例:在写入文件数据时使用批处理选项
以下示例显示了如何指定自定义的批量、重试和响应回调写入选项,以及如何从 CSV 和 JSON 文件中写入数据到 InfluxDB。
from influxdb_client_3 import(InfluxDBClient3, write_client_options,
WritePrecision, WriteOptions, InfluxDBError)
# Define the result object
result = {
'config': None,
'status': None,
'data': None,
'error': None
}
# Define callbacks for write responses
def success_callback(self, data: str):
result['config'] = self
result['status'] = 'success'
result['data'] = data
assert result['data'] != None, f"Expected {result['data']}"
print("Successfully wrote data: {result['data']}")
def error_callback(self, data: str, exception: InfluxDBError):
result['config'] = self
result['status'] = 'error'
result['data'] = data
result['error'] = exception
assert result['status'] == "success", f"Expected {result['error']} to be success for {result['config']}"
def retry_callback(self, data: str, exception: InfluxDBError):
result['config'] = self
result['status'] = 'retry_error'
result['data'] = data
result['error'] = exception
assert result['status'] == "success", f"Expected {result['status']} to be success for {result['config']}"
write_options = WriteOptions(batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
wco = write_client_options(success_callback=success_callback,
error_callback=error_callback,
retry_callback=retry_callback,
write_options=write_options)
with InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
write_client_options=wco) as client:
client.write_file(file='./data/home-sensor-data.csv', timestamp_column='time',
tag_columns=["room"], write_precision='s')
client.write_file(file='./data/home-sensor-data.json', timestamp_column='time',
tag_columns=["room"], write_precision='s')
替换以下内容
InfluxDBClient3.query
发送一个 Flight 请求以执行指定的 SQL 或 InfluxQL 查询。返回查询结果中的所有数据,作为 Arrow 表(pyarrow.Table
实例)。
参数
query
(字符串): 要执行的 SQL 或 InfluxQL。language
(字符串):query
参数中使用的查询语言 -"sql"
或"influxql"
。默认为"sql"
。mode
(字符串): 指定从pyarrow.flight.FlightStreamReader
返回的输出。默认为"all"
。all
: 读取流中的全部内容,并作为一个pyarrow.Table
返回。chunk
: 读取下一个消息(一个FlightStreamChunk
)并返回data
和app_metadata
。如果没有更多消息,则返回null
。pandas
: 读取流中的内容,并作为一个pandas.DataFrame
返回。reader
: 将FlightStreamReader
转换为pyarrow.RecordBatchReader
。schema
: 返回流中所有记录批次的模式。
**kwargs
:FlightCallOptions
示例:使用 SQL 查询
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
table = client.query("SELECT * from home WHERE time >= now() - INTERVAL '90 days'")
# Filter columns.
print(table.select(['room', 'temp']))
# Use PyArrow to aggregate data.
print(table.group_by('hum').aggregate([]))
在示例中,替换以下内容
示例:使用 InfluxQL 查询
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= -90d"
table = client.query(query=query, language="influxql")
# Filter columns.
print(table.select(['room', 'temp']))
示例:读取流中的所有数据并返回一个 pandas DataFrame
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'"
pd = client.query(query=query, mode="pandas")
# Print the pandas DataFrame formatted as a Markdown table.
print(pd.to_markdown())
示例:查看流中所有批次的模式
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
table = client.query("""SELECT *
from home
WHERE time >= now() - INTERVAL '90 days'""")
# View the table schema.
print(table.schema)
示例:检索结果模式且没有数据
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'"
schema = client.query(query=query, mode="schema")
print(schema)
指定超时时间
为 FlightCallOptions
传递 timeout=<秒数>
以使用自定义超时。
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
query = "SELECT * from home WHERE time >= now() - INTERVAL '90 days'"
client.query(query=query, timeout=5)
InfluxDBClient3.close
将批次中的所有剩余记录发送到 InfluxDB,然后关闭底层的写入客户端和 Flight 客户端以释放资源。
示例:关闭客户端
from influxdb_client_3 import InfluxDBClient3
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN")
client.close()
类Point
提供了一个接口来构建一个测量值的时间序列数据点,并设置字段、标签和时间戳。
from influxdb_client_3 import Point
point = Point("home").tag("room", "Living Room").field("temp", 72)
查看如何 使用点写入数据。
类WriteOptions
提供了一个接口来构建自定义批量写入行为的选项,例如批量大小和重试。
from influxdb_client_3 import WriteOptions
write_options = WriteOptions(batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2)
查看如何 在写入数据时使用批量选项。
参数
batch_size
:默认为1000
。flush_interval
:默认为1000
。jitter_interval
:默认为0
。retry_interval
:默认为5000
。max_retries
:默认为5
。max_retry_delay
:默认为125000
。max_retry_time
:默认为180000
。exponential_base
:默认为2
。max_close_wait
:默认为300000
。write_scheduler
:默认为ThreadPoolScheduler(max_workers=1)
。
函数
函数write_client_options(**kwargs)
返回一个包含指定写入客户端选项的 dict
。
参数
该函数接受以下关键字参数
write_options
(WriteOptions
):指定客户端是否使用同步模式或批量模式写入数据。如果使用批量模式,客户端将使用指定的批量选项。point_settings
(dict):客户端在将数据写入 InfluxDB 时将添加到每个点的默认标签。success_callback
(可调用对象):如果使用批量模式,则在数据成功写入 InfluxDB(HTTP 状态204
)后调用的函数error_callback
(可调用对象):如果使用批处理模式,当数据写入不成功(响应HTTP状态非204
)时调用的函数retry_callback
(可调用对象):如果使用批处理模式,当请求是重试(使用批处理模式)且数据写入不成功时调用的函数
示例:实例化批处理写入的选项
from influxdb_client_3 import write_client_options, WriteOptions
from influxdb_client_3.write_client.client.write_api import WriteType
def success():
print("Success")
def error():
print("Error")
def retry():
print("Retry error")
write_options = WriteOptions()
wco = write_client_options(success_callback=success,
error_callback=error,
retry_callback=retry,
write_options=write_options)
assert wco['success_callback']
assert wco['error_callback']
assert wco['retry_callback']
assert wco['write_options'].write_type == WriteType.batching
示例:实例化同步写入的选项
from influxdb_client_3 import write_client_options, SYNCHRONOUS
from influxdb_client_3.write_client.client.write_api import WriteType
wco = write_client_options(write_options=SYNCHRONOUS)
assert wco['write_options'].write_type == WriteType.synchronous
函数flight_client_options(**kwargs)
返回一个包含指定 FlightClient 参数的 dict
。
参数
kwargs
:用于pyarrow.flight.FlightClient
参数的关键字参数
示例:指定根证书路径
from influxdb_client_3 import InfluxDBClient3, flight_client_options
import certifi
fh = open(certifi.where(), "r")
cert = fh.read()
fh.close()
client = InfluxDBClient3(host=f"cluster-host.com",
database=f"DATABASE_NAME",
token=f"DATABASE_TOKEN",
fco=flight_client_options(tls_root_certs=cert))
替换以下内容
常量
influxdb_client_3.SYNCHRONOUS
:表示同步写入模式influxdb_client_3.WritePrecision
:表示写入精度的枚举类
异常
influxdb_client_3.InfluxDBError
:引发与 InfluxDB 相关错误的异常类
这个页面有用吗?
感谢您的反馈!