文档文档

使用 InfluxDB 客户端库写入 Line Protocol 数据

使用 InfluxDB 客户端库将数据构建为时间序列点,然后将其作为 Line Protocol 写入 InfluxDB Clustered 数据库。

构建 Line Protocol

基本了解 Line Protocol之后,您可以构建 Line Protocol 数据并将其写入 InfluxDB。

所有 InfluxDB 客户端库都以 Line Protocol 格式将数据写入 InfluxDB。客户端库 write 方法允许您将数据作为原始 Line Protocol 或客户端库转换为 Line Protocol 的 Point 对象提供。如果您的程序创建了您写入 InfluxDB 的数据,请使用客户端库 Point 接口,以利用程序中的类型安全。

家庭模式示例

考虑一个用例,您从家中的传感器收集数据。每个传感器收集温度、湿度和一氧化碳读数。

要收集此数据,请使用以下模式

  • measurementhome
    • 标签
      • room:客厅或厨房
    • 字段
      • temp:温度,单位为 °C(浮点数)
      • hum:湿度百分比(浮点数)
      • co:一氧化碳,单位为百万分之几 (ppm)(整数)
    • timestamp:Unix 时间戳,精度为

以下示例演示了如何构建和写入遵循 home 模式的点。

设置您的项目

本指南中的示例假设您已按照设置 InfluxDB写入数据设置 入门指南中的说明进行操作。

设置 InfluxDB 和您的项目后,您应该具有以下内容

  • InfluxDB Clustered 凭据

  • 您的项目目录。

  • 凭据存储为环境变量或项目配置文件中——例如,.env (“dotenv”) 文件。

  • 为写入数据到 InfluxDB 安装的客户端库。

以下示例演示了如何构建遵循示例 home 模式Point 对象,然后将数据作为 Line Protocol 写入 InfluxDB Clustered 数据库。

这些示例使用 InfluxDB 3 客户端库。有关使用 InfluxDB v2 客户端库将数据写入 InfluxDB 3 的示例,请参阅 InfluxDB v2 客户端

以下步骤使用 InfluxDB 3 Go 客户端设置 Go 项目

  1. 安装 Go 1.13 或更高版本

  2. 为您的 Go 模块创建一个目录并更改到该目录——例如

    mkdir iot-starter-go && cd $_
    
  3. 初始化 Go 模块——例如

    go mod init iot-starter
    
  4. 安装 influxdb3-go,它提供了 InfluxDB influxdb3 Go 客户端库模块。

    go get github.com/InfluxCommunity/influxdb3-go/v2
    

以下步骤使用 InfluxDB 3 JavaScript 客户端设置 JavaScript 项目。

  1. 安装 Node.js

  2. 为您的 JavaScript 项目创建一个目录并更改到该目录——例如

    mkdir -p iot-starter-js && cd $_
    
  3. 初始化一个项目——例如,使用 npm

    npm init
    
  4. 安装 @influxdata/influxdb3-client InfluxDB 3 JavaScript 客户端库。

    npm install @influxdata/influxdb3-client
    

以下步骤使用 InfluxDB 3 Python 客户端设置 Python 项目

  1. 安装 Python

  2. 在您的项目目录中,为您的 Python 模块创建一个目录并更改到模块目录——例如

    mkdir -p iot-starter-py && cd $_
    
  3. 可选,但推荐:使用 venvconda 激活虚拟环境以安装和执行代码——例如,输入以下命令使用 venv 为项目创建并激活虚拟环境

    python3 -m venv envs/iot-starter && source ./envs/iot-starter/bin/activate
    
  4. 安装 influxdb3-python,它提供了 InfluxDB influxdb_client_3 Python 客户端库模块,并且还安装了用于处理 Arrow 数据的 pyarrow

    pip install influxdb3-python
    

构建点并写入 Line Protocol

客户端库提供一个或多个 Point 构造函数方法。某些库支持语言原生的数据结构,例如 Go 的 struct,用于创建点。

  1. 为您的模块创建一个文件——例如:main.go

  2. main.go 中,输入以下示例代码

    package main
    
    import (
     "context"
     "os"
     "fmt"
     "time"
     "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
     "github.com/influxdata/line-protocol/v2/lineprotocol"
    )
    
    func Write() error {
      url := os.Getenv("INFLUX_HOST")
      token := os.Getenv("INFLUX_TOKEN")
      database := os.Getenv("INFLUX_DATABASE")
    
      // To instantiate a client, call New() with InfluxDB credentials.
      client, err := influxdb3.New(influxdb3.ClientConfig{
       Host: url,
       Token: token,
       Database: database,
      })
    
      /** Use a deferred function to ensure the client is closed when the
        * function returns.
       **/
      defer func (client *influxdb3.Client)  {
       err = client.Close()
       if err != nil {
         panic(err)
       }
      }(client)
    
      /** Use the NewPoint method to construct a point.
        * NewPoint(measurement, tags map, fields map, time)
       **/
      point := influxdb3.NewPoint("home",
         map[string]string{
           "room": "Living Room",
         },
         map[string]any{
           "temp": 24.5,
           "hum":  40.5,
           "co":   15i},
         time.Now(),
       )
    
      /** Use the NewPointWithMeasurement method to construct a point with
        * method chaining.
       **/
      point2 := influxdb3.NewPointWithMeasurement("home").
       SetTag("room", "Living Room").
       SetField("temp", 23.5).
       SetField("hum", 38.0).
       SetField("co",  16i).
       SetTimestamp(time.Now())
    
      fmt.Println("Writing points")
      points := []*influxdb3.Point{point, point2}
    
      /** Write points to InfluxDB.
        * You can specify WriteOptions, such as Gzip threshold,
        * default tags, and timestamp precision. Default precision is lineprotocol.Nanosecond
       **/
      err = client.WritePoints(context.Background(), points,
        influxdb3.WithPrecision(lineprotocol.Second))
      return nil
    }
    
    func main() {
      Write()
    }
    
  3. 要运行模块并将数据写入您的 InfluxDB Clustered 数据库,请在您的终端中输入以下命令

    go run main.go
    
  1. 为您的模块创建一个文件——例如:write-points.js

  2. write-points.js 中,输入以下示例代码

    // write-points.js
    import { InfluxDBClient, Point } from '@influxdata/influxdb3-client';
    
    /**
     * Set InfluxDB credentials.
     */
    const host = process.env.INFLUX_HOST ?? '';
    const database = process.env.INFLUX_DATABASE;
    const token = process.env.INFLUX_TOKEN;
    
    /**
     * Write line protocol to InfluxDB using the JavaScript client library.
     */
    export async function writePoints() {
      /**
       * Instantiate an InfluxDBClient.
       * Provide the host URL and the database token.
       */
      const client = new InfluxDBClient({ host, token });
    
      /** Use the fluent interface with chained methods to construct Points. */
      const point = Point.measurement('home')
        .setTag('room', 'Living Room')
        .setFloatField('temp', 22.2)
        .setFloatField('hum', 35.5)
        .setIntegerField('co', 7)
        .setTimestamp(new Date().getTime() / 1000);
    
      const point2 = Point.measurement('home')
        .setTag('room', 'Kitchen')
        .setFloatField('temp', 21.0)
        .setFloatField('hum', 35.9)
        .setIntegerField('co', 0)
        .setTimestamp(new Date().getTime() / 1000);
    
      /** Write points to InfluxDB.
       * The write method accepts an array of points, the target database, and
       * an optional configuration object.
       * You can specify WriteOptions, such as Gzip threshold, default tags,
       * and timestamp precision. Default precision is lineprotocol.Nanosecond
       **/
    
      try {
        await client.write([point, point2], database, '', { precision: 's' });
        console.log('Data has been written successfully!');
      } catch (error) {
        console.error(`Error writing data to InfluxDB: ${error.body}`);
      }
    
      client.close();
    }
    
    writePoints();
    
  3. 要运行模块并将数据写入您的 {{< product-name >}} 数据库,请在您的终端中输入以下命令

    node writePoints.js
    
  1. 为您的模块创建一个文件——例如:write-points.py

  2. write-points.py 中,输入以下示例代码以批量模式写入数据

    import os
    from influxdb_client_3 import (
      InfluxDBClient3, InfluxDBError, Point, WritePrecision,
      WriteOptions, write_client_options)
    
    host = os.getenv('INFLUX_HOST')
    token = os.getenv('INFLUX_TOKEN')
    database = os.getenv('INFLUX_DATABASE')
    
    # Create an array of points with tags and fields.
    points = [Point("home")
                .tag("room", "Kitchen")
                .field("temp", 25.3)
                .field('hum', 20.2)
                .field('co', 9)]
    
    # With batching mode, define callbacks to execute after a successful or
    # failed write request.
    # Callback methods receive the configuration and data sent in the request.
    def success(self, data: str):
        print(f"Successfully wrote batch: data: {data}")
    
    def error(self, data: str, exception: InfluxDBError):
        print(f"Failed writing batch: config: {self}, data: {data} due: {exception}")
    
    def retry(self, data: str, exception: InfluxDBError):
        print(f"Failed retry writing batch: config: {self}, data: {data} retry: {exception}")
    
    # Configure options for batch writing.
    write_options = WriteOptions(batch_size=500,
                                        flush_interval=10_000,
                                        jitter_interval=2_000,
                                        retry_interval=5_000,
                                        max_retries=5,
                                        max_retry_delay=30_000,
                                        exponential_base=2)
    
    # Create an options dict that sets callbacks and WriteOptions.
    wco = write_client_options(success_callback=success,
                              error_callback=error,
                              retry_callback=retry,
                              write_options=write_options)
    
    # Instantiate a synchronous instance of the client with your
    # InfluxDB credentials and write options, such as Gzip threshold, default tags,
    # and timestamp precision. Default precision is nanosecond ('ns').
    with InfluxDBClient3(host=host,
                            token=token,
                            database=database,
                            write_client_options=wco) as client:
    
          client.write(points, write_precision='s')
    
  3. 要运行模块并将数据写入您的 InfluxDB Clustered 数据库,请在您的终端中输入以下命令

    python write-points.py
    

示例代码执行以下操作

  1. 实例化一个使用 InfluxDB URL 和 API 令牌配置的客户端。
  2. 构建 home measurement Point 对象。
  3. 以 Line Protocol 格式将数据发送到 InfluxDB 并等待响应。
  4. 如果写入成功,则将成功消息记录到 stdout;否则,记录失败消息和错误详细信息。
  5. 关闭客户端以释放资源。

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在一样使用它,而无需对您的代码进行任何更改。

阅读更多

InfluxDB 3 开源版现已公开发布 Alpha 版

InfluxDB 3 开源版现已可用于 Alpha 测试,根据 MIT 或 Apache 2 许可获得许可。

我们正在发布两个产品作为 Alpha 版的一部分。

InfluxDB 3 Core 是我们的新开源产品。它是一个用于时间序列和事件数据的最新数据引擎。InfluxDB 3 Enterprise 是一个商业版本,它建立在 Core 的基础上,增加了历史查询功能、读取副本、高可用性、可伸缩性和细粒度的安全性。

有关如何开始使用的更多信息,请查看