Python Flight 客户端
Apache Arrow Python 绑定 与 Python 脚本和应用程序集成,以查询存储在 InfluxDB 中的数据。
使用 InfluxDB v3 客户端库
我们建议使用 influxdb3-python
Python 客户端库 将 InfluxDB v3 与您的 Python 应用程序代码集成。
InfluxDB v3 客户端库 包装 Apache Arrow Flight 客户端,并提供方便的方法来 写入、查询 和处理存储在 InfluxDB Clustered 中的数据。客户端库可以使用 SQL 或 InfluxQL 进行查询。
以下示例演示如何使用 pyarrow.flight
和 pandas
Python 模块查询和格式化存储在 InfluxDB Clustered 数据库中的数据
# Using pyarrow>=12.0.0 FlightClient
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions
import json
import pandas
import tabulate
# Downsampling query groups data into 2-hour bins
sql="""
SELECT DATE_BIN(INTERVAL '2 hours', time) AS time,
room,
selector_max(temp, time)['value'] AS 'max temp',
selector_min(temp, time)['value'] AS 'min temp',
avg(temp) AS 'average temp'
FROM home
GROUP BY
1,
room
ORDER BY room, 1"""
flight_ticket = Ticket(json.dumps({
"namespace_name": "DATABASE_NAME",
"sql_query": sql,
"query_type": "sql"
}))
token = (b"authorization", bytes(f"Bearer DATABASE_TOKEN".encode('utf-8')))
options = FlightCallOptions(headers=[token])
client = FlightClient(f"grpc+tls://cluster-host.com:443")
reader = client.do_get(flight_ticket, options)
arrow_table = reader.read_all()
# Use pyarrow and pandas to view and analyze data
data_frame = arrow_table.to_pandas()
print(data_frame.to_markdown())
# Using pyarrow>=12.0.0 FlightClient
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions
import json
import pandas
import tabulate
# Downsampling query groups data into 2-hour bins
influxql="""
SELECT FIRST(temp)
FROM home
WHERE room = 'kitchen'
AND time >= now() - 100d
AND time <= now() - 10d
GROUP BY time(2h)"""
flight_ticket = Ticket(json.dumps({
"namespace_name": "DATABASE_NAME",
"sql_query": influxql,
"query_type": "influxql"
}))
token = (b"authorization", bytes(f"Bearer DATABASE_TOKEN".encode('utf-8')))
options = FlightCallOptions(headers=[token])
client = FlightClient(f"grpc+tls://cluster-host.com:443")
reader = client.do_get(flight_ticket, options)
arrow_table = reader.read_all()
# Use pyarrow and pandas to view and analyze data
data_frame = arrow_table.to_pandas()
print(data_frame.to_markdown())
替换以下
DATABASE_NAME
:您的 InfluxDB Clustered 数据库DATABASE_TOKEN
:具有对指定数据库足够权限的 数据库令牌
这个页面有帮助吗?
感谢您的反馈!