使用 HTTP API 和客户端库写入数据
使用 /api/v3/write_lp
HTTP API 端点和 InfluxDB v3 API 客户端,将点作为 Line Protocol 数据写入 InfluxDB 3 Core。
使用 /api/v3/write_lp 端点
InfluxDB 3 Core 添加了 /api/v3/write_lp
端点。
POST /api/v3/write_lp?db=mydb&precision=nanosecond&accept_partial=true&no_sync=false
此端点接受与先前版本相同的 Line Protocol 语法,并支持以下参数
?accept_partial=<BOOLEAN>
:接受或拒绝部分写入(默认为true
)。?no_sync=<BOOLEAN>
:控制写入何时被确认no_sync=true
:在 WAL 持久化完成之前确认写入。no_sync=false
:在 WAL 持久化完成后确认写入(默认)。
?precision=<PRECISION>
:指定时间戳的精度。默认精度为纳秒。
有关参数的更多信息,请参阅写入数据。
InfluxData 提供了受支持的 InfluxDB 3 客户端库,您可以将其与您的代码集成,以将数据构建为时间序列点,然后将其作为 Line Protocol 写入 InfluxDB 3 Core 数据库。有关更多信息,请参阅如何使用 InfluxDB 客户端库写入数据。
示例:使用 /api/v3 HTTP API 写入数据
以下示例演示了如何使用 curl
和 /api/3/write_lp
HTTP 端点写入数据。为了显示接受和拒绝部分写入之间的区别,示例中的第 2
行包含浮点字段 (temp
) 的字符串值 ("hi"
)。
发生 Line Protocol 部分写入
使用 accept_partial=true
(默认)
curl -v "http://localhost:8181/api/v3/write_lp?db=sensors&precision=auto" \
--data-raw 'home,room=Sunroom temp=96
home,room=Sunroom temp="hi"'
响应如下
< HTTP/1.1 400 Bad Request
...
{
"error": "partial write of line protocol occurred",
"data": [
{
"original_line": "home,room=Sunroom temp=hi",
"line_number": 2,
"error_message": "invalid column type for column 'temp', expected iox::column_type::field::float, got iox::column_type::field::string"
}
]
}
第 1
行已写入且可查询。第 2
行被拒绝。响应是 HTTP 错误 (400
) 状态,并且响应正文包含错误消息 partial write of line protocol occurred
,其中包含有关问题行的详细信息。
write_lp 端点解析失败
使用 accept_partial=false
curl -v "http://localhost:8181/api/v3/write_lp?db=sensors&precision=auto&accept_partial=false" \
--data-raw 'home,room=Sunroom temp=96
home,room=Sunroom temp="hi"'
响应如下
< HTTP/1.1 400 Bad Request
...
{
"error": "parsing failed for write_lp endpoint",
"data": {
"original_line": "home,room=Sunroom temp=hi",
"line_number": 2,
"error_message": "invalid column type for column 'temp', expected iox::column_type::field::float, got iox::column_type::field::string"
}
}
InfluxDB 拒绝批处理中的所有点。响应是 HTTP 错误 (400
) 状态,并且响应正文包含 parsing failed for write_lp endpoint
以及有关问题行的详细信息。
有关摄取路径和数据流的更多信息,请参阅数据持久性。
写入响应
默认情况下,InfluxDB 在将 WAL 文件刷新到对象存储(每秒发生一次)后确认写入。为了实现高写入吞吐量,您可以发送多个并发写入请求。
使用 no_sync 获取即时写入响应
为了减少写入延迟,请使用 no_sync
写入选项,该选项在 WAL 持久化完成之前确认写入。当 no_sync=true
时,InfluxDB 会验证数据,将数据写入 WAL,然后立即响应客户端,而无需等待持久化到对象存储。
当优先考虑高吞吐量写入而不是绝对持久性时,最好使用 no_sync=true
。
- 默认行为 (
no_sync=false
):等待数据写入对象存储后才确认写入。降低数据丢失的风险,但增加响应的延迟。 - 使用
no_sync=true
:降低写入延迟,但增加 WAL 持久化之前发生崩溃时数据丢失的风险。
使用 HTTP API 进行即时写入
no_sync
参数控制写入何时被确认——例如
curl "http://localhost:8181/api/v3/write_lp?db=sensors&precision=auto&no_sync=true" \
--data-raw "home,room=Sunroom temp=96"
使用 API 客户端库
使用与您的代码集成的 InfluxDB 3 客户端库,将数据构建为时间序列点,然后将其作为 Line Protocol 写入 InfluxDB 3 Core 数据库。
构建 Line Protocol
通过对 Line Protocol 的基本理解,您可以构建 Line Protocol 数据并将其写入 InfluxDB 3 Core。
所有 InfluxDB 客户端库都以 Line Protocol 格式将数据写入 InfluxDB。客户端库 write
方法允许您将数据提供为原始 Line Protocol 或客户端库转换为 Line Protocol 的 Point
对象。如果您的程序创建了您写入 InfluxDB 的数据,请使用客户端库 Point
接口,以利用程序中的类型安全性。
示例家庭模式
考虑一个用例,您从家中的传感器收集数据。每个传感器收集温度、湿度和一氧化碳读数。
要收集此数据,请使用以下模式
- 表:
home
- 标签
room
:客厅或厨房
- 字段
temp
:温度,单位 °C(浮点数)hum
:湿度百分比(浮点数)co
:一氧化碳,单位百万分之几(整数)
- 时间戳:Unix 时间戳,秒精度
- 标签
以下示例演示了如何构建和写入遵循 home
模式的点。
设置您的项目
设置 InfluxDB 3 Core 和您的项目后,您应该具有以下内容
InfluxDB 3 Core 凭据
授权令牌
在 Beta 测试期间,InfluxDB 3 Core 不需要授权令牌。
InfluxDB 3 Core URL
项目的目录。
凭据存储为环境变量或项目配置文件中——例如,
.env
(“dotenv”) 文件。为写入数据到 InfluxDB 3 Core 安装的客户端库。
以下示例使用 InfluxDB 3 客户端库来演示如何构建遵循示例 home
模式的 Point
对象,然后将数据作为 Line Protocol 写入 InfluxDB 3 Core 数据库。
以下步骤使用InfluxDB 3 Go 客户端设置 Go 项目
为您的 Go 模块创建一个目录并更改到该目录——例如
mkdir iot-starter-go && cd $_
初始化 Go 模块——例如
go mod init iot-starter
安装
influxdb3-go
,它提供了 InfluxDBinfluxdb3
Go 客户端库模块。go get github.com/InfluxCommunity/influxdb3-go/v2
以下步骤使用InfluxDB 3 JavaScript 客户端设置 JavaScript 项目。
安装Node.js。
为您的 JavaScript 项目创建一个目录并更改到该目录——例如
mkdir -p iot-starter-js && cd $_
初始化一个项目——例如,使用
npm
npm init
安装
@influxdata/influxdb3-client
InfluxDB 3 JavaScript 客户端库。npm install @influxdata/influxdb3-client
以下步骤使用InfluxDB 3 Python 客户端设置 Python 项目
安装 Python
在您的项目目录中,为您的 Python 模块创建一个目录并更改到该模块目录——例如
mkdir -p iot-starter-py && cd $_
可选,但推荐:使用
venv
或conda
激活虚拟环境,以便安装和执行代码——例如,使用venv
输入以下命令,为项目创建并激活虚拟环境python3 -m venv envs/iot-starter && source ./envs/iot-starter/bin/activate
安装
influxdb3-python
,它提供了 InfluxDBinfluxdb_client_3
Python 客户端库模块,并安装了pyarrow
包,用于处理 Arrow 数据。pip install influxdb3-python
构建点并写入 Line Protocol
客户端库提供一个或多个 Point
构造函数方法。某些库支持语言原生数据结构,例如 Go 的 struct
,用于创建点。
为您的模块创建一个文件——例如:
main.go
。在
main.go
中,输入以下示例代码package main import ( "context" "os" "fmt" "time" "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3" "github.com/influxdata/line-protocol/v2/lineprotocol" ) func Write() error { url := os.Getenv("INFLUX_HOST") token := os.Getenv("INFLUX_TOKEN") database := os.Getenv("INFLUX_DATABASE") // To instantiate a client, call New() with InfluxDB credentials. client, err := influxdb3.New(influxdb3.ClientConfig{ Host: url, Token: token, Database: database, }) /** Use a deferred function to ensure the client is closed when the * function returns. **/ defer func (client *influxdb3.Client) { err = client.Close() if err != nil { panic(err) } }(client) /** Use the NewPoint method to construct a point. * NewPoint(measurement, tags map, fields map, time) **/ point := influxdb3.NewPoint("home", map[string]string{ "room": "Living Room", }, map[string]any{ "temp": 24.5, "hum": 40.5, "co": 15i}, time.Now(), ) /** Use the NewPointWithMeasurement method to construct a point with * method chaining. **/ point2 := influxdb3.NewPointWithMeasurement("home"). SetTag("room", "Living Room"). SetField("temp", 23.5). SetField("hum", 38.0). SetField("co", 16i). SetTimestamp(time.Now()) fmt.Println("Writing points") points := []*influxdb3.Point{point, point2} /** Write points to InfluxDB. * You can specify WriteOptions, such as Gzip threshold, * default tags, and timestamp precision. Default precision is lineprotocol.Nanosecond **/ err = client.WritePoints(context.Background(), points, influxdb3.WithPrecision(lineprotocol.Second)) return nil } func main() { Write() }
要运行模块并将数据写入您的 InfluxDB 3 Core 数据库,请在终端中输入以下命令
go run main.go
为您的模块创建一个文件——例如:
write-points.js
。在
write-points.js
中,输入以下示例代码// write-points.js import { InfluxDBClient, Point } from '@influxdata/influxdb3-client'; /** * Set InfluxDB credentials. */ const host = process.env.INFLUX_HOST ?? ''; const database = process.env.INFLUX_DATABASE; const token = process.env.INFLUX_TOKEN; /** * Write line protocol to InfluxDB using the JavaScript client library. */ export async function writePoints() { /** * Instantiate an InfluxDBClient. * Provide the host URL and the database token. */ const client = new InfluxDBClient({ host, token }); /** Use the fluent interface with chained methods to construct Points. */ const point = Point.measurement('home') .setTag('room', 'Living Room') .setFloatField('temp', 22.2) .setFloatField('hum', 35.5) .setIntegerField('co', 7) .setTimestamp(new Date().getTime() / 1000); const point2 = Point.measurement('home') .setTag('room', 'Kitchen') .setFloatField('temp', 21.0) .setFloatField('hum', 35.9) .setIntegerField('co', 0) .setTimestamp(new Date().getTime() / 1000); /** Write points to InfluxDB. * The write method accepts an array of points, the target database, and * an optional configuration object. * You can specify WriteOptions, such as Gzip threshold, default tags, * and timestamp precision. Default precision is lineprotocol.Nanosecond **/ try { await client.write([point, point2], database, '', { precision: 's' }); console.log('Data has been written successfully!'); } catch (error) { console.error(`Error writing data to InfluxDB: ${error.body}`); } client.close(); } writePoints();
要运行模块并将数据写入您的 {{< product-name >}} 数据库,请在终端中输入以下命令
node writePoints.js
为您的模块创建一个文件——例如:
write-points.py
。在
write-points.py
中,输入以下示例代码以在批处理模式下写入数据import os from influxdb_client_3 import ( InfluxDBClient3, InfluxDBError, Point, WritePrecision, WriteOptions, write_client_options) host = os.getenv('INFLUX_HOST') token = os.getenv('INFLUX_TOKEN') database = os.getenv('INFLUX_DATABASE') # Create an array of points with tags and fields. points = [Point("home") .tag("room", "Kitchen") .field("temp", 25.3) .field('hum', 20.2) .field('co', 9)] # With batching mode, define callbacks to execute after a successful or # failed write request. # Callback methods receive the configuration and data sent in the request. def success(self, data: str): print(f"Successfully wrote batch: data: {data}") def error(self, data: str, exception: InfluxDBError): print(f"Failed writing batch: config: {self}, data: {data} due: {exception}") def retry(self, data: str, exception: InfluxDBError): print(f"Failed retry writing batch: config: {self}, data: {data} retry: {exception}") # Configure options for batch writing. write_options = WriteOptions(batch_size=500, flush_interval=10_000, jitter_interval=2_000, retry_interval=5_000, max_retries=5, max_retry_delay=30_000, exponential_base=2) # Create an options dict that sets callbacks and WriteOptions. wco = write_client_options(success_callback=success, error_callback=error, retry_callback=retry, write_options=write_options) # Instantiate a synchronous instance of the client with your # InfluxDB credentials and write options, such as Gzip threshold, default tags, # and timestamp precision. Default precision is nanosecond ('ns'). with InfluxDBClient3(host=host, token=token, database=database, write_client_options=wco) as client: client.write(points, write_precision='s')
要运行模块并将数据写入您的 InfluxDB 3 Core 数据库,请在终端中输入以下命令
python write-points.py
示例代码执行以下操作
- 实例化配置了 InfluxDB URL 和 API 令牌的客户端。
- 构建
home
表Point
对象。 - 以 Line Protocol 格式将数据发送到 InfluxDB 并等待响应。
- 如果写入成功,则将成功消息记录到 stdout;否则,记录失败消息和错误详细信息。
- 关闭客户端以释放资源。
此页是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB 3 Core 和此文档的反馈和错误报告。要获得支持,请使用以下资源
拥有年度或支持合同的客户可以联系 InfluxData 支持。