文档文档

使用 HTTP API 和客户端库写入数据

使用 /api/v3/write_lp HTTP API 端点和 InfluxDB v3 API 客户端,将点作为 Line Protocol 数据写入 InfluxDB 3 Core。

使用 /api/v3/write_lp 端点

InfluxDB 3 Core 添加了 /api/v3/write_lp 端点。

POST /api/v3/write_lp?db=mydb&precision=nanosecond&accept_partial=true&no_sync=false

此端点接受与先前版本相同的 Line Protocol 语法,并支持以下参数

  • ?accept_partial=<BOOLEAN>:接受或拒绝部分写入(默认为 true)。
  • ?no_sync=<BOOLEAN>:控制写入何时被确认
    • no_sync=true:在 WAL 持久化完成之前确认写入。
    • no_sync=false:在 WAL 持久化完成后确认写入(默认)。
  • ?precision=<PRECISION>:指定时间戳的精度。默认精度为纳秒。

有关参数的更多信息,请参阅写入数据

InfluxData 提供了受支持的 InfluxDB 3 客户端库,您可以将其与您的代码集成,以将数据构建为时间序列点,然后将其作为 Line Protocol 写入 InfluxDB 3 Core 数据库。有关更多信息,请参阅如何使用 InfluxDB 客户端库写入数据

示例:使用 /api/v3 HTTP API 写入数据

以下示例演示了如何使用 curl/api/3/write_lp HTTP 端点写入数据。为了显示接受和拒绝部分写入之间的区别,示例中的第 2 行包含浮点字段 (temp) 的字符串值 ("hi")。

发生 Line Protocol 部分写入

使用 accept_partial=true(默认)

curl -v "http://localhost:8181/api/v3/write_lp?db=sensors&precision=auto" \
  --data-raw 'home,room=Sunroom temp=96
home,room=Sunroom temp="hi"'

响应如下

< HTTP/1.1 400 Bad Request
...
{
  "error": "partial write of line protocol occurred",
  "data": [
    {
      "original_line": "home,room=Sunroom temp=hi",
      "line_number": 2,
      "error_message": "invalid column type for column 'temp', expected iox::column_type::field::float, got iox::column_type::field::string"
    }
  ]
}

1 行已写入且可查询。第 2 行被拒绝。响应是 HTTP 错误 (400) 状态,并且响应正文包含错误消息 partial write of line protocol occurred,其中包含有关问题行的详细信息。

write_lp 端点解析失败

使用 accept_partial=false

curl -v "http://localhost:8181/api/v3/write_lp?db=sensors&precision=auto&accept_partial=false" \
  --data-raw 'home,room=Sunroom temp=96
home,room=Sunroom temp="hi"'

响应如下

< HTTP/1.1 400 Bad Request
...
{
  "error": "parsing failed for write_lp endpoint",
  "data": {
    "original_line": "home,room=Sunroom temp=hi",
    "line_number": 2,
    "error_message": "invalid column type for column 'temp', expected iox::column_type::field::float, got iox::column_type::field::string"
  }
}

InfluxDB 拒绝批处理中的所有点。响应是 HTTP 错误 (400) 状态,并且响应正文包含 parsing failed for write_lp endpoint 以及有关问题行的详细信息。

有关摄取路径和数据流的更多信息,请参阅数据持久性

写入响应

默认情况下,InfluxDB 在将 WAL 文件刷新到对象存储(每秒发生一次)后确认写入。为了实现高写入吞吐量,您可以发送多个并发写入请求。

使用 no_sync 获取即时写入响应

为了减少写入延迟,请使用 no_sync 写入选项,该选项在 WAL 持久化完成之前确认写入。当 no_sync=true 时,InfluxDB 会验证数据,将数据写入 WAL,然后立即响应客户端,而无需等待持久化到对象存储。

当优先考虑高吞吐量写入而不是绝对持久性时,最好使用 no_sync=true

  • 默认行为 (no_sync=false):等待数据写入对象存储后才确认写入。降低数据丢失的风险,但增加响应的延迟。
  • 使用 no_sync=true:降低写入延迟,但增加 WAL 持久化之前发生崩溃时数据丢失的风险。

使用 HTTP API 进行即时写入

no_sync 参数控制写入何时被确认——例如

curl "http://localhost:8181/api/v3/write_lp?db=sensors&precision=auto&no_sync=true" \
  --data-raw "home,room=Sunroom temp=96"

使用 API 客户端库

使用与您的代码集成的 InfluxDB 3 客户端库,将数据构建为时间序列点,然后将其作为 Line Protocol 写入 InfluxDB 3 Core 数据库。

构建 Line Protocol

通过对 Line Protocol 的基本理解,您可以构建 Line Protocol 数据并将其写入 InfluxDB 3 Core。

所有 InfluxDB 客户端库都以 Line Protocol 格式将数据写入 InfluxDB。客户端库 write 方法允许您将数据提供为原始 Line Protocol 或客户端库转换为 Line Protocol 的 Point 对象。如果您的程序创建了您写入 InfluxDB 的数据,请使用客户端库 Point 接口,以利用程序中的类型安全性。

示例家庭模式

考虑一个用例,您从家中的传感器收集数据。每个传感器收集温度、湿度和一氧化碳读数。

要收集此数据,请使用以下模式

  • home
    • 标签
      • room:客厅或厨房
    • 字段
      • temp:温度,单位 °C(浮点数)
      • hum:湿度百分比(浮点数)
      • co:一氧化碳,单位百万分之几(整数)
    • 时间戳:Unix 时间戳,精度

以下示例演示了如何构建和写入遵循 home 模式的点。

设置您的项目

设置 InfluxDB 3 Core 和您的项目后,您应该具有以下内容

  • InfluxDB 3 Core 凭据

    • 数据库

    • 授权令牌

      在 Beta 测试期间,InfluxDB 3 Core 不需要授权令牌。

    • InfluxDB 3 Core URL

  • 项目的目录。

  • 凭据存储为环境变量或项目配置文件中——例如,.env (“dotenv”) 文件。

  • 为写入数据到 InfluxDB 3 Core 安装的客户端库。

以下示例使用 InfluxDB 3 客户端库来演示如何构建遵循示例 home 模式Point 对象,然后将数据作为 Line Protocol 写入 InfluxDB 3 Core 数据库。

以下步骤使用InfluxDB 3 Go 客户端设置 Go 项目

  1. 安装Go 1.13 或更高版本

  2. 为您的 Go 模块创建一个目录并更改到该目录——例如

    mkdir iot-starter-go && cd $_
    
  3. 初始化 Go 模块——例如

    go mod init iot-starter
    
  4. 安装influxdb3-go,它提供了 InfluxDB influxdb3 Go 客户端库模块。

    go get github.com/InfluxCommunity/influxdb3-go/v2
    

以下步骤使用InfluxDB 3 JavaScript 客户端设置 JavaScript 项目。

  1. 安装Node.js

  2. 为您的 JavaScript 项目创建一个目录并更改到该目录——例如

    mkdir -p iot-starter-js && cd $_
    
  3. 初始化一个项目——例如,使用 npm

    npm init
    
  4. 安装 @influxdata/influxdb3-client InfluxDB 3 JavaScript 客户端库。

    npm install @influxdata/influxdb3-client
    

以下步骤使用InfluxDB 3 Python 客户端设置 Python 项目

  1. 安装 Python

  2. 在您的项目目录中,为您的 Python 模块创建一个目录并更改到该模块目录——例如

    mkdir -p iot-starter-py && cd $_
    
  3. 可选,但推荐:使用 venvconda 激活虚拟环境,以便安装和执行代码——例如,使用 venv 输入以下命令,为项目创建并激活虚拟环境

    python3 -m venv envs/iot-starter && source ./envs/iot-starter/bin/activate
    
  4. 安装 influxdb3-python,它提供了 InfluxDB influxdb_client_3 Python 客户端库模块,并安装了 pyarrow,用于处理 Arrow 数据。

    pip install influxdb3-python
    

构建点并写入 Line Protocol

客户端库提供一个或多个 Point 构造函数方法。某些库支持语言原生数据结构,例如 Go 的 struct,用于创建点。

  1. 为您的模块创建一个文件——例如:main.go

  2. main.go 中,输入以下示例代码

    package main
    
    import (
     "context"
     "os"
     "fmt"
     "time"
     "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3"
     "github.com/influxdata/line-protocol/v2/lineprotocol"
    )
    
    func Write() error {
      url := os.Getenv("INFLUX_HOST")
      token := os.Getenv("INFLUX_TOKEN")
      database := os.Getenv("INFLUX_DATABASE")
    
      // To instantiate a client, call New() with InfluxDB credentials.
      client, err := influxdb3.New(influxdb3.ClientConfig{
       Host: url,
       Token: token,
       Database: database,
      })
    
      /** Use a deferred function to ensure the client is closed when the
        * function returns.
       **/
      defer func (client *influxdb3.Client)  {
       err = client.Close()
       if err != nil {
         panic(err)
       }
      }(client)
    
      /** Use the NewPoint method to construct a point.
        * NewPoint(measurement, tags map, fields map, time)
       **/
      point := influxdb3.NewPoint("home",
         map[string]string{
           "room": "Living Room",
         },
         map[string]any{
           "temp": 24.5,
           "hum":  40.5,
           "co":   15i},
         time.Now(),
       )
    
      /** Use the NewPointWithMeasurement method to construct a point with
        * method chaining.
       **/
      point2 := influxdb3.NewPointWithMeasurement("home").
       SetTag("room", "Living Room").
       SetField("temp", 23.5).
       SetField("hum", 38.0).
       SetField("co",  16i).
       SetTimestamp(time.Now())
    
      fmt.Println("Writing points")
      points := []*influxdb3.Point{point, point2}
    
      /** Write points to InfluxDB.
        * You can specify WriteOptions, such as Gzip threshold,
        * default tags, and timestamp precision. Default precision is lineprotocol.Nanosecond
       **/
      err = client.WritePoints(context.Background(), points,
        influxdb3.WithPrecision(lineprotocol.Second))
      return nil
    }
    
    func main() {
      Write()
    }
    
  3. 要运行模块并将数据写入您的 InfluxDB 3 Core 数据库,请在终端中输入以下命令

    go run main.go
    
  1. 为您的模块创建一个文件——例如:write-points.js

  2. write-points.js 中,输入以下示例代码

    // write-points.js
    import { InfluxDBClient, Point } from '@influxdata/influxdb3-client';
    
    /**
     * Set InfluxDB credentials.
     */
    const host = process.env.INFLUX_HOST ?? '';
    const database = process.env.INFLUX_DATABASE;
    const token = process.env.INFLUX_TOKEN;
    
    /**
     * Write line protocol to InfluxDB using the JavaScript client library.
     */
    export async function writePoints() {
      /**
       * Instantiate an InfluxDBClient.
       * Provide the host URL and the database token.
       */
      const client = new InfluxDBClient({ host, token });
    
      /** Use the fluent interface with chained methods to construct Points. */
      const point = Point.measurement('home')
        .setTag('room', 'Living Room')
        .setFloatField('temp', 22.2)
        .setFloatField('hum', 35.5)
        .setIntegerField('co', 7)
        .setTimestamp(new Date().getTime() / 1000);
    
      const point2 = Point.measurement('home')
        .setTag('room', 'Kitchen')
        .setFloatField('temp', 21.0)
        .setFloatField('hum', 35.9)
        .setIntegerField('co', 0)
        .setTimestamp(new Date().getTime() / 1000);
    
      /** Write points to InfluxDB.
       * The write method accepts an array of points, the target database, and
       * an optional configuration object.
       * You can specify WriteOptions, such as Gzip threshold, default tags,
       * and timestamp precision. Default precision is lineprotocol.Nanosecond
       **/
    
      try {
        await client.write([point, point2], database, '', { precision: 's' });
        console.log('Data has been written successfully!');
      } catch (error) {
        console.error(`Error writing data to InfluxDB: ${error.body}`);
      }
    
      client.close();
    }
    
    writePoints();
    
  3. 要运行模块并将数据写入您的 {{< product-name >}} 数据库,请在终端中输入以下命令

    node writePoints.js
    
  1. 为您的模块创建一个文件——例如:write-points.py

  2. write-points.py 中,输入以下示例代码以在批处理模式下写入数据

    import os
    from influxdb_client_3 import (
      InfluxDBClient3, InfluxDBError, Point, WritePrecision,
      WriteOptions, write_client_options)
    
    host = os.getenv('INFLUX_HOST')
    token = os.getenv('INFLUX_TOKEN')
    database = os.getenv('INFLUX_DATABASE')
    
    # Create an array of points with tags and fields.
    points = [Point("home")
                .tag("room", "Kitchen")
                .field("temp", 25.3)
                .field('hum', 20.2)
                .field('co', 9)]
    
    # With batching mode, define callbacks to execute after a successful or
    # failed write request.
    # Callback methods receive the configuration and data sent in the request.
    def success(self, data: str):
        print(f"Successfully wrote batch: data: {data}")
    
    def error(self, data: str, exception: InfluxDBError):
        print(f"Failed writing batch: config: {self}, data: {data} due: {exception}")
    
    def retry(self, data: str, exception: InfluxDBError):
        print(f"Failed retry writing batch: config: {self}, data: {data} retry: {exception}")
    
    # Configure options for batch writing.
    write_options = WriteOptions(batch_size=500,
                                        flush_interval=10_000,
                                        jitter_interval=2_000,
                                        retry_interval=5_000,
                                        max_retries=5,
                                        max_retry_delay=30_000,
                                        exponential_base=2)
    
    # Create an options dict that sets callbacks and WriteOptions.
    wco = write_client_options(success_callback=success,
                              error_callback=error,
                              retry_callback=retry,
                              write_options=write_options)
    
    # Instantiate a synchronous instance of the client with your
    # InfluxDB credentials and write options, such as Gzip threshold, default tags,
    # and timestamp precision. Default precision is nanosecond ('ns').
    with InfluxDBClient3(host=host,
                            token=token,
                            database=database,
                            write_client_options=wco) as client:
    
          client.write(points, write_precision='s')
    
  3. 要运行模块并将数据写入您的 InfluxDB 3 Core 数据库,请在终端中输入以下命令

    python write-points.py
    

示例代码执行以下操作

  1. 实例化配置了 InfluxDB URL 和 API 令牌的客户端。
  2. 构建 homePoint 对象。
  3. 以 Line Protocol 格式将数据发送到 InfluxDB 并等待响应。
  4. 如果写入成功,则将成功消息记录到 stdout;否则,记录失败消息和错误详细信息。
  5. 关闭客户端以释放资源。

此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、近实时数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看