将InfluxDB Cloud中的数据迁移到InfluxDB Clustered
要将InfluxDB Cloud(TSM)中的数据迁移到由v3存储引擎提供的InfluxDB Clustered,请按时间批次从您的TSM支持的桶中查询数据,并将查询到的数据写入InfluxDB集群中的数据库。由于完整的数据迁移可能会超过您的InfluxDB Cloud组织限制和可调整配额,因此请分批迁移数据。
以下指南提供了设置InfluxDB任务以按时间批次从InfluxDB Cloud TSM支持的桶中查询数据并将每个批次写入另一个组织中的InfluxDB Clustered(InfluxDB v3)数据库的说明。
所有查询请求都受InfluxDB Cloud组织速率限制和可调整配额的约束。
迁移前
在您从InfluxDB Cloud(TSM)迁移到InfluxDB Clustered之前,有一些由TSM存储引擎支持的架构设计实践,这些实践在InfluxDB v3存储引擎中不受支持。具体来说,InfluxDB v3强制执行以下架构限制
- 您不能使用重复的名称来标记和字段。
- 默认情况下,度量值可以包含多达250个列,其中每个列代表时间、字段或标记。
有关更多信息,请参阅架构限制。
如果您的架构不遵循这些限制,您必须在迁移到InfluxDB Clustered之前更新您的架构。
设置迁移
迁移过程需要在您的源InfluxDB组织中设置两个桶:一个用于存储要迁移的数据,另一个用于存储迁移元数据。如果您使用的是InfluxDB Cloud Free Plan,并且有多个桶要迁移,您将超过您的计划桶限制。要迁移多个桶,您需要升级到基于使用的计划以完成迁移。
您正在迁移数据到的InfluxDB集群中:
您正在迁移数据从的InfluxDB Cloud(TSM)组织中:
迁移完成后,每个后续迁移任务执行将失败,并出现以下错误
error exhausting result iterator: error calling function "die" @41:9-41:86:
Batch range is beyond the migration range. Migration is complete.
迁移任务
配置迁移
使用
task.every
选项指定任务运行频率。有关信息,请参阅确定任务间隔。在
migration
记录中定义以下属性migration
- start:包含在迁移中的最早时间。有关信息,请参阅确定迁移开始时间。
- stop:包含在迁移中的最晚时间。
- batchInterval:基于时间的每个批次的持续时间。有关信息,请参阅确定批间隔。
- batchBucket:用于在InfluxDB Cloud(TSM)中存储迁移批次元数据的桶。
- sourceBucket:用于从InfluxDB Cloud(TSM)迁移数据的桶。
- destinationHost:要迁移数据的InfluxDB集群URL。
- destinationOrg:提供一个空字符串(由InfluxDB Clustered忽略)。
- destinationToken:InfluxDB Clustered令牌。为了保持API令牌安全,将其存储为InfluxDB Cloud(TSM)中的秘密。
- destinationDatabase:要迁移数据的InfluxDB Clustered数据库。
迁移Flux脚本
import "array"
import "experimental"
import "date"
import "influxdata/influxdb/secrets"
// Configure the task
option task = {every: 5m, name: "Migrate data from TSM to v3"}
// Configure the migration
migration = {
start: 2023-01-01T00:00:00Z,
stop: 2023-02-01T00:00:00Z,
batchInterval: 1h,
batchBucket: "migration",
sourceBucket: "example-cloud-bucket",
destinationHost: "https://cluster-host.com",
destinationOrg: "",
destinationToken: secrets.get(key: "INFLUXDB_CLUSTERED_TOKEN"),
destinationDatabase: "example-destination-database",
}
// batchRange dynamically returns a record with start and stop properties for
// the current batch. It queries migration metadata stored in the
// `migration.batchBucket` to determine the stop time of the previous batch.
// It uses the previous stop time as the new start time for the current batch
// and adds the `migration.batchInterval` to determine the current batch stop time.
batchRange = () => {
_lastBatchStop =
(from(bucket: migration.batchBucket)
|> range(start: migration.start)
|> filter(fn: (r) => r._field == "batch_stop")
|> filter(fn: (r) => r.dstOrg == migration.destinationOrg)
|> filter(fn: (r) => r.dstBucket == migration.destinationDatabase)
|> last()
|> findRecord(fn: (key) => true, idx: 0))._value
_batchStart =
if exists _lastBatchStop then
time(v: _lastBatchStop)
else
migration.start
return {start: _batchStart, stop: date.add(d: migration.batchInterval, to: _batchStart)}
}
// Define a static record with batch start and stop time properties
batch = batchRange()
// Check to see if the current batch start time is beyond the migration.stop
// time and exit with an error if it is.
finished =
if batch.start >= migration.stop then
die(msg: "Batch range is beyond the migration range. Migration is complete.")
else
"Migration in progress"
// Query all data from the specified source bucket within the batch-defined time
// range. To limit migrated data by measurement, tag, or field, add a `filter()`
// function after `range()` with the appropriate predicate fn.
data = () =>
from(bucket: migration.sourceBucket)
|> range(start: batch.start, stop: batch.stop)
// rowCount is a stream of tables that contains the number of rows returned in
// the batch and is used to generate batch metadata.
rowCount =
data()
|> count()
|> group(columns: ["_start", "_stop"])
|> sum()
// emptyRange is a stream of tables that acts as filler data if the batch is
// empty. This is used to generate batch metadata for empty batches and is
// necessary to correctly increment the time range for the next batch.
emptyRange = array.from(rows: [{_start: batch.start, _stop: batch.stop, _value: 0}])
// metadata returns a stream of tables representing batch metadata.
metadata = () => {
_input =
if exists (rowCount |> findRecord(fn: (key) => true, idx: 0))._value then
rowCount
else
emptyRange
return
_input
|> map(
fn: (r) =>
({
_time: now(),
_measurement: "batches",
srcBucket: migration.sourceBucket,
dstOrg: migration.destinationOrg,
dstBucket: migration.destinationDatabase,
batch_start: string(v: batch.start),
batch_stop: string(v: batch.stop),
rows: r._value,
percent_complete:
float(v: int(v: r._stop) - int(v: migration.start)) / float(
v: int(v: migration.stop) - int(v: migration.start),
) * 100.0,
}),
)
|> group(columns: ["_measurement", "dstOrg", "srcBucket", "dstBucket"])
}
// Write the queried data to the specified InfluxDB OSS bucket.
data()
|> to(
host: migration.destinationHost,
org: migration.destinationOrg,
token: migration.destinationToken,
bucket: migration.destinationDatabase
)
// Generate and store batch metadata in the migration.batchBucket.
metadata()
|> experimental.to(bucket: migration.batchBucket)
配置帮助
解决迁移任务失败的问题
如果迁移任务失败,请查看任务日志以确定特定错误。以下是迁移任务失败的一些常见原因。
超过速率限制
如果您的数据迁移使您超出InfluxDB Cloud组织的限制和配额,任务将返回类似以下错误
too many requests
可能的解决方案:
- 更新您的迁移任务中的
migration.batchInterval
设置,以使用更小的间隔。每个批次将查询更少的数据。
无效的API令牌
如果您添加为 INFLUXDB_CLUSTERED_TOKEN
的 API 令牌没有对您的 InfluxDB 集群数据库的写入访问权限,任务将返回类似于以下错误的错误
unauthorized access
可能的解决方案:
- 确保 API 令牌有对您的 InfluxDB 集群数据库的写入访问权限。
- 生成一个新的 InfluxDB 集群令牌,该令牌具有对您要迁移到的数据库的写入访问权限。然后,使用新令牌更新您的 InfluxDB Cloud(TSM)实例中的
INFLUXDB_CLUSTERED_TOKEN
机密。
查询超时
InfluxDB Cloud 查询超时为 90 秒。如果从批次间隔返回数据需要更长的时间,查询将超时,任务将失败。
可能的解决方案:
- 更新您的迁移任务中的
migration.batchInterval
设置,以使用更小的间隔。每个批次将查询更少的数据,并更快地返回结果。
批量大小过大
如果您的批次大小太大,任务将返回类似于以下错误的错误
internal error: error calling function "metadata" @97:1-97:11: error calling function "findRecord" @67:32-67:69: wrong number of fields
可能的解决方案:
- 更新您的迁移任务中的
migration.batchInterval
设置,以使用更小的间隔并每批检索更少的数据。
这个页面有帮助吗?
感谢您的反馈!