文档文档

Python Flight SQL DBAPI 客户端

Python flightsql-dbapi Flight SQL DBAPI 库使用 SQL 与 Python 应用程序集成,以查询存储在 InfluxDB 3 Core 数据库中的数据。flightsql-dbapi 库使用 Flight SQL 协议来查询和检索数据。

使用 InfluxDB 3 客户端库

我们建议使用 influxdb3-python Python 客户端库,以便将 InfluxDB 3 与您的 Python 应用程序代码集成。

InfluxDB 3 客户端库封装了 Apache Arrow Flight 客户端,并为 写入查询和处理存储在 InfluxDB 3 Core 中的数据提供了便捷的方法。客户端库可以使用 SQL 或 InfluxQL 进行查询。

安装

用于 Python 的 flightsql-dbapi Flight SQL 库为 Flight SQL 提供了 DB API 2 接口和 SQLAlchemy 方言。安装 flightsql-dbapi 还会安装 pyarrow 库,您将使用该库来处理 Arrow 数据。

在您的终端中,使用 pip 安装 flightsql-dbapi

pip install flightsql-dbapi

导入模块

flightsql-dbapi 包提供了 flightsql 模块。从该模块中,导入 FlightSQLClient 类方法

from flightsql import FlightSQLClient
  • flightsql.FlightSQLClient 类:用于初始化客户端和与 Flight SQL 服务器交互的接口。

API 参考

类 FlightSQLClient

提供用于初始化客户端和与 Flight SQL 服务器交互的接口。

语法

__init__(self, host=None, token=None, metadata=None, features=None)

初始化并返回用于与服务器交互的 FlightSQLClient 实例。

初始化客户端

以下示例展示了如何将 Python 与 flightsql-dbapi 和 *DB API 2* 接口结合使用,以实例化为 InfluxDB 数据库配置的 Flight SQL 客户端。

from flightsql import FlightSQLClient

# Instantiate a FlightSQLClient configured for a database
client = FlightSQLClient(host='localhost:8181',
                        token='
DATABASE_TOKEN
'
,
metadata={'database': '
DATABASE_NAME
'
},
features={'metadata-reflection': 'true'})

替换以下内容

  • DATABASE_TOKEN:InfluxDB 3 Core 数据库令牌,具有对您要查询的数据库的读取权限
  • DATABASE_NAME:您的 InfluxDB 3 Core 数据库的名称

实例方法

FlightSQLClient.execute

发送 Flight SQL RPC 请求以执行指定的 SQL 查询。

语法

execute(query: str, call_options: Optional[FlightSQLCallOptions] = None)

示例

# Execute the query
info = client.execute("SELECT * FROM home")

响应包含一个 flight.FlightInfo 对象,其中包含元数据和一个 endpoints: [...] 列表。每个端点包含以下内容

  • 您可以从中检索查询结果数据的地址列表。
  • 一个 ticket 值,用于标识要检索的数据。

FlightSQLClient.do_get

传递 Flight ticket(从 FlightSQLClient.execute 响应中获得),并检索由该 ticket 标识的 Arrow 数据。返回用于流式传输数据的 pyarrow.flight.FlightStreamReader

语法

 do_get(ticket, call_options: Optional[FlightSQLCallOptions] = None)

示例

以下示例展示了如何将 Python 与 flightsql-dbapipyarrow 结合使用,以查询 InfluxDB 并检索数据。

from flightsql import FlightSQLClient

# Instantiate a FlightSQLClient configured for a database
client = FlightSQLClient(host='localhost:8181',
    token='DATABASE_TOKEN',
    metadata={'database': 'DATABASE_NAME'},
    features={'metadata-reflection': 'true'})

# Execute the query to retrieve FlightInfo
info = client.execute("SELECT * FROM home")

# Extract the token for retrieving data
ticket = info.endpoints[0].ticket

# Use the ticket to request the Arrow data stream.
# Return a FlightStreamReader for streaming the results.
reader = client.do_get(ticket)

# Read all data to a pyarrow.Table
table = reader.read_all()

print(table)

do_get(ticket) 返回一个 pyarrow.flight.FlightStreamReader,用于流式传输 Arrow 记录批次

要从流中读取数据,请调用以下 FlightStreamReader 方法之一

  • read_all():将所有记录批次读取为 pyarrow.Table
  • read_chunk():读取下一个 RecordBatch 和元数据。
  • read_pandas():读取所有记录批次,并将它们转换为 pandas.DataFrame

此页内容是否有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、近实时数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩功能,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看