文档文档

查询 SQL 数据源

Flux sql 程序包提供了用于处理 SQL 数据源的函数。sql.from() 允许您查询 SQL 数据源,例如 PostgreSQLMySQLSnowflakeSQLiteMicrosoft SQL ServerAmazon AthenaGoogle BigQuery,并将结果用于 InfluxDB 仪表盘、任务和其他操作。

如果您刚开始使用 Flux 查询,请查看以下内容

  • Flux 入门,了解 Flux 的概念概述和 Flux 查询的组成部分。
  • 执行查询,了解运行查询的各种方法。

查询 SQL 数据源

要查询 SQL 数据源

  1. 在您的 Flux 查询中导入 sql 程序包
  2. 使用 sql.from() 函数指定驱动程序、数据源名称 (DSN) 和用于从 SQL 数据源查询数据的查询
import "sql"

sql.from(
    driverName: "postgres",
    dataSourceName: "postgresql://user:password@localhost",
    query: "SELECT * FROM example_table",
)
import "sql"

sql.from(
    driverName: "mysql",
    dataSourceName: "user:password@tcp(localhost:3306)/db",
    query: "SELECT * FROM example_table",
)
import "sql"

sql.from(
    driverName: "snowflake",
    dataSourceName: "user:password@account/db/exampleschema?warehouse=wh",
    query: "SELECT * FROM example_table",
)
// NOTE: InfluxDB OSS and InfluxDB Cloud do not have access to
// the local filesystem and cannot query SQLite data sources.
// Use the Flux REPL to query an SQLite data source.

import "sql"

sql.from(
    driverName: "sqlite3",
    dataSourceName: "file:/path/to/test.db?cache=shared&mode=ro",
    query: "SELECT * FROM example_table",
)
import "sql"

sql.from(
    driverName: "sqlserver",
    dataSourceName: "sqlserver://user:password@localhost:1234?database=examplebdb",
    query: "GO SELECT * FROM Example.Table",
)

有关使用 ADO 样式参数向 SQL Server 进行身份验证的信息,请参阅SQL Server ADO 身份验证

import "sql"
sql.from(
    driverName: "awsathena",
    dataSourceName: "s3://myorgqueryresults/?accessID=12ab34cd56ef&region=region-name&secretAccessKey=y0urSup3rs3crEtT0k3n",
    query: "GO SELECT * FROM Example.Table",
)

有关 Athena DSN 中包含的参数的信息,请参阅Athena 连接字符串

import "sql"
sql.from(
    driverName: "bigquery",
    dataSourceName: "bigquery://projectid/?apiKey=mySuP3r5ecR3tAP1K3y",
    query: "SELECT * FROM exampleTable",
)

有关 BigQuery 身份验证的信息,请参阅BigQuery 身份验证参数

有关必需的函数参数的信息,请参阅sql.from() 文档

将 SQL 数据与 InfluxDB 中的数据连接

从 InfluxDB 查询 SQL 数据源的主要好处之一是能够使用存储在 InfluxDB 外部的数据来丰富查询结果。

使用下面的空气传感器示例数据,以下查询将存储在 InfluxDB 中的空气传感器指标与存储在 PostgreSQL 中的传感器信息连接起来。连接后的数据使您可以根据未存储在 InfluxDB 中的传感器信息查询和筛选结果。

// Import the "sql" package
import "sql"

// Query data from PostgreSQL
sensorInfo = sql.from(
    driverName: "postgres",
    dataSourceName: "postgresql://localhost?sslmode=disable",
    query: "SELECT * FROM sensors",
)

// Query data from InfluxDB
sensorMetrics = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "airSensors")

// Join InfluxDB query results with PostgreSQL query results
join(tables: {metric: sensorMetrics, info: sensorInfo}, on: ["sensor_id"])

使用 SQL 结果填充仪表盘变量

使用 sql.from() 从 SQL 查询结果创建仪表盘变量。以下示例使用下面的空气传感器示例数据来创建一个变量,让您可以选择传感器的位置。

import "sql"

sql.from(
    driverName: "postgres",
    dataSourceName: "postgresql://localhost?sslmode=disable",
    query: "SELECT * FROM sensors",
)
    |> rename(columns: {location: "_value"})
    |> keep(columns: ["_value"])

使用该变量来操作仪表盘中的查询。

Dashboard variable from SQL query results

使用密钥存储 SQL 数据库凭据

如果您的 SQL 数据库需要身份验证,请使用InfluxDB 密钥来存储和填充连接凭据。默认情况下,InfluxDB 会对密钥进行 base64 编码并将其存储在其内部键值存储 BoltDB 中。为了增加安全性,将密钥存储在 Vault 中

将您的数据库凭据存储为密钥

使用 InfluxDB APIinflux CLI 将您的数据库凭据存储为密钥。

curl --request PATCH http://localhost:8086/api/v2/orgs/<org-id>/secrets \
  --header 'Authorization: Token YOURAUTHTOKEN' \
  --header 'Content-type: application/json' \
  --data '{
  "POSTGRES_HOST": "http://example.com",
  "POSTGRES_USER": "example-username",
  "POSTGRES_PASS": "example-password"
}'

要存储密钥,您需要

# Syntax
influx secret update -k <secret-key>

# Example
influx secret update -k POSTGRES_PASS

出现提示时,输入您的密钥值。

您可以使用 -v--value 标志提供密钥值,但纯文本密钥可能会出现在您的 shell 历史记录中

influx secret update -k <secret-key> -v <secret-value>

在您的查询中使用密钥

导入 influxdata/influxdb/secrets 程序包,并使用字符串插值,以使用 Flux 查询中存储的密钥填充连接凭据。

import "sql"
import "influxdata/influxdb/secrets"

POSTGRES_HOST = secrets.get(key: "POSTGRES_HOST")
POSTGRES_USER = secrets.get(key: "POSTGRES_USER")
POSTGRES_PASS = secrets.get(key: "POSTGRES_PASS")

sql.from(
    driverName: "postgres",
    dataSourceName: "postgresql://${POSTGRES_USER}:${POSTGRES_PASS}@${POSTGRES_HOST}",
    query: "SELECT * FROM sensors",
)

示例传感器数据

空气传感器示例数据示例传感器信息模拟了一组传感器,这些传感器测量建筑物内房间的温度、湿度和一氧化碳。每个收集的数据点都与 sensor_id 标签一起存储在 InfluxDB 中,该标签标识了数据点来自的特定传感器。示例传感器信息存储在 PostgreSQL 中。

示例数据包括

  • 从每个传感器收集并存储在 InfluxDBairSensors 测量中的模拟数据

    • 温度
    • 湿度
    • 一氧化碳
  • 有关存储在 PostgreSQLsensors 表中的每个传感器的信息

    • sensor_id
    • 位置
    • 型号
    • 上次检查时间

下载空气传感器示例数据

  1. 创建一个存储桶来存储数据。

  2. 创建一个 InfluxDB 任务,并使用 sample.data() 函数每 15 分钟下载一次空气传感器示例数据。将下载的示例数据写入您的新存储桶

    import "influxdata/influxdb/sample"
    
    option task = {name: "Collect sample air sensor data", every: 15m}
    
    sample.data(set: "airSensor")
        |> to(org: "example-org", bucket: "example-bucket")
    
  3. 在第一次任务运行后查询您的目标存储桶,以确保示例数据成功写入。

    from(bucket: "example-bucket")
        |> range(start: -1m)
        |> filter(fn: (r) => r._measurement == "airSensors")
    

导入示例传感器信息

  1. 下载并安装 PostgreSQL.

  2. 下载示例传感器信息 CSV。

    下载示例传感器信息

  3. 使用 PostgreSQL 客户端(psql 或 GUI)创建 sensors

    CREATE TABLE sensors (
      sensor_id character varying(50),
      location character varying(50),
      model_number character varying(50),
      last_inspected date
    );
    
  4. 导入下载的 CSV 示例数据。更新 FROM 文件路径为下载的 CSV 示例数据的路径。

    COPY sensors(sensor_id,location,model_number,last_inspected)
    FROM '/path/to/sample-sensor-info.csv' DELIMITER ',' CSV HEADER;
    
  5. 查询表以确保数据已正确导入

    SELECT * FROM sensors;
    

导入示例数据仪表盘

下载并导入空气传感器仪表盘以可视化生成的数据

查看空气传感器仪表盘 JSON

有关导入仪表盘的信息,请参阅创建仪表盘


此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像目前一样使用它,而无需对您的代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、近实时数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建在 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩功能,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看