Python 客户端库入门程序
按照此分步教程,使用 InfluxData 客户端库和您喜欢的框架或语言构建物联网 (IoT) 应用程序。
在本教程中,您将使用 InfluxDB API 和客户端库构建现代应用程序,同时学习以下内容:
InfluxDB 核心概念。
应用程序如何与设备和 InfluxDB 交互。
如何对应用程序和设备进行 API 身份验证。
如何安装客户端库。
如何在 InfluxDB 中写入和查询数据。
如何使用 InfluxData UI 库格式化数据并创建可视化效果。
目录
- 目录
- 设置 InfluxDB
- IoT Starter 简介
- 创建应用程序
- 安装 InfluxDB 客户端库
- 配置客户端库
- 构建 API
- 创建 API 以注册设备
- 创建 API 以列出设备
- 创建 IoT 虚拟设备
- 写入遥测数据
- 查询遥测数据
- 定义 API 响应
- 安装并运行 UI
设置 InfluxDB
如果您尚未这样做,请创建 InfluxDB Cloud 帐户或安装 InfluxDB OSS。
IoT Starter 示例应用程序假定具有以下先决条件:
- InfluxDB org ID
- API 令牌(例如,具有存储桶的读取和写入权限的所有访问令牌)
- 名为
iot_center
的存储桶,用于存储来自设备的时间序列数据 - 名为
iot_center_devices
的存储桶,用于存储设备元数据和 API 令牌 ID
IoT Starter 简介
应用程序架构有四个层:
- InfluxDB API:InfluxDB v2 API。
- IoT 设备:虚拟或物理设备将 IoT 数据写入 InfluxDB API。
- UI:向服务器发送请求并在浏览器中呈现视图。
- API:接收来自 UI 的请求,向 InfluxDB 发送请求,并处理来自 InfluxDB 的响应。
有关本教程中引用的完整代码,请参阅 influxdata/iot-api-python 存储库。
创建应用程序
创建一个将包含您的 iot-api
项目的目录。以下示例代码在您的主目录中创建一个 iot-api
目录,并更改为新目录:
mkdir ~/iot-api-apps
cd ~/iot-api-apps
使用轻量级 Python Web 框架 Flask 创建您的应用程序。
在您的
~/iot-api-apps
目录中,打开终端并输入以下命令以创建并导航到新的项目目录:mkdir iot-api-python && cd $_
在您的终端中输入以下命令,为项目创建并激活 Python 虚拟环境:
# Create a new virtual environment named "virtualenv" # Python 3.8+ python -m venv virtualenv # Activate the virtualenv (OS X & Linux) source virtualenv/bin/activate
激活完成后,在您的终端中输入以下命令,使用
pip
包安装程序(Python 随附)安装 Flask:pip install Flask
在您的项目中,创建一个
app.py
文件,该文件:- 导入 Flask 包。
- 实例化 Flask 应用程序。
- 提供执行应用程序的路由。
from flask import Flask app = Flask(__name__) @app.route("/") def hello(): return "Hello World!"
启动您的应用程序。以下示例代码在
http://localhost:3001
上启动应用程序,并启用调试和热重载:export FLASK_ENV=development flask run -h localhost -p 3001
在您的浏览器中,访问 http://localhost:3001 以查看“Hello World!”响应。
安装 InfluxDB 客户端库
InfluxDB 客户端库提供以下 InfluxDB API 交互:
- 使用 Flux 语言查询数据。
- 将数据写入 InfluxDB。
- 在后台批量处理数据。
- 在失败时自动重试请求。
在您的终端中输入以下命令以安装客户端库:
pip install influxdb-client
有关客户端库的更多信息,请参阅 influxdata/influxdb-client-python 存储库。
配置客户端库
InfluxDB 客户端库需要来自您的 InfluxDB 环境的配置属性。通常,您将以下属性作为应用程序的环境变量提供:
INFLUX_URL
INFLUX_TOKEN
INFLUX_ORG
INFLUX_BUCKET
INFLUX_BUCKET_AUTH
要设置客户端配置,请在您的项目顶级目录中创建一个 config.ini
,并粘贴以下内容以提供必要的 InfluxDB 凭据:
[APP]
INFLUX_URL = <INFLUX_URL>
INFLUX_TOKEN = <INFLUX_TOKEN>
INFLUX_ORG = <INFLUX_ORG_ID>
INFLUX_BUCKET = iot_center
INFLUX_BUCKET_AUTH = iot_center_devices
替换以下内容:
<INFLUX_URL>
:您的 InfluxDB 实例 URL。<INFLUX_TOKEN>
:您的 InfluxDB API 令牌,具有查询(读取)存储桶和为设备创建(写入)授权的权限。<INFLUX_ORG_ID>
:您的 InfluxDB 组织 ID。
构建 API
您的应用程序 API 提供服务器端 HTTP 端点,用于处理来自 UI 的请求。每个 API 端点负责以下操作:
- 侦听 HTTP 请求(来自 UI)。
- 将请求转换为 InfluxDB API 请求。
- 处理 InfluxDB API 响应并处理错误。
- 使用状态和数据进行响应(对于 UI)。
创建 API 以注册设备
在此应用程序中,注册设备是一个点,其中包含您的设备 ID、授权 ID 和 API 令牌。API 令牌和授权权限允许设备查询和写入 INFLUX_BUCKET
。在本节中,您将添加 API 端点,该端点处理来自 UI 的请求,在 InfluxDB 中创建授权,并将注册设备写入 INFLUX_BUCKET_AUTH
存储桶。要了解有关 API 令牌和授权的更多信息,请参阅 管理 API 令牌
应用程序 API 使用以下 /api/v2
InfluxDB API 端点:
POST /api/v2/query
:查询INFLUX_BUCKET_AUTH
以查找注册设备。GET /api/v2/buckets
:获取INFLUX_BUCKET
的存储桶 ID。POST /api/v2/authorizations
:为设备创建授权。POST /api/v2/write
:将设备授权写入INFLUX_BUCKET_AUTH
。
为设备创建授权
在本节中,您将创建一个具有对 INFLUX_BUCKET
的读取-写入权限的授权,并接收设备的 API 令牌。以下示例使用以下步骤创建授权:
- 使用配置实例化
AuthorizationsAPI
客户端和BucketsAPI
客户端。 - 检索存储桶 ID。
- 使用客户端库向
/api/v2/authorizations
InfluxDB API 端点发送POST
请求。
创建一个 ./api/devices.py
文件,其中包含以下内容:
# Import the dependencies.
import configparser
from datetime import datetime
from uuid import uuid4
# Import client library classes.
from influxdb_client import Authorization, InfluxDBClient, Permission, PermissionResource, Point, WriteOptions
from influxdb_client.client.authorizations_api import AuthorizationsApi
from influxdb_client.client.bucket_api import BucketsApi
from influxdb_client.client.query_api import QueryApi
from influxdb_client.client.write_api import SYNCHRONOUS
from api.sensor import Sensor
# Get the configuration key-value pairs.
config = configparser.ConfigParser()
config.read('config.ini')
def create_authorization(device_id) -> Authorization:
influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
token=os.environ.get('INFLUX_TOKEN'),
org=os.environ.get('INFLUX_ORG'))
authorization_api = AuthorizationsApi(influxdb_client)
# get bucket_id from bucket
buckets_api = BucketsApi(influxdb_client)
buckets = buckets_api.find_bucket_by_name(config.get('APP', 'INFLUX_BUCKET')) # function returns only 1 bucket
bucket_id = buckets.id
org_id = buckets.org_id
desc_prefix = f'IoTCenterDevice: {device_id}'
org_resource = PermissionResource(org_id=org_id, id=bucket_id, type="buckets")
read = Permission(action="read", resource=org_resource)
write = Permission(action="write", resource=org_resource)
permissions = [read, write]
authorization = Authorization(org_id=org_id, permissions=permissions, description=desc_prefix)
request = authorization_api.create_authorization(authorization=authorization)
return request
要创建具有对 INFLUX_BUCKET
的读取-写入权限的授权,您需要存储桶 ID。要检索存储桶 ID,create_authorization(deviceId)
调用 BucketsAPI find_bucket_by_name
函数,该函数向 /api/v2/buckets
InfluxDB API 端点发送 GET
请求。然后,create_authorization(deviceId)
在请求正文中传递新授权,其中包含以下内容:
- 存储桶 ID。
- 组织 ID。
- 描述:
IoTCenterDevice: DEVICE_ID
。 - 存储桶的权限列表。
要了解有关 API 令牌和授权的更多信息,请参阅 管理 API 令牌。
接下来,将设备授权写入存储桶。
将设备授权写入存储桶
在 InfluxDB 中进行设备授权后,为设备和授权详细信息写入一个点到 INFLUX_BUCKET_AUTH
。将设备授权存储在存储桶中允许您执行以下操作:
- 报告设备授权历史记录。
- 管理带有和不带有令牌的设备。
- 将同一令牌分配给多个设备。
- 刷新令牌。
要将点写入 InfluxDB,请使用 InfluxDB 客户端库向 /api/v2/write
InfluxDB API 端点发送 POST
请求。在 ./api/devices.py
中,添加以下 create_device(device_id)
函数:
def create_device(device_id=None):
influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
token=config.get('APP', 'INFLUX_TOKEN'),
org=config.get('APP', 'INFLUX_ORG'))
if device_id is None:
device_id = str(uuid4())
write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
point = Point('deviceauth') \
.tag("deviceId", device_id) \
.field('key', f'fake_auth_id_{device_id}') \
.field('token', f'fake_auth_token_{device_id}')
client_response = write_api.write(bucket=config.get('APP', 'INFLUX_BUCKET_AUTH'), record=point)
# write() returns None on success
if client_response is None:
return device_id
# Return None on failure
return None
create_device(device_id)
接受 device_id
,并在以下步骤中将数据写入 INFLUX_BUCKET_AUTH
:
- 使用来自配置的
url
、token
和org
值初始化InfluxDBClient()
。 - 初始化
WriteAPI
客户端以将数据写入 InfluxDB 存储桶。 - 创建
Point
。 - 使用
write_api.write()
将Point
写入存储桶。 - 检查失败 - 如果写入成功,
write_api
将返回None
。 - 如果成功,则返回
device_id
;否则返回None
。
该函数写入一个包含以下元素的点:
元素 | 名称 | 值 |
---|---|---|
测量 | deviceauth | |
标签 | deviceId | 设备 ID |
字段 | key | 授权 ID |
字段 | token | 授权 (API) 令牌 |
接下来,创建 API 以列出设备。
创建 API 以列出设备
添加 /api/devices
API 端点,该端点检索、处理和列出已注册设备。
创建一个 Flux 查询,该查询获取每个序列的最后一行,其中包含
deviceauth
测量。以下示例查询返回包含key
字段(授权 ID)的行,并排除包含token
字段的行(以避免向 UI 公开令牌)。// Flux query finds devices from(bucket:`${INFLUX_BUCKET_AUTH}`) |> range(start: 0) |> filter(fn: (r) => r._measurement == "deviceauth" and r._field != "token") |> last()
使用
QueryApi
客户端将 Flux 查询发送到POST /api/v2/query
InfluxDB API 端点。在
./api/devices.py
中,添加以下内容:def get_device(device_id=None) -> {}: influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'), token=os.environ.get('INFLUX_TOKEN'), org=os.environ.get('INFLUX_ORG')) # Queries must be formatted with single and double quotes correctly query_api = QueryApi(influxdb_client) device_filter = '' if device_id: device_id = str(device_id) device_filter = f'r.deviceId == "{device_id}" and r._field != "token"' else: device_filter = f'r._field != "token"' flux_query = f'from(bucket: "{config.get("APP", "INFLUX_BUCKET_AUTH")}") ' \ f'|> range(start: 0) ' \ f'|> filter(fn: (r) => r._measurement == "deviceauth" and {device_filter}) ' \ f'|> last()' response = query_api.query(flux_query) result = [] for table in response: for record in table.records: try: 'updatedAt' in record except KeyError: record['updatedAt'] = record.get_time() record[record.get_field()] = record.get_value() result.append(record.values) return result
get_device(device_id)
函数执行以下操作:- 实例化
QueryApi
客户端并将 Flux 查询发送到 InfluxDB。 - 迭代响应中的
FluxTable
,并返回元组列表。
- 实例化
创建 IoT 虚拟设备
创建一个 ./api/sensor.py
文件,该文件生成模拟天气遥测数据。按照 示例代码 创建 IoT 虚拟设备。
接下来,为虚拟设备生成数据,并将数据写入 InfluxDB。
写入遥测数据
在本节中,您将遥测数据写入 InfluxDB 存储桶。要写入数据,请使用 InfluxDB 客户端库向 /api/v2/write
InfluxDB API 端点发送 POST
请求。
以下示例使用以下步骤生成数据,然后将其写入 InfluxDB:
- 初始化
WriteAPI
实例。 - 创建一个
Point
,其中包含environment
测量和温度、湿度、压力、纬度和经度的数据字段。 - 使用
WriteAPI write
方法将点发送到 InfluxDB。
在 ./api/devices.py
中,添加以下 write_measurements(device_id)
函数:
def write_measurements(device_id):
influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
token=config.get('APP', 'INFLUX_TOKEN'),
org=config.get('APP', 'INFLUX_ORG'))
write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)
virtual_device = Sensor()
coord = virtual_device.geo()
point = Point("environment") \
.tag("device", device_id) \
.tag("TemperatureSensor", "virtual_bme280") \
.tag("HumiditySensor", "virtual_bme280") \
.tag("PressureSensor", "virtual_bme280") \
.field("Temperature", virtual_device.generate_measurement()) \
.field("Humidity", virtual_device.generate_measurement()) \
.field("Pressure", virtual_device.generate_measurement()) \
.field("Lat", coord['latitude']) \
.field("Lon", coord['latitude']) \
.time(datetime.utcnow())
print(f"Writing: {point.to_line_protocol()}")
client_response = write_api.write(bucket=config.get('APP', 'INFLUX_BUCKET'), record=point)
# write() returns None on success
if client_response is None:
# TODO Maybe also return the data that was written
return device_id
# Return None on failure
return None
查询遥测数据
在本节中,您将从 InfluxDB 存储桶检索遥测数据。要检索数据,请使用 InfluxDB 客户端库向 /api/v2/query
InfluxDB API 端点发送 POST
请求。以下示例使用以下步骤检索和处理遥测数据:
- 在
INFLUX_BUCKET
中查询environment
测量。 - 按
device_id
过滤结果。 - 返回
influxdata/giraffe
UI 库 可以处理的 CSV 数据。
在 ./api/devices.py
中,添加以下 get_measurements(device_id)
函数:
def get_measurements(query):
influxdb_client = InfluxDBClient(url=config.get('APP', 'INFLUX_URL'),
token=os.environ.get('INFLUX_TOKEN'), org=os.environ.get('INFLUX_ORG'))
query_api = QueryApi(influxdb_client)
result = query_api.query_csv(query,
dialect=Dialect(
header=True,
delimiter=",",
comment_prefix="#",
annotations=['group', 'datatype', 'default'],
date_time_format="RFC3339"))
response = ''
for row in result:
response += (',').join(row) + ('\n')
return response
定义 API 响应
在 app.py
中,添加与传入请求匹配的 API 端点,并使用模块的结果进行响应。在以下 /api/devices/<device_id>
路由示例中,app.py
从 GET
和 POST
请求中检索 device_id
,将其传递给 get_device(device_id)
方法,并将结果作为带有 CORS allow-
标头的 JSON 数据返回。
@app.route('/api/devices/<string:device_id>', methods=['GET', 'POST'])
def api_get_device(device_id):
if request.method == "OPTIONS": # CORS preflight
return _build_cors_preflight_response()
return _corsify_actual_response(jsonify(devices.get_device(device_id)))
在您的终端中输入以下命令以重新启动应用程序:
CONTROL+C
停止应用程序。flask run -h localhost -p 3001
启动应用程序。
要从您的 API 检索设备数据,请在您的浏览器中访问 http://localhost:3001/api/devices。
安装并运行 UI
influxdata/iot-api-ui
是一个独立的 Next.js React UI,它使用您的应用程序 API 在 InfluxDB 中写入和查询数据。iot-api-ui
使用 Next.js 重写 将 /api/
路径中的所有请求路由到您的 API。
要安装和运行 UI,请执行以下操作:
在您的
~/iot-api-apps
目录中,克隆influxdata/iot-api-ui
存储库 并进入iot-api-ui
目录 - 例如:cd ~/iot-api-apps git clone git@github.com:influxdata/iot-api-ui.git cd ./iot-app-ui
./.env.development
文件包含您可以编辑或覆盖的默认配置设置(使用./.env.local
文件)。要启动 UI,请在您的终端中输入以下命令:
yarn dev
要查看列表和注册设备,请在您的浏览器中访问 http://localhost:3000/devices。
要了解有关 UI 组件的更多信息,请参阅 influxdata/iot-api-ui
。
此页面是否对您有帮助?
感谢您的反馈!