文档文档

使用 PyArrow 库分析数据

使用 PyArrow 读取和分析来自 InfluxDB Clustered 的查询结果。PyArrow 库提供 Arrow 格式数据的高效计算、聚合、序列化和转换。

Apache Arrow 是一个用于内存分析的开发平台。它包含一组技术,使大数据系统能够快速存储、处理和移动数据。

Arrow Python 绑定(也称为“PyArrow”)与 NumPy、pandas 和内置 Python 对象具有一流的集成。它们基于 Arrow 的 C++ 实现。

安装先决条件

本指南中的示例假设使用 Python 虚拟环境和 InfluxDB 3 influxdb3-python Python 客户端库。有关更多信息,请参阅如何开始使用 Python 查询 InfluxDB

安装 influxdb3-python 还会安装 pyarrow 库,该库为 Apache Arrow 提供 Python 绑定。

使用 PyArrow 读取查询结果

以下示例展示了如何使用 influxdb3-pythonpyarrow 查询 InfluxDB,并将 Arrow 数据作为 PyArrow Table 查看。

  1. 在您的编辑器中,复制并粘贴以下示例代码到一个新文件——例如,pyarrow-example.py

    # pyarrow-example.py
    
    from influxdb_client_3 import InfluxDBClient3
    import pandas
    
    def querySQL():
      
      # Instantiate an InfluxDB client configured for a database
      client = InfluxDBClient3(
        "https://cluster-host.com",
        database="
    DATABASE_NAME
    "
    ,
    token="
    DATABASE_TOKEN
    "
    )
    # Execute the query to retrieve all record batches in the stream formatted as a PyArrow Table. table = client.query( '''SELECT * FROM home WHERE time >= now() - INTERVAL '90 days' ORDER BY time''' ) client.close() print(querySQL())
  2. 替换以下配置值

    • DATABASE_TOKEN:一个数据库令牌,具有对您要查询的数据库的读取权限
    • DATABASE_NAME:要查询的数据库的名称
  3. 在您的终端中,使用 Python 解释器运行该文件

    python pyarrow-example.py
    

InfluxDBClient3.query() 方法发送查询请求,然后返回一个 pyarrow.Table,其中包含来自响应流的所有 Arrow 记录批次。

接下来,使用 PyArrow 分析数据

使用 PyArrow 分析数据

分组和聚合数据

使用 pyarrow.Table,您可以将列中的值用作分组的

以下示例展示了如何查询 InfluxDB,然后使用 PyArrow 对表数据进行分组,并计算每个组的聚合值

# pyarrow-example.py

from influxdb_client_3 import InfluxDBClient3
import pandas

def querySQL():
  
  # Instantiate an InfluxDB client configured for a database
  client = InfluxDBClient3(
    "https://cluster-host.com",
    database="
DATABASE_NAME
"
,
token="
DATABASE_TOKEN
"
)
# Execute the query to retrieve data # formatted as a PyArrow Table table = client.query( '''SELECT * FROM home WHERE time >= now() - INTERVAL '90 days' ORDER BY time''' ) client.close() return table table = querySQL() # Use PyArrow to aggregate data print(table.group_by('room').aggregate([('temp', 'mean')]))

替换以下内容

  • DATABASE_TOKEN:一个数据库令牌,具有对您要查询的数据库的读取权限
  • DATABASE_NAME:要查询的数据库的名称

查看示例结果

有关更多详细信息和示例,请参阅 PyArrow 文档Apache Arrow Python Cookbook


此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像目前一样使用它,而无需对代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开发布 Alpha 版

InfluxDB 3 开源版本现已可用于 Alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个产品作为 Alpha 版本的一部分。

InfluxDB 3 Core 是我们新的开源产品。它是一个用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可扩展性和细粒度安全性。

有关如何开始使用的更多信息,请查看