文档说明

使用 Go 查询数据

使用 InfluxDB influxdb3-go Go 客户端库包和 SQL 或 InfluxQL 查询存储在 InfluxDB 中的数据。通过 Flight+gRPC 协议执行查询并检索数据,然后使用常见的 Go 工具处理数据。

开始使用 Go 查询 InfluxDB

以下示例展示了如何使用 Go 与 influxdb3-go 模块创建客户端并查询 InfluxDB 集群数据库。

安装 Go

按照 Go 下载和安装说明 安装适用于您的系统的最新版本的 Go 编程语言。

创建 Go 模块目录

  1. 在您的项目目录中,创建一个新的模块目录并进入该目录。

    mkdir influxdb_go_client && cd $_
    
  2. 输入以下命令初始化一个新的 Go 模块

    go mod init influxdb_go_client
    

安装依赖项

在您的终端中,输入以下命令下载并安装客户端库

go get github.com/InfluxCommunity/influxdb3-go

依赖项安装完成后,您即可查询和分析存储在 InfluxDB 数据库中的数据。

执行查询

以下示例展示了如何创建 InfluxDB 客户端,使用客户端查询方法选择测量中的所有字段,然后访问查询结果数据和元数据。

在您的 influxdb_go_client 模块目录中,创建一个名为 query.go 的文件,并输入以下示例之一以使用 SQL 或 InfluxQL 进行查询。

替换以下示例代码中的配置值

使用 SQL 查询

// query.go
package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "text/tabwriter"
    "time"

    "github.com/InfluxCommunity/influxdb3-go/influxdb3"
    "github.com/apache/arrow/go/v13/arrow"
)

func Query() error {

    // Instantiate the client.
    client, err := influxdb3.New(influxdb3.ClientConfig{
        Host:       "https://cluster-host.com",
        Token:      "
DATABASE_TOKEN
"
,
Database: "
DATABASE_NAME
"
,
}) defer func(client *influxdb3.Client) { err := client.Close() if err != nil { panic(err) } }(client) query := `SELECT * FROM home WHERE time >= '2022-01-02T08:00:00Z' AND time <= '2022-01-02T20:00:00Z'` // Example 1: Query data and then read the schema and all data in the result stream. iterator, err := client.Query(context.Background(), query) fmt.Fprintln(os.Stdout, "Read all data in the stream:") data, err := iterator.Raw().Reader.Read() fmt.Fprintln(os.Stdout, data) if err != nil { panic(err) } // Example 2: Query data, view the result schema, and then process result data by row. iterator2, err = client.Query(context.Background(), query) fmt.Fprintln(os.Stdout, "View the query result schema:") schema := iterator2.Raw().Reader.Schema() fmt.Fprintln(os.Stdout, schema) w := tabwriter.NewWriter(io.Discard, 4, 4, 1, ' ', 0) w.Init(os.Stdout, 0, 8, 0, '\t', 0) fmt.Fprintln(w, "Process each row as key-value pairs:") for iterator2.Next() { row := iterator2.Value() // Use Go arrow and time packages to format unix timestamp // as a time with timezone layout (RFC3339) time := (row["time"].(arrow.Timestamp)). ToTime(arrow.TimeUnit(arrow.Nanosecond)). Format(time.RFC3339) fmt.Fprintf(w, "%s\t%s\t%d\t%.1f\t%.1f\n", time, row["room"], row["co"], row["hum"], row["temp"]) } w.Flush() }

示例代码执行以下操作

  1. 为您的模块定义一个 main 包并导入代码中将使用的包。
  2. 定义一个 Query() 函数。
  3. 使用 InfluxDB 凭据实例化 influxdb3 客户端并将其分配给 client 变量。
  4. 定义一个延迟函数,在 Query() 执行完成后关闭客户端。
  5. 定义要执行的 SQL 查询。
  6. 调用客户端的 Query(ctx context.Context, query string) 方法,并将 SQL 字符串作为 query 参数传递。 Query() 返回以下内容
    • *influxdb3.QueryIterator:用于从查询结果流中读取数据的自定义迭代器。
    • error:Flight 请求错误。

使用 InfluxQL 查询

// query.go

package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "text/tabwriter"
    "time"

    "github.com/InfluxCommunity/influxdb3-go/influxdb3"
    "github.com/apache/arrow/go/v13/arrow"
)

func InfluxQL() error {

    // Instantiate the client.
    client, err := influxdb3.New(influxdb3.ClientConfig{
        Host:       "https://cluster-host.com",
        Token:      "
DATABASE_TOKEN
"
,
Database: "
DATABASE_NAME
"
,
}) defer func(client *influxdb3.Client) { err := client.Close() if err != nil { panic(err) } }(client) query := `SELECT * FROM home WHERE time >= 1641124000s AND time <= 1641124000s + 8h` queryOptions := influxdb3.QueryOptions{ QueryType: influxdb3.InfluxQL, } // Example 1: Query data and then read the schema and all data in the result stream. iterator, err := client.QueryWithOptions(context.Background(), &queryOptions, query) fmt.Fprintln(os.Stdout, "Read all data in the stream:") data, err := iterator.Raw().Reader.Read() fmt.Fprintln(os.Stdout, data) if err != nil { panic(err) } // Example 2: Query data, view the result schema, and then process result data row by row. iterator2, err = client.QueryWithOptions(context.Background(), &queryOptions, query) fmt.Fprintln(os.Stdout, "View the query result schema:") schema := iterator2.Raw().Reader.Schema() fmt.Fprintln(os.Stdout, schema) w := tabwriter.NewWriter(io.Discard, 4, 4, 1, ' ', 0) w.Init(os.Stdout, 0, 8, 0, '\t', 0) fmt.Fprintln(w, "Process each row as key-value pairs:") for iterator2.Next() { row := iterator2.Value() // Use Go arrow and time packages to format unix timestamp // as a time with timezone layout (RFC3339) time := (row["time"].(arrow.Timestamp)). ToTime(arrow.TimeUnit(arrow.Nanosecond)). Format(time.RFC3339) fmt.Fprintf(w, "%s\t%s\t%d\t%.1f\t%.1f\n", time, row["room"], row["co"], row["hum"], row["temp"]) } w.Flush() }

示例代码执行以下操作

  1. 为您的模块定义一个 main 包并导入代码中将使用的包。

  2. 定义一个 Query() 函数。

  3. 使用 InfluxDB 凭据实例化 influxdb3 客户端并将其分配给 client 变量。

  4. 定义一个延迟函数,在 Query() 执行完成后关闭客户端。

  5. 定义要执行的 InfluxQL 查询。

  6. 调用以下客户端方法

    QueryWithOptions(ctx context.Context, options *QueryOptions, query string)

    并传递以下参数

    • options:一个具有 QueryType 属性设置为 influxdb3.InfluxQLQueryOptions 结构。
    • query:一个字符串。要执行的 SQL 或 InfluxQL 查询。 QueryWithOptions 返回以下内容
      • *influxdb3.QueryIterator:提供访问查询结果数据和元数据的自定义迭代器。
      • error:Flight 请求错误。

运行示例

  1. 在您的 influxdb_go_client 模块目录中,创建一个名为 main.go 的文件。

  2. main.go 中,输入以下示例代码以定义一个调用 Query() 函数的 main() 可执行函数

    package main
    
    func main() {
      Query()
    }
    
  3. 在您的终端中,输入以下命令以安装必要的包,构建模块并运行程序

    go build && go run influxdb_go_client
    

    程序执行 main() 函数,该函数写入数据并将查询结果打印到控制台。


这个页面有帮助吗?

感谢您的反馈!


Flux的未来

Flux将进入维护模式。您可以在不更改代码的情况下继续像目前一样使用它。

阅读更多

InfluxDB v3增强和InfluxDB集群版现已普遍可用

新功能,包括更快的查询性能和管理工具,推动了InfluxDB v3产品线的进步。InfluxDB集群版现已普遍可用。

InfluxDB v3性能和功能

InfluxDB v3产品线在查询性能方面取得了显著提升,并提供了新的管理工具。这些增强包括一个用于监控InfluxDB集群健康状况的操作仪表板,InfluxDB云专用的单点登录(SSO)支持,以及用于令牌和数据库的新管理API。

了解新的v3增强功能


InfluxDB集群版普遍可用

InfluxDB集群版现已普遍可用,为您在自管理堆栈中带来了InfluxDB v3的强大功能。

与我们谈谈InfluxDB集群版