Documentation

Go Flight 客户端

Go 的 Apache Arrow 集成了 Go 脚本和应用程序,用于查询存储在 InfluxDB 中的数据。

使用 InfluxDB 3 客户端库

我们建议使用 influxdb3-go Go 客户端库 将 InfluxDB 3 与您的 Go 应用程序代码集成。

InfluxDB 3 客户端库 封装了 Apache Arrow Flight 客户端,并为 写入查询 和处理存储在 InfluxDB 集群中的数据提供了便捷的方法。客户端库可以使用 SQL 或 InfluxQL 进行查询。

Flight SQL 客户端

使用 Flight SQL 的查询示例

以下示例展示了如何使用 Go 的 Arrow Flight SQL 客户端查询 InfluxDB 集群数据库

  1. 在您的编辑器中,打开一个名为 query.go 的新文件,并输入以下示例代码

    package main
    
    import (
      "context"
      "crypto/x509"
      "encoding/json"
      "fmt"
      "os"
    
      "github.com/apache/arrow/go/v14/arrow/flight/flightsql"
      "google.golang.org/grpc"
      "google.golang.org/grpc/credentials"
      "google.golang.org/grpc/metadata"
    )
    
    func dbQuery(ctx context.Context) error {
      url := "cluster-host.com:443"
    
      // INFLUX_TOKEN is an environment variable you created for your database READ token
      token := os.Getenv("INFLUX_TOKEN")
      database := "get-started"
    
      // Create a gRPC transport
      pool, err := x509.SystemCertPool()
      if err != nil {
        return fmt.Errorf("x509: %s", err)
      }
      transport := grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(pool, ""))
      opts := []grpc.DialOption{
        transport,
      }
    
      // Create query client
      client, err := flightsql.NewClient(url, nil, nil, opts...)
      if err != nil {
        return fmt.Errorf("flightsql: %s", err)
      }
    
      ctx = metadata.AppendToOutgoingContext(ctx, "authorization", "Bearer "+token)
      ctx = metadata.AppendToOutgoingContext(ctx, "database", database)
    
      // Execute query
      query := `SELECT
        *
      FROM
        home
      WHERE
        time >= '2022-01-01T08:00:00Z'
        AND time <= '2022-01-01T20:00:00Z'`
    
      info, err := client.Execute(ctx, query)
      if err != nil {
        return fmt.Errorf("flightsql flight info: %s", err)
      }
      reader, err := client.DoGet(ctx, info.Endpoint[0].Ticket)
      if err != nil {
        return fmt.Errorf("flightsql do get: %s", err)
      }
    
      // Print results as JSON
      for reader.Next() {
        record := reader.Record()
        b, err := json.MarshalIndent(record, "", "  ")
        if err != nil {
          return err
        }
        fmt.Println("RECORD BATCH")
        fmt.Println(string(b))
    
        if err := reader.Err(); err != nil {
          return fmt.Errorf("flightsql reader: %s", err)
        }
      }
    
      return nil
    }
    
    func main() {
      if err := dbQuery(context.Background()); err != nil {
        fmt.Fprintf(os.Stderr, "error: %v\n", err)
        os.Exit(1)
      }
    }
    

    该示例执行以下操作

    1. 导入以下包

      • context
      • crypto/x509
      • encoding/json
      • fmt
      • os
      • github.com/apache/arrow/go/v14/arrow/flight/flightsql
      • google.golang.org/grpc
      • google.golang.org/grpc/credentials
      • google.golang.org/grpc/metadata
    2. 创建一个执行以下操作的 dbQuery 函数

      1. 定义 InfluxDB 凭据的变量。

        • url: InfluxDB 集群主机名和端口 (:443) (无协议)
        • database: 要查询的 InfluxDB 集群数据库的名称
        • token: 对指定数据库具有读取权限的数据库令牌出于安全原因,我们建议将其设置为环境变量,而不是包含原始令牌字符串。
      2. 定义一个 opts 选项列表,其中包含用于通过 gRPC+TLS 协议与 InfluxDB 通信的 gRPC 传输。

      3. 使用 urlopts 调用 flightsql.NewClient() 方法以创建一个新的 Flight SQL 客户端。

      4. 将以下 InfluxDB 凭据作为键值对附加到传出的上下文中

        • authorization: Bearer <INFLUX_TOKEN>
        • database: 数据库名称
      5. 定义要执行的 SQL 查询。

      6. 调用 client.execute() 方法以发送查询请求。

      7. 使用来自查询响应的 ticket 调用 client.doGet() 方法,以从端点检索结果数据。

      8. 创建一个读取器以读取端点返回的 Arrow 表,并将结果打印为 JSON。

    3. 创建一个执行 dbQuery 函数的 main 模块函数。

  2. 输入以下命令以安装所有必要的包并运行程序以查询 InfluxDB 集群

    go get ./...
    go run ./query.go
    

查看程序输出

有关更多信息,请参阅 Go Arrow Flight 客户端文档


此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版本现已公开发布 Alpha 版

InfluxDB 3 开源版本现已可用于 Alpha 测试,并根据 MIT 或 Apache 2 许可协议获得许可。

作为 Alpha 版本的一部分,我们发布了两个产品。

InfluxDB 3 Core 是我们新的开源产品。它是一个用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、只读副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看