文档说明

将InfluxDB Cloud中的数据迁移到InfluxDB Clustered

要将InfluxDB Cloud(TSM)中的数据迁移到由v3存储引擎提供的InfluxDB Clustered,请按时间批次从您的TSM支持的桶中查询数据,并将查询到的数据写入InfluxDB集群中的数据库。由于完整的数据迁移可能会超过您的InfluxDB Cloud组织限制和可调整配额,因此请分批迁移数据。

以下指南提供了设置InfluxDB任务以按时间批次从InfluxDB Cloud TSM支持的桶中查询数据并将每个批次写入另一个组织中的InfluxDB Clustered(InfluxDB v3)数据库的说明。

所有查询请求都受InfluxDB Cloud组织速率限制和可调整配额的约束。

迁移前

在您从InfluxDB Cloud(TSM)迁移到InfluxDB Clustered之前,有一些由TSM存储引擎支持的架构设计实践,这些实践在InfluxDB v3存储引擎中不受支持。具体来说,InfluxDB v3强制执行以下架构限制

  • 您不能使用重复的名称来标记和字段。
  • 默认情况下,度量值可以包含多达250个列,其中每个列代表时间、字段或标记。

有关更多信息,请参阅架构限制

如果您的架构不遵循这些限制,您必须在迁移到InfluxDB Clustered之前更新您的架构。

修复重复的标签和字段名

修复超过250个总列的测量

设置迁移

迁移过程需要在您的源InfluxDB组织中设置两个桶:一个用于存储要迁移的数据,另一个用于存储迁移元数据。如果您使用的是InfluxDB Cloud Free Plan,并且有多个桶要迁移,您将超过您的计划桶限制。要迁移多个桶,您需要升级到基于使用的计划以完成迁移。

  1. 您正在迁移数据到的InfluxDB集群中:

    1. 创建一个数据库 用于迁移数据
    2. 创建一个数据库令牌,具有对要迁移的数据库的写入访问权限
  2. 您正在迁移数据从的InfluxDB Cloud(TSM)组织中:

    1. 使用密钥INFLUXDB_CLUSTERED_TOKEN添加秘密中创建的InfluxDB Clustered令牌添加为秘密。有关更多信息,请参阅添加秘密

    2. 创建一个桶 用于存储临时迁移元数据

    3. 使用提供的迁移任务创建一个任务。更新必要的迁移配置选项

    4. (可选)设置迁移监控

    5. 保存任务。

      新创建的任务默认启用,因此保存任务时开始数据迁移。

迁移完成后,每个后续迁移任务执行将失败,并出现以下错误

error exhausting result iterator: error calling function "die" @41:9-41:86:
Batch range is beyond the migration range. Migration is complete.

迁移任务

配置迁移

  1. 使用task.every选项指定任务运行频率。有关信息,请参阅确定任务间隔

  2. migration 记录中定义以下属性

    migration
    • start:包含在迁移中的最早时间。有关信息,请参阅确定迁移开始时间
    • stop:包含在迁移中的最晚时间。
    • batchInterval:基于时间的每个批次的持续时间。有关信息,请参阅确定批间隔
    • batchBucket:用于在InfluxDB Cloud(TSM)中存储迁移批次元数据的桶。
    • sourceBucket:用于从InfluxDB Cloud(TSM)迁移数据的桶。
    • destinationHost:要迁移数据的InfluxDB集群URL。
    • destinationOrg提供一个空字符串(由InfluxDB Clustered忽略)。
    • destinationToken:InfluxDB Clustered令牌。为了保持API令牌安全,将其存储为InfluxDB Cloud(TSM)中的秘密。
    • destinationDatabase:要迁移数据的InfluxDB Clustered数据库。

迁移Flux脚本

import "array"
import "experimental"
import "date"
import "influxdata/influxdb/secrets"

// Configure the task
option task = {every: 5m, name: "Migrate data from TSM to v3"}

// Configure the migration
migration = {
    start: 2023-01-01T00:00:00Z,
    stop: 2023-02-01T00:00:00Z,
    batchInterval: 1h,
    batchBucket: "migration",
    sourceBucket: "example-cloud-bucket",
    destinationHost: "https://cluster-host.com",
    destinationOrg: "",
    destinationToken: secrets.get(key: "INFLUXDB_CLUSTERED_TOKEN"),
    destinationDatabase: "example-destination-database",
}

// batchRange dynamically returns a record with start and stop properties for
// the current batch. It queries migration metadata stored in the
// `migration.batchBucket` to determine the stop time of the previous batch.
// It uses the previous stop time as the new start time for the current batch
// and adds the `migration.batchInterval` to determine the current batch stop time.
batchRange = () => {
    _lastBatchStop =
        (from(bucket: migration.batchBucket)
            |> range(start: migration.start)
            |> filter(fn: (r) => r._field == "batch_stop")
            |> filter(fn: (r) => r.dstOrg == migration.destinationOrg)
            |> filter(fn: (r) => r.dstBucket == migration.destinationDatabase)
            |> last()
            |> findRecord(fn: (key) => true, idx: 0))._value
    _batchStart =
        if exists _lastBatchStop then
            time(v: _lastBatchStop)
        else
            migration.start

    return {start: _batchStart, stop: date.add(d: migration.batchInterval, to: _batchStart)}
}

// Define a static record with batch start and stop time properties
batch = batchRange()

// Check to see if the current batch start time is beyond the migration.stop
// time and exit with an error if it is.
finished =
    if batch.start >= migration.stop then
        die(msg: "Batch range is beyond the migration range. Migration is complete.")
    else
        "Migration in progress"

// Query all data from the specified source bucket within the batch-defined time
// range. To limit migrated data by measurement, tag, or field, add a `filter()`
// function after `range()` with the appropriate predicate fn.
data = () =>
    from(bucket: migration.sourceBucket)
        |> range(start: batch.start, stop: batch.stop)

// rowCount is a stream of tables that contains the number of rows returned in
// the batch and is used to generate batch metadata.
rowCount =
    data()
        |> count()
        |> group(columns: ["_start", "_stop"])
        |> sum()

// emptyRange is a stream of tables that acts as filler data if the batch is
// empty. This is used to generate batch metadata for empty batches and is
// necessary to correctly increment the time range for the next batch.
emptyRange = array.from(rows: [{_start: batch.start, _stop: batch.stop, _value: 0}])

// metadata returns a stream of tables representing batch metadata.
metadata = () => {
    _input =
        if exists (rowCount |> findRecord(fn: (key) => true, idx: 0))._value then
            rowCount
        else
            emptyRange

    return
        _input
            |> map(
                fn: (r) =>
                    ({
                        _time: now(),
                        _measurement: "batches",
                        srcBucket: migration.sourceBucket,
                        dstOrg: migration.destinationOrg,
                        dstBucket: migration.destinationDatabase,
                        batch_start: string(v: batch.start),
                        batch_stop: string(v: batch.stop),
                        rows: r._value,
                        percent_complete:
                            float(v: int(v: r._stop) - int(v: migration.start)) / float(
                                    v: int(v: migration.stop) - int(v: migration.start),
                                ) * 100.0,
                    }),
            )
            |> group(columns: ["_measurement", "dstOrg", "srcBucket", "dstBucket"])
}

// Write the queried data to the specified InfluxDB OSS bucket.
data()
    |> to(
        host: migration.destinationHost,
        org: migration.destinationOrg,
        token: migration.destinationToken,
        bucket: migration.destinationDatabase
    )

// Generate and store batch metadata in the migration.batchBucket.
metadata()
    |> experimental.to(bucket: migration.batchBucket)

配置帮助

确定任务间隔

确定迁移开始时间

确定批量间隔

解决迁移任务失败的问题

如果迁移任务失败,请查看任务日志以确定特定错误。以下是迁移任务失败的一些常见原因。

超过速率限制

如果您的数据迁移使您超出InfluxDB Cloud组织的限制和配额,任务将返回类似以下错误

too many requests

可能的解决方案:

  • 更新您的迁移任务中的 migration.batchInterval 设置,以使用更小的间隔。每个批次将查询更少的数据。

无效的API令牌

如果您添加为 INFLUXDB_CLUSTERED_TOKEN 的 API 令牌没有对您的 InfluxDB 集群数据库的写入访问权限,任务将返回类似于以下错误的错误

unauthorized access

可能的解决方案:

  • 确保 API 令牌有对您的 InfluxDB 集群数据库的写入访问权限。
  • 生成一个新的 InfluxDB 集群令牌,该令牌具有对您要迁移到的数据库的写入访问权限。然后,使用新令牌更新您的 InfluxDB Cloud(TSM)实例中的 INFLUXDB_CLUSTERED_TOKEN 机密。

查询超时

InfluxDB Cloud 查询超时为 90 秒。如果从批次间隔返回数据需要更长的时间,查询将超时,任务将失败。

可能的解决方案:

  • 更新您的迁移任务中的 migration.batchInterval 设置,以使用更小的间隔。每个批次将查询更少的数据,并更快地返回结果。

批量大小过大

如果您的批次大小太大,任务将返回类似于以下错误的错误

internal error: error calling function "metadata" @97:1-97:11: error calling function "findRecord" @67:32-67:69: wrong number of fields

可能的解决方案:

  • 更新您的迁移任务中的 migration.batchInterval 设置,以使用更小的间隔并每批检索更少的数据。

这个页面有帮助吗?

感谢您的反馈!


Flux 的未来

Flux 正在进入维护模式。您可以在不更改代码的情况下继续像现在一样使用它。

了解更多

InfluxDB v3 增强功能和 InfluxDB 集群现在已普遍可用

包括更快的查询性能和管理工具在内的新功能推动了 InfluxDB v3 产品线的进步。InfluxDB 集群现在已普遍可用。

InfluxDB v3 性能和功能

InfluxDB v3 产品线在查询性能方面取得了重大改进,并提供了新的管理工具。这些增强包括用于监视您的 InfluxDB 集群健康状况的操作仪表板、InfluxDB Cloud Dedicated 中的单点登录(SSO)支持以及用于令牌和数据库的新管理 API。

了解 v3 的新增强功能


InfluxDB 集群普遍可用

InfluxDB 集群现在已普遍可用,并在您自行管理的堆栈中提供了 InfluxDB v3 的功能。

与我们谈论 InfluxDB 集群