文档文档

实时游戏分数排行榜

如果您还没有运行的 Kapacitor 实例,请查看 入门指南 以在本地启动 Kapacitor。

今天我们是游戏开发者。我们运行了几个游戏服务器,每个服务器运行一个游戏实例,每个游戏大约有百名玩家。

我们需要构建一个排行榜,以便观众可以实时查看玩家的分数。我们还希望有关于领先者的历史数据,以便在赛后进行分析,了解谁领先了多久等。

我们将使用 Kapacitor 的流处理来完成繁重的工作。游戏服务器可以在玩家分数发生变化时发送 UDP 数据包,或者在分数未发生变化的情况下至少每 10 秒发送一次。

设置

下面所有的代码片段都可以在 这里 找到。

我们的首要任务是配置 Kapacitor 以接收分数流。在这种情况下,分数更新过于频繁,无法全部存储在 InfluxDB 中,因此我们将直接将它们发送到 Kapacitor。与 InfluxDB 一样,您可以配置一个 UDP 监听器。将此配置部分添加到您的 Kapacitor 配置文件的末尾。

[[udp]]
    enabled = true
    bind-address = ":9100"
    database = "game"
    retention-policy = "autogen"

此配置告诉 Kapacitor 在端口 9100 上监听 line protocol 格式的 UDP 数据包。它会将传入数据的作用域限定在 game.autogen 数据库和保留策略内。使用添加了此配置的 Kapacitor 启动。

这是一个简单的 bash 脚本,用于生成随机分数数据,以便我们可以在不干扰实际游戏服务器的情况下进行测试。

#!/bin/bash

# default options: can be overridden with corresponding arguments.
host=${1-localhost}
port=${2-9100}
games=${3-10}
players=${4-100}

games=$(seq $games)
players=$(seq $players)
# Spam score updates over UDP
while true
do
    for game in $games
    do
        game="g$game"
        for player in $players
        do
            player="p$player"
            score=$(($RANDOM % 1000))
            echo "scores,player=$player,game=$game value=$score" > /dev/udp/$host/$port
        done
    done
    sleep 0.1
done

将上面的脚本放入一个名为 scores.sh 的文件中并运行它。

chmod +x ./scores.sh
./scores.sh

现在我们正在用我们的虚假分数数据“轰炸”Kapacitor。我们可以让它一直运行,因为 Kapacitor 会在没有任务需要它时丢弃传入的数据。

定义 Kapacitor 任务

排行榜需要做什么?

  1. 获取每个玩家在每个游戏中的最新分数。
  2. 计算每个游戏排名前 X 的玩家分数。
  3. 发布结果。
  4. 存储结果。

要完成第一步,我们需要缓冲传入的流,并返回每个玩家在每个游戏中的最新分数更新。我们的 TICKscript 将如下所示:

var topPlayerScores = stream
    |from()
        .measurement('scores')
        // Get the most recent score for each player per game.
        // Not likely that a player is playing two games but just in case.
        .groupBy('game', 'player')
    |window()
        // keep a buffer of the last 11s of scores
        // just in case a player score hasn't updated in a while
        .period(11s)
        // Emit the current score per player every second.
        .every(1s)
        // Align the window boundaries to be on the second.
        .align()
    |last('value')

将此脚本放入一个名为 top_scores.tick 的文件中。

现在我们的 topPlayerScores 变量包含了每个玩家的最新分数。接下来,要计算每个游戏的最高分数,我们只需要按游戏分组并运行另一个 map reduce 作业。我们保留每个游戏排名前 15 的分数。将以下几行添加到 top_scores.tick 文件中。

// Calculate the top 15 scores per game
var topScores = topPlayerScores
    |groupBy('game')
    |top(15, 'last', 'player')

topScores 变量现在包含了每个游戏排名前 15 名玩家的分数。构建排行榜所需的一切都已准备就绪。Kapacitor 可以通过 HTTPOutNode 通过 HTTP 公开分数。我们将任务命名为 top_scores;通过以下添加,最新的分数将可以在 https://:9092/kapacitor/v1/tasks/top_scores/top_scores 处获得。

// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// https://:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
   |httpOut('top_scores')

最后,我们希望随着时间的推移存储最高分数,以便进行深入分析以确保最佳游戏玩法。但我们不想每秒都存储分数,因为数据量仍然太大。首先,我们将对数据进行采样,每 10 秒存储一次分数。此外,让我们提前做一些基本的分析,因为我们已经有了所有数据的流。目前,我们只会做基本的差距分析,即存储最高玩家和第 15 名玩家之间的差距。将以下几行添加到 top_scores.tick 以完成我们的任务。

// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
    |sample(10s)

// Store top fifteen player scores in InfluxDB.
topScoresSampled
    |influxDBOut()
        .database('game')
        .measurement('top_scores')

// Calculate the max and min of the top scores.
var max = topScoresSampled
    |max('top')

var min = topScoresSampled
    |min('top')

// Join the max and min streams back together and calculate the gap.
max
    |join(min)
        .as('max', 'min')
    // Calculate the difference between the max and min scores.
    // Rename the max and min fields to more friendly names 'topFirst', 'topLast'.
    |eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
        .as('gap', 'topFirst', 'topLast')
    // Store the fields: gap, topFirst and topLast in InfluxDB.
    |influxDBOut()
        .database('game')
        .measurement('top_scores_gap')

由于我们将数据写回 InfluxDB,请创建一个名为 game 的数据库来存储我们的结果。

curl -G 'https://:8086/query?' --data-urlencode 'q=CREATE DATABASE game'

如果您不想复制粘贴太多,这里是完整的任务 TICKscript:

dbrp "game"."autogen"

// Define a result that contains the most recent score per player.
var topPlayerScores = stream
    |from()
        .measurement('scores')
        // Get the most recent score for each player per game.
        // Not likely that a player is playing two games but just in case.
        .groupBy('game', 'player')
    |window()
        // keep a buffer of the last 11s of scores
        // just in case a player score hasn't updated in a while
        .period(11s)
        // Emit the current score per player every second.
        .every(1s)
        // Align the window boundaries to be on the second.
        .align()
    |last('value')

// Calculate the top 15 scores per game
var topScores = topPlayerScores
    |groupBy('game')
    |top(15, 'last', 'player')

// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// https://:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
   |httpOut('top_scores')

// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
    |sample(10s)

// Store top fifteen player scores in InfluxDB.
topScoresSampled
    |influxDBOut()
        .database('game')
        .measurement('top_scores')

// Calculate the max and min of the top scores.
var max = topScoresSampled
    |max('top')

var min = topScoresSampled
    |min('top')

// Join the max and min streams back together and calculate the gap.
max
    |join(min)
        .as('max', 'min')
    // calculate the difference between the max and min scores.
    |eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
        .as('gap', 'topFirst', 'topLast')
    // store the fields: gap, topFirst, and topLast in InfluxDB.
    |influxDBOut()
        .database('game')
        .measurement('top_scores_gap')

定义并启用我们的任务以使其生效。

kapacitor define top_scores -tick top_scores.tick
kapacitor enable top_scores

首先,让我们检查 HTTP 输出是否正常工作。

curl 'https://:9092/kapacitor/v1/tasks/top_scores/top_scores'

您应该会看到一个 JSON 格式的结果,其中包含每个游戏排名前 15 的玩家及其分数。多次访问该端点,您会看到分数每秒更新一次。

现在,让我们检查 InfluxDB 以查看我们的历史数据。

curl \
    -G 'https://:8086/query?db=game' \
    --data-urlencode 'q=SELECT * FROM top_scores  WHERE time > now() - 5m GROUP BY game'

curl \
    -G 'https://:8086/query?db=game' \
    --data-urlencode 'q=SELECT * FROM top_scores_gap WHERE time > now() - 5m GROUP BY game'

太棒了!繁重的工作已经完成。剩下的是配置游戏服务器向 Kapacitor 发送分数更新,并更新观众仪表板以从 Kapacitor 拉取分数。


此页面是否有帮助?

感谢您的反馈!


InfluxDB 3.8 新特性

InfluxDB 3.8 和 InfluxDB 3 Explorer 1.6 的主要增强功能。

查看博客文章

InfluxDB 3.8 现已适用于 Core 和 Enterprise 版本,同时发布了 InfluxDB 3 Explorer UI 的 1.6 版本。本次发布着重于操作成熟度,以及如何更轻松地部署、管理和可靠地运行 InfluxDB。

更多信息,请查看

InfluxDB Docker 的 latest 标签将指向 InfluxDB 3 Core

在 **2026 年 2 月 3 日**,InfluxDB Docker 镜像的 latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。

如果使用 Docker 来安装和运行 InfluxDB,latest 标签将指向 InfluxDB 3 Core。为避免意外升级,请在您的 Docker 部署中使用特定的版本标签。例如,如果使用 Docker 运行 InfluxDB v2,请将 latest 版本标签替换为 Docker pull 命令中的特定版本标签 — 例如

docker pull influxdb:2