文档文档

Python Flight 客户端

Apache Arrow Python 绑定与 Python 脚本和应用程序集成,以查询存储在 InfluxDB 中的数据。

使用 InfluxDB 3 客户端库

我们建议使用 influxdb3-python Python 客户端库,以便将 InfluxDB 3 与您的 Python 应用程序代码集成。

InfluxDB 3 客户端库 封装了 Apache Arrow Flight 客户端,并为写入查询和处理存储在 InfluxDB Clustered 中的数据提供了便捷的方法。客户端库可以使用 SQL 或 InfluxQL 进行查询。

以下示例展示了如何使用 pyarrow.flightpandas Python 模块来查询和格式化存储在 InfluxDB Clustered 数据库中的数据

# Using pyarrow>=12.0.0 FlightClient
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions 
import json
import pandas
import tabulate

# Downsampling query groups data into 2-hour bins
sql="""
  SELECT DATE_BIN(INTERVAL '2 hours', time) AS time,
    room,
    selector_max(temp, time)['value'] AS 'max temp',
    selector_min(temp, time)['value'] AS 'min temp',
    avg(temp) AS 'average temp'
  FROM home
  GROUP BY
    1,
    room
  ORDER BY room, 1"""
  
flight_ticket = Ticket(json.dumps({
  "namespace_name": "
DATABASE_NAME
"
,
"sql_query": sql, "query_type": "sql" })) token = (b"authorization", bytes(f"Bearer
DATABASE_TOKEN
"
.encode('utf-8')))
options = FlightCallOptions(headers=[token]) client = FlightClient(f"grpc+tls://cluster-host.com:443") reader = client.do_get(flight_ticket, options) arrow_table = reader.read_all() # Use pyarrow and pandas to view and analyze data data_frame = arrow_table.to_pandas() print(data_frame.to_markdown())
# Using pyarrow>=12.0.0 FlightClient
from pyarrow.flight import FlightClient, Ticket, FlightCallOptions 
import json
import pandas
import tabulate

# Downsampling query groups data into 2-hour bins
influxql="""
  SELECT FIRST(temp)
  FROM home 
  WHERE room = 'kitchen'
    AND time >= now() - 100d
    AND time <= now() - 10d
  GROUP BY time(2h)"""
  
flight_ticket = Ticket(json.dumps({
  "namespace_name": "
DATABASE_NAME
"
,
"sql_query": influxql, "query_type": "influxql" })) token = (b"authorization", bytes(f"Bearer
DATABASE_TOKEN
"
.encode('utf-8')))
options = FlightCallOptions(headers=[token]) client = FlightClient(f"grpc+tls://cluster-host.com:443") reader = client.do_get(flight_ticket, options) arrow_table = reader.read_all() # Use pyarrow and pandas to view and analyze data data_frame = arrow_table.to_pandas() print(data_frame.to_markdown())

替换以下内容

  • DATABASE_NAME:您的 InfluxDB Clustered 数据库
  • DATABASE_TOKEN:具有足够权限访问指定数据库的数据库令牌

此页内容是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像目前一样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开发布 Alpha 版

InfluxDB 3 开源版本现已可用于 Alpha 测试,根据 MIT 或 Apache 2 许可授权。

我们正在发布两个产品作为 Alpha 版本的一部分。

InfluxDB 3 Core 是我们新的开源产品。它是一个用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看