查询 SQL 数据源
Flux sql
程序包提供了用于处理 SQL 数据源的函数。sql.from()
允许您查询 SQL 数据源,例如 PostgreSQL、MySQL、Snowflake、SQLite、Microsoft SQL Server、Amazon Athena 和 Google BigQuery,并将结果用于 InfluxDB 仪表盘、任务和其他操作。
如果您刚开始使用 Flux 查询,请查看以下内容
查询 SQL 数据源
要查询 SQL 数据源
- 在您的 Flux 查询中导入
sql
程序包 - 使用
sql.from()
函数指定驱动程序、数据源名称 (DSN) 和用于从 SQL 数据源查询数据的查询
import "sql"
sql.from(
driverName: "postgres",
dataSourceName: "postgresql://user:password@localhost",
query: "SELECT * FROM example_table",
)
import "sql"
sql.from(
driverName: "mysql",
dataSourceName: "user:password@tcp(localhost:3306)/db",
query: "SELECT * FROM example_table",
)
import "sql"
sql.from(
driverName: "snowflake",
dataSourceName: "user:password@account/db/exampleschema?warehouse=wh",
query: "SELECT * FROM example_table",
)
// NOTE: InfluxDB OSS and InfluxDB Cloud do not have access to
// the local filesystem and cannot query SQLite data sources.
// Use the Flux REPL to query an SQLite data source.
import "sql"
sql.from(
driverName: "sqlite3",
dataSourceName: "file:/path/to/test.db?cache=shared&mode=ro",
query: "SELECT * FROM example_table",
)
import "sql"
sql.from(
driverName: "sqlserver",
dataSourceName: "sqlserver://user:password@localhost:1234?database=examplebdb",
query: "GO SELECT * FROM Example.Table",
)
有关使用 ADO 样式参数向 SQL Server 进行身份验证的信息,请参阅SQL Server ADO 身份验证。
import "sql"
sql.from(
driverName: "awsathena",
dataSourceName: "s3://myorgqueryresults/?accessID=12ab34cd56ef®ion=region-name&secretAccessKey=y0urSup3rs3crEtT0k3n",
query: "GO SELECT * FROM Example.Table",
)
有关 Athena DSN 中包含的参数的信息,请参阅Athena 连接字符串。
import "sql"
sql.from(
driverName: "bigquery",
dataSourceName: "bigquery://projectid/?apiKey=mySuP3r5ecR3tAP1K3y",
query: "SELECT * FROM exampleTable",
)
有关 BigQuery 身份验证的信息,请参阅BigQuery 身份验证参数。
有关必需的函数参数的信息,请参阅sql.from()
文档。
将 SQL 数据与 InfluxDB 中的数据连接
从 InfluxDB 查询 SQL 数据源的主要好处之一是能够使用存储在 InfluxDB 外部的数据来丰富查询结果。
使用下面的空气传感器示例数据,以下查询将存储在 InfluxDB 中的空气传感器指标与存储在 PostgreSQL 中的传感器信息连接起来。连接后的数据使您可以根据未存储在 InfluxDB 中的传感器信息查询和筛选结果。
// Import the "sql" package
import "sql"
// Query data from PostgreSQL
sensorInfo = sql.from(
driverName: "postgres",
dataSourceName: "postgresql://localhost?sslmode=disable",
query: "SELECT * FROM sensors",
)
// Query data from InfluxDB
sensorMetrics = from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "airSensors")
// Join InfluxDB query results with PostgreSQL query results
join(tables: {metric: sensorMetrics, info: sensorInfo}, on: ["sensor_id"])
使用 SQL 结果填充仪表盘变量
使用 sql.from()
从 SQL 查询结果创建仪表盘变量。以下示例使用下面的空气传感器示例数据来创建一个变量,让您可以选择传感器的位置。
import "sql"
sql.from(
driverName: "postgres",
dataSourceName: "postgresql://localhost?sslmode=disable",
query: "SELECT * FROM sensors",
)
|> rename(columns: {location: "_value"})
|> keep(columns: ["_value"])
使用该变量来操作仪表盘中的查询。

使用密钥存储 SQL 数据库凭据
如果您的 SQL 数据库需要身份验证,请使用InfluxDB 密钥来存储和填充连接凭据。默认情况下,InfluxDB 会对密钥进行 base64 编码并将其存储在其内部键值存储 BoltDB 中。为了增加安全性,将密钥存储在 Vault 中。
将您的数据库凭据存储为密钥
使用 InfluxDB API 或 influx
CLI 将您的数据库凭据存储为密钥。
curl --request PATCH http://localhost:8086/api/v2/orgs/<org-id>/secrets \
--header 'Authorization: Token YOURAUTHTOKEN' \
--header 'Content-type: application/json' \
--data '{
"POSTGRES_HOST": "http://example.com",
"POSTGRES_USER": "example-username",
"POSTGRES_PASS": "example-password"
}'
要存储密钥,您需要
# Syntax
influx secret update -k <secret-key>
# Example
influx secret update -k POSTGRES_PASS
出现提示时,输入您的密钥值。
您可以使用 -v
、--value
标志提供密钥值,但纯文本密钥可能会出现在您的 shell 历史记录中。
influx secret update -k <secret-key> -v <secret-value>
在您的查询中使用密钥
导入 influxdata/influxdb/secrets
程序包,并使用字符串插值,以使用 Flux 查询中存储的密钥填充连接凭据。
import "sql"
import "influxdata/influxdb/secrets"
POSTGRES_HOST = secrets.get(key: "POSTGRES_HOST")
POSTGRES_USER = secrets.get(key: "POSTGRES_USER")
POSTGRES_PASS = secrets.get(key: "POSTGRES_PASS")
sql.from(
driverName: "postgres",
dataSourceName: "postgresql://${POSTGRES_USER}:${POSTGRES_PASS}@${POSTGRES_HOST}",
query: "SELECT * FROM sensors",
)
示例传感器数据
空气传感器示例数据和示例传感器信息模拟了一组传感器,这些传感器测量建筑物内房间的温度、湿度和一氧化碳。每个收集的数据点都与 sensor_id
标签一起存储在 InfluxDB 中,该标签标识了数据点来自的特定传感器。示例传感器信息存储在 PostgreSQL 中。
示例数据包括
从每个传感器收集并存储在 InfluxDB 的
airSensors
测量中的模拟数据- 温度
- 湿度
- 一氧化碳
有关存储在 PostgreSQL 的
sensors
表中的每个传感器的信息- sensor_id
- 位置
- 型号
- 上次检查时间
下载空气传感器示例数据
创建一个存储桶来存储数据。
创建一个 InfluxDB 任务,并使用
sample.data()
函数每 15 分钟下载一次空气传感器示例数据。将下载的示例数据写入您的新存储桶import "influxdata/influxdb/sample" option task = {name: "Collect sample air sensor data", every: 15m} sample.data(set: "airSensor") |> to(org: "example-org", bucket: "example-bucket")
在第一次任务运行后查询您的目标存储桶,以确保示例数据成功写入。
from(bucket: "example-bucket") |> range(start: -1m) |> filter(fn: (r) => r._measurement == "airSensors")
导入示例传感器信息
下载示例传感器信息 CSV。
使用 PostgreSQL 客户端(
psql
或 GUI)创建sensors
表CREATE TABLE sensors ( sensor_id character varying(50), location character varying(50), model_number character varying(50), last_inspected date );
导入下载的 CSV 示例数据。更新
FROM
文件路径为下载的 CSV 示例数据的路径。COPY sensors(sensor_id,location,model_number,last_inspected) FROM '/path/to/sample-sensor-info.csv' DELIMITER ',' CSV HEADER;
查询表以确保数据已正确导入
SELECT * FROM sensors;
导入示例数据仪表盘
下载并导入空气传感器仪表盘以可视化生成的数据
有关导入仪表盘的信息,请参阅创建仪表盘。
此页面是否对您有帮助?
感谢您的反馈!