Python Flight SQL DBAPI 客户端
Python flightsql-dbapi
Flight SQL DBAPI 库与 Python 应用程序集成,使用 SQL 查询存储在 InfluxDB 集群数据库中的数据。该 flightsql-dbapi
库使用 Flight SQL 协议 查询和检索数据。
使用 InfluxDB v3 客户端库
我们建议使用 influxdb3-python
Python 客户端库 将 InfluxDB v3 集成到您的 Python 应用程序代码中。
InfluxDB v3 客户端库 包装 Apache Arrow Flight 客户端,并提供方便的方法来写入、查询和处理存储在 InfluxDB 集群中的数据。客户端库可以使用 SQL 或 InfluxQL 进行查询。
安装
Python 的 flightsql-dbapi
Flight SQL 库提供 DB API 2 接口和 SQLAlchemy 方言,用于 Flight SQL。安装 flightsql-dbapi
还会安装您用于处理 Arrow 数据的 pyarrow
库。
在您的终端中,使用 pip
安装 flightsql-dbapi
pip install flightsql-dbapi
导入模块
flightsql-dbapi
包提供了 flightsql
模块。从模块中,导入 FlightSQLClient
类方法
from flightsql import FlightSQLClient
flightsql.FlightSQLClient
类:用于 初始化客户端 和与 Flight SQL 服务器交互的接口。
API 参考
类 FlightSQLClient
提供用于 初始化客户端 和与 Flight SQL 服务器交互的接口。
语法
__init__(self, host=None, token=None, metadata=None, features=None)
初始化并返回一个用于与服务器交互的 FlightSQLClient
实例。
初始化客户端
以下示例展示了如何使用 Python 和 flightsql-dbapi
以及 DB API 2 接口创建一个配置为 InfluxDB 数据库的 Flight SQL 客户端。
from flightsql import FlightSQLClient
# Instantiate a FlightSQLClient configured for a database
client = FlightSQLClient(host='cluster-host.com',
token='DATABASE_TOKEN',
metadata={'database': 'DATABASE_NAME'},
features={'metadata-reflection': 'true'})
替换以下内容
DATABASE_TOKEN
:您要查询的数据库上的读权限数据库令牌DATABASE_NAME
:您的 InfluxDB 集群数据库的名称
实例方法
FlightSQLClient.execute
发送一个 Flight SQL RPC 请求以执行指定的 SQL 查询。
语法
execute(query: str, call_options: Optional[FlightSQLCallOptions] = None)
示例
# Execute the query
info = client.execute("SELECT * FROM home")
响应包含一个包含元数据和 endpoints: [...]
列表的 flight.FlightInfo
对象。每个端点包含以下内容
- 可以检索查询结果数据的一组地址。
- 一个
ticket
值,用于 检索 数据。
FlightSQLClient.do_get
传递一个从 FlightSQLClient.execute
响应中获得的 Flight 票据,并检索由票据标识的 Arrow 数据。返回一个用于流式传输数据的 pyarrow.flight.FlightStreamReader
。
语法
do_get(ticket, call_options: Optional[FlightSQLCallOptions] = None)
示例
以下示例展示了如何使用 Python 和 flightsql-dbapi
以及 pyarrow
查询 InfluxDB 并检索数据。
from flightsql import FlightSQLClient
# Instantiate a FlightSQLClient configured for a database
client = FlightSQLClient(host='cluster-host.com',
token='DATABASE_TOKEN',
metadata={'database': 'DATABASE_NAME'},
features={'metadata-reflection': 'true'})
# Execute the query to retrieve FlightInfo
info = client.execute("SELECT * FROM home")
# Extract the token for retrieving data
ticket = info.endpoints[0].ticket
# Use the ticket to request the Arrow data stream.
# Return a FlightStreamReader for streaming the results.
reader = client.do_get(ticket)
# Read all data to a pyarrow.Table
table = reader.read_all()
print(table)
do_get(ticket)
返回一个 pyarrow.flight.FlightStreamReader
,用于流式传输 Arrow 记录批次。
要读取流中的数据,请调用以下 FlightStreamReader
方法之一
read_all()
:将所有记录批次作为pyarrow.Table
读取。read_chunk()
:读取下一个 RecordBatch 和元数据。read_pandas()
:读取所有记录批次并将它们转换为pandas.DataFrame
。
这个页面有帮助吗?
感谢您的反馈!