文档文档

使用 Go 查询数据

使用 InfluxDB influxdb3-go Go 客户端库包和 SQL 或 InfluxQL 查询存储在 InfluxDB 中的数据。通过 Flight+gRPC 协议执行查询并检索数据,然后使用常见的 Go 工具处理数据。

开始使用 Go 查询 InfluxDB

以下示例演示如何使用 Go 和 influxdb3-go 模块创建客户端并查询 InfluxDB Clustered 数据库。

安装 Go

按照 Go 下载和安装说明 安装适用于您系统的 Go 编程语言的最新版本。

创建 Go 模块目录

  1. 在您的项目目录中,创建一个新的模块目录并导航到其中。

    mkdir influxdb_go_client && cd $_
    
  2. 输入以下命令以初始化新的 Go 模块

    go mod init influxdb_go_client
    

安装依赖项

在您的终端中,输入以下命令以下载并安装客户端库

go get github.com/InfluxCommunity/influxdb3-go/v2

安装依赖项后,您就可以查询和分析存储在 InfluxDB 数据库中的数据了。

执行查询

以下示例演示如何创建 InfluxDB 客户端,使用客户端查询方法选择测量中的所有字段,然后访问查询结果数据和元数据。

在您的 influxdb_go_client 模块目录中,创建一个名为 query.go 的文件,并输入以下示例之一以使用 SQL 或 InfluxQL 进行查询。

替换示例代码中的以下配置值

使用 SQL 查询

// query.go
package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "text/tabwriter"
    "time"

    "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
)

func Query() error {

    // Instantiate the client.
    client, err := influxdb3.New(influxdb3.ClientConfig{
        Host:       "https://cluster-host.com",
        Token:      "
DATABASE_TOKEN
"
,
Database: "
DATABASE_NAME
"
,
}) defer func(client *influxdb3.Client) { err := client.Close() if err != nil { panic(err) } }(client) query := `SELECT * FROM home WHERE time >= '2022-01-02T08:00:00Z' AND time <= '2022-01-02T20:00:00Z'` // Example 1: Query data and then read the schema and all data in the result stream. iterator, err := client.Query(context.Background(), query) fmt.Fprintln(os.Stdout, "Read all data in the stream:") data, err := iterator.Raw().Reader.Read() fmt.Fprintln(os.Stdout, data) if err != nil { panic(err) } // Example 2: Query data, view the result schema, and then process result data by row. iterator2, err = client.Query(context.Background(), query) fmt.Fprintln(os.Stdout, "View the query result schema:") schema := iterator2.Raw().Reader.Schema() fmt.Fprintln(os.Stdout, schema) w := tabwriter.NewWriter(io.Discard, 4, 4, 1, ' ', 0) w.Init(os.Stdout, 0, 8, 0, '\t', 0) fmt.Fprintln(w, "Process each row as key-value pairs:") for iterator2.Next() { row := iterator2.Value() // Use Go time package to format unix timestamp // as a time with timezone layout (RFC3339) time := (row["time"].(time.Time)). Format(time.RFC3339) fmt.Fprintf(w, "%s\t%s\t%d\t%.1f\t%.1f\n", time, row["room"], row["co"], row["hum"], row["temp"]) } w.Flush() }

示例代码执行以下操作

  1. 为您的模块定义 main 包,并导入您将在代码中使用的包。
  2. 定义 Query() 函数。
  3. 使用 InfluxDB 凭据实例化 influxdb3 客户端,并将其分配给 client 变量。
  4. 定义一个延迟函数,该函数在 Query() 执行完成后关闭客户端。
  5. 定义要执行的 SQL 查询。
  6. 调用客户端的 Query(ctx context.Context, query string) 方法,并将 SQL 字符串作为 query 参数传递。Query() 返回以下内容
    • *influxdb3.QueryIterator: 用于从查询结果流中读取数据的自定义迭代器。
    • error: Flight 请求错误。

使用 InfluxQL 查询

// query.go

package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "text/tabwriter"
    "time"

    "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
)

func InfluxQL() error {

    // Instantiate the client.
    client, err := influxdb3.New(influxdb3.ClientConfig{
        Host:       "https://cluster-host.com",
        Token:      "
DATABASE_TOKEN
"
,
Database: "
DATABASE_NAME
"
,
}) defer func(client *influxdb3.Client) { err := client.Close() if err != nil { panic(err) } }(client) query := `SELECT * FROM home WHERE time >= 1641124000s AND time <= 1641124000s + 8h` queryOptions := influxdb3.QueryOptions{ QueryType: influxdb3.InfluxQL, } // Example 1: Query data and then read the schema and all data in the result stream. iterator, err := client.QueryWithOptions(context.Background(), &queryOptions, query) fmt.Fprintln(os.Stdout, "Read all data in the stream:") data, err := iterator.Raw().Reader.Read() fmt.Fprintln(os.Stdout, data) if err != nil { panic(err) } // Example 2: Query data, view the result schema, and then process result data row by row. iterator2, err = client.QueryWithOptions(context.Background(), &queryOptions, query) fmt.Fprintln(os.Stdout, "View the query result schema:") schema := iterator2.Raw().Reader.Schema() fmt.Fprintln(os.Stdout, schema) w := tabwriter.NewWriter(io.Discard, 4, 4, 1, ' ', 0) w.Init(os.Stdout, 0, 8, 0, '\t', 0) fmt.Fprintln(w, "Process each row as key-value pairs:") for iterator2.Next() { row := iterator2.Value() // Use Go time package to format unix timestamp // as a time with timezone layout (RFC3339) time := (row["time"].(time.Time)). Format(time.RFC3339) fmt.Fprintf(w, "%s\t%s\t%d\t%.1f\t%.1f\n", time, row["room"], row["co"], row["hum"], row["temp"]) } w.Flush() }

示例代码执行以下操作

  1. 为您的模块定义 main 包,并导入您将在代码中使用的包。

  2. 定义 Query() 函数。

  3. 使用 InfluxDB 凭据实例化 influxdb3 客户端,并将其分配给 client 变量。

  4. 定义一个延迟函数,该函数在 Query() 执行完成后关闭客户端。

  5. 定义要执行的 InfluxQL 查询。

  6. 调用以下客户端方法

    QueryWithOptions(ctx context.Context, options *QueryOptions, query string)

    并传递以下参数

    • options: QueryOptions 结构体,其 QueryType 属性设置为 influxdb3.InfluxQL
    • query: 字符串。要执行的 SQL 或 InfluxQL 查询。QueryWithOptions 返回以下内容
      • *influxdb3.QueryIterator: 一个自定义迭代器,提供对查询结果数据和元数据的访问。
      • error: Flight 请求错误。

运行示例

  1. 在您的 influxdb_go_client 模块目录中,创建一个名为 main.go 的文件。

  2. main.go 中,输入以下示例代码以定义一个 main() 可执行函数,该函数调用 Query() 函数

    package main
    
    func main() {
      Query()
    }
    
  3. 在您的终端中,输入以下命令以安装必要的包,构建模块并运行程序

    go build && go run influxdb_go_client
    

    程序执行 main() 函数,该函数写入数据并将查询结果打印到控制台。


此页内容是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像目前一样使用它,而无需对代码进行任何更改。

阅读更多

InfluxDB 3 开源版现已发布公开 Alpha 版

InfluxDB 3 开源版现已可用于 Alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个产品作为 Alpha 版的一部分。

InfluxDB 3 Core 是我们新的开源产品。它是一个用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可扩展性和细粒度的安全性。

有关如何开始使用的更多信息,请查看