使用 InfluxDB 客户端库写入 Line Protocol 数据
使用 InfluxDB 客户端库将数据构建为时间序列点,然后将其作为 Line Protocol 写入 InfluxDB Clustered 数据库。
构建 Line Protocol
在基本了解 Line Protocol之后,您可以构建 Line Protocol 数据并将其写入 InfluxDB。
所有 InfluxDB 客户端库都以 Line Protocol 格式将数据写入 InfluxDB。客户端库 write
方法允许您将数据作为原始 Line Protocol 或客户端库转换为 Line Protocol 的 Point
对象提供。如果您的程序创建了您写入 InfluxDB 的数据,请使用客户端库 Point
接口,以利用程序中的类型安全。
家庭模式示例
考虑一个用例,您从家中的传感器收集数据。每个传感器收集温度、湿度和一氧化碳读数。
要收集此数据,请使用以下模式
- measurement:
home
- 标签
room
:客厅或厨房
- 字段
temp
:温度,单位为 °C(浮点数)hum
:湿度百分比(浮点数)co
:一氧化碳,单位为百万分之几 (ppm)(整数)
- timestamp:Unix 时间戳,精度为秒
- 标签
以下示例演示了如何构建和写入遵循 home
模式的点。
设置您的项目
本指南中的示例假设您已按照设置 InfluxDB 和 写入数据设置 入门指南中的说明进行操作。
设置 InfluxDB 和您的项目后,您应该具有以下内容
InfluxDB Clustered 凭据
您的项目目录。
凭据存储为环境变量或项目配置文件中——例如,
.env
(“dotenv”) 文件。为写入数据到 InfluxDB 安装的客户端库。
以下示例演示了如何构建遵循示例 home
模式的 Point
对象,然后将数据作为 Line Protocol 写入 InfluxDB Clustered 数据库。
这些示例使用 InfluxDB 3 客户端库。有关使用 InfluxDB v2 客户端库将数据写入 InfluxDB 3 的示例,请参阅 InfluxDB v2 客户端。
以下步骤使用 InfluxDB 3 Go 客户端设置 Go 项目
安装 Go 1.13 或更高版本。
为您的 Go 模块创建一个目录并更改到该目录——例如
mkdir iot-starter-go && cd $_
初始化 Go 模块——例如
go mod init iot-starter
安装
influxdb3-go
,它提供了 InfluxDBinfluxdb3
Go 客户端库模块。go get github.com/InfluxCommunity/influxdb3-go/v2
以下步骤使用 InfluxDB 3 JavaScript 客户端设置 JavaScript 项目。
安装 Node.js。
为您的 JavaScript 项目创建一个目录并更改到该目录——例如
mkdir -p iot-starter-js && cd $_
初始化一个项目——例如,使用
npm
npm init
安装
@influxdata/influxdb3-client
InfluxDB 3 JavaScript 客户端库。npm install @influxdata/influxdb3-client
以下步骤使用 InfluxDB 3 Python 客户端设置 Python 项目
安装 Python
在您的项目目录中,为您的 Python 模块创建一个目录并更改到模块目录——例如
mkdir -p iot-starter-py && cd $_
可选,但推荐:使用
venv
或conda
激活虚拟环境以安装和执行代码——例如,输入以下命令使用venv
为项目创建并激活虚拟环境python3 -m venv envs/iot-starter && source ./envs/iot-starter/bin/activate
安装
influxdb3-python
,它提供了 InfluxDBinfluxdb_client_3
Python 客户端库模块,并且还安装了用于处理 Arrow 数据的pyarrow
包。pip install influxdb3-python
构建点并写入 Line Protocol
客户端库提供一个或多个 Point
构造函数方法。某些库支持语言原生的数据结构,例如 Go 的 struct
,用于创建点。
为您的模块创建一个文件——例如:
main.go
。在
main.go
中,输入以下示例代码package main import ( "context" "os" "fmt" "time" "github.com/InfluxCommunity/influxdb3-go/v2/influxdb3" "github.com/influxdata/line-protocol/v2/lineprotocol" ) func Write() error { url := os.Getenv("INFLUX_HOST") token := os.Getenv("INFLUX_TOKEN") database := os.Getenv("INFLUX_DATABASE") // To instantiate a client, call New() with InfluxDB credentials. client, err := influxdb3.New(influxdb3.ClientConfig{ Host: url, Token: token, Database: database, }) /** Use a deferred function to ensure the client is closed when the * function returns. **/ defer func (client *influxdb3.Client) { err = client.Close() if err != nil { panic(err) } }(client) /** Use the NewPoint method to construct a point. * NewPoint(measurement, tags map, fields map, time) **/ point := influxdb3.NewPoint("home", map[string]string{ "room": "Living Room", }, map[string]any{ "temp": 24.5, "hum": 40.5, "co": 15i}, time.Now(), ) /** Use the NewPointWithMeasurement method to construct a point with * method chaining. **/ point2 := influxdb3.NewPointWithMeasurement("home"). SetTag("room", "Living Room"). SetField("temp", 23.5). SetField("hum", 38.0). SetField("co", 16i). SetTimestamp(time.Now()) fmt.Println("Writing points") points := []*influxdb3.Point{point, point2} /** Write points to InfluxDB. * You can specify WriteOptions, such as Gzip threshold, * default tags, and timestamp precision. Default precision is lineprotocol.Nanosecond **/ err = client.WritePoints(context.Background(), points, influxdb3.WithPrecision(lineprotocol.Second)) return nil } func main() { Write() }
要运行模块并将数据写入您的 InfluxDB Clustered 数据库,请在您的终端中输入以下命令
go run main.go
为您的模块创建一个文件——例如:
write-points.js
。在
write-points.js
中,输入以下示例代码// write-points.js import { InfluxDBClient, Point } from '@influxdata/influxdb3-client'; /** * Set InfluxDB credentials. */ const host = process.env.INFLUX_HOST ?? ''; const database = process.env.INFLUX_DATABASE; const token = process.env.INFLUX_TOKEN; /** * Write line protocol to InfluxDB using the JavaScript client library. */ export async function writePoints() { /** * Instantiate an InfluxDBClient. * Provide the host URL and the database token. */ const client = new InfluxDBClient({ host, token }); /** Use the fluent interface with chained methods to construct Points. */ const point = Point.measurement('home') .setTag('room', 'Living Room') .setFloatField('temp', 22.2) .setFloatField('hum', 35.5) .setIntegerField('co', 7) .setTimestamp(new Date().getTime() / 1000); const point2 = Point.measurement('home') .setTag('room', 'Kitchen') .setFloatField('temp', 21.0) .setFloatField('hum', 35.9) .setIntegerField('co', 0) .setTimestamp(new Date().getTime() / 1000); /** Write points to InfluxDB. * The write method accepts an array of points, the target database, and * an optional configuration object. * You can specify WriteOptions, such as Gzip threshold, default tags, * and timestamp precision. Default precision is lineprotocol.Nanosecond **/ try { await client.write([point, point2], database, '', { precision: 's' }); console.log('Data has been written successfully!'); } catch (error) { console.error(`Error writing data to InfluxDB: ${error.body}`); } client.close(); } writePoints();
要运行模块并将数据写入您的 {{< product-name >}} 数据库,请在您的终端中输入以下命令
node writePoints.js
为您的模块创建一个文件——例如:
write-points.py
。在
write-points.py
中,输入以下示例代码以批量模式写入数据import os from influxdb_client_3 import ( InfluxDBClient3, InfluxDBError, Point, WritePrecision, WriteOptions, write_client_options) host = os.getenv('INFLUX_HOST') token = os.getenv('INFLUX_TOKEN') database = os.getenv('INFLUX_DATABASE') # Create an array of points with tags and fields. points = [Point("home") .tag("room", "Kitchen") .field("temp", 25.3) .field('hum', 20.2) .field('co', 9)] # With batching mode, define callbacks to execute after a successful or # failed write request. # Callback methods receive the configuration and data sent in the request. def success(self, data: str): print(f"Successfully wrote batch: data: {data}") def error(self, data: str, exception: InfluxDBError): print(f"Failed writing batch: config: {self}, data: {data} due: {exception}") def retry(self, data: str, exception: InfluxDBError): print(f"Failed retry writing batch: config: {self}, data: {data} retry: {exception}") # Configure options for batch writing. write_options = WriteOptions(batch_size=500, flush_interval=10_000, jitter_interval=2_000, retry_interval=5_000, max_retries=5, max_retry_delay=30_000, exponential_base=2) # Create an options dict that sets callbacks and WriteOptions. wco = write_client_options(success_callback=success, error_callback=error, retry_callback=retry, write_options=write_options) # Instantiate a synchronous instance of the client with your # InfluxDB credentials and write options, such as Gzip threshold, default tags, # and timestamp precision. Default precision is nanosecond ('ns'). with InfluxDBClient3(host=host, token=token, database=database, write_client_options=wco) as client: client.write(points, write_precision='s')
要运行模块并将数据写入您的 InfluxDB Clustered 数据库,请在您的终端中输入以下命令
python write-points.py
示例代码执行以下操作
- 实例化一个使用 InfluxDB URL 和 API 令牌配置的客户端。
- 构建
home
measurementPoint
对象。 - 以 Line Protocol 格式将数据发送到 InfluxDB 并等待响应。
- 如果写入成功,则将成功消息记录到 stdout;否则,记录失败消息和错误详细信息。
- 关闭客户端以释放资源。
此页是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您提供关于 InfluxDB Clustered 和本文档的反馈和错误报告。要查找支持,请使用以下资源
拥有年度或支持合同的客户可以联系 InfluxData 支持。