文档文档

实时游戏分数排行榜

如果您没有运行 Kapacitor 实例,请查看入门指南,以在 localhost 上启动并运行 Kapacitor。

今天我们是游戏开发者。我们托管了几个游戏服务器,每个服务器都运行着游戏代码的实例,每个游戏大约有 100 名玩家。

我们需要构建一个排行榜,以便观众可以实时查看玩家的分数。我们还希望拥有关于领导者的历史数据,以便对谁领先了多久等进行赛后分析。

我们将使用 Kapacitor 的流处理来为我们完成繁重的工作。游戏服务器可以在玩家分数更改时或至少每 10 秒(如果分数没有更改)发送 UDP 数据包。

设置

以下所有代码片段都可以在这里找到

我们的首要任务是配置 Kapacitor 以接收分数流。在这种情况下,分数更新过于频繁,无法将所有分数都存储在 InfluxDB 中,因此我们将直接将它们发送到 Kapacitor。与 InfluxDB 一样,您可以配置 UDP 监听器。将此配置部分添加到您的 Kapacitor 配置文件的末尾。

[[udp]]
    enabled = true
    bind-address = ":9100"
    database = "game"
    retention-policy = "autogen"

此配置告诉 Kapacitor 监听端口 9100 以接收线路协议格式的 UDP 数据包。它会将传入数据的作用域限定为 game.autogen 数据库和保留策略。启动 Kapacitor 并添加该配置。

这是一个简单的 bash 脚本,用于生成随机分数数据,以便我们可以在不干扰真实游戏服务器的情况下对其进行测试。

#!/bin/bash

# default options: can be overridden with corresponding arguments.
host=${1-localhost}
port=${2-9100}
games=${3-10}
players=${4-100}

games=$(seq $games)
players=$(seq $players)
# Spam score updates over UDP
while true
do
    for game in $games
    do
        game="g$game"
        for player in $players
        do
            player="p$player"
            score=$(($RANDOM % 1000))
            echo "scores,player=$player,game=$game value=$score" > /dev/udp/$host/$port
        done
    done
    sleep 0.1
done

将上面的脚本放入文件 scores.sh 并运行它

chmod +x ./scores.sh
./scores.sh

现在我们正在用虚假分数数据垃圾邮件 Kapacitor。我们可以让它保持运行,因为 Kapacitor 将丢弃传入的数据,直到它有任务需要它。

定义 Kapacitor 任务

排行榜需要做什么?

  1. 获取每个游戏每个玩家的最新分数。
  2. 计算每个游戏的前 X 名玩家分数。
  3. 发布结果。
  4. 存储结果。

要完成第一步,我们需要缓冲传入的流并返回每个游戏每个玩家的最新分数更新。我们的TICKscript 将如下所示

var topPlayerScores = stream
    |from()
        .measurement('scores')
        // Get the most recent score for each player per game.
        // Not likely that a player is playing two games but just in case.
        .groupBy('game', 'player')
    |window()
        // keep a buffer of the last 11s of scores
        // just in case a player score hasn't updated in a while
        .period(11s)
        // Emit the current score per player every second.
        .every(1s)
        // Align the window boundaries to be on the second.
        .align()
    |last('value')

将此脚本放在名为 top_scores.tick 的文件中。

现在我们的 topPlayerScores 变量包含每个玩家的最新分数。接下来,要计算每个游戏的最高分数,我们只需要按游戏分组并运行另一个 map reduce 作业。让我们保留每个游戏的前 15 个分数。将以下行添加到 top_scores.tick 文件中。

// Calculate the top 15 scores per game
var topScores = topPlayerScores
    |groupBy('game')
    |top(15, 'last', 'player')

现在 topScores 变量包含每个游戏的前 15 名玩家的分数。这就是我们需要构建排行榜的全部内容。Kapacitor 可以通过HTTPOutNode 通过 HTTP 公开分数。我们将我们的任务称为 top_scores;通过以下添加,最新的分数将在 http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores 上可用。

// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
   |httpOut('top_scores')

最后,我们希望随着时间的推移存储最高分数,以便我们可以进行深入分析,以确保最佳游戏体验。但是我们不想每秒都存储分数,因为数据仍然太多。首先,我们将对数据进行采样,并且仅每 10 秒存储一次分数。此外,由于我们已经拥有所有数据的流,因此让我们提前进行一些基本分析。目前,我们将仅进行基本差距分析,其中我们将存储最高分玩家和第 15 名玩家之间的差距。将以下行添加到 top_scores.tick 以完成我们的任务。

// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
    |sample(10s)

// Store top fifteen player scores in InfluxDB.
topScoresSampled
    |influxDBOut()
        .database('game')
        .measurement('top_scores')

// Calculate the max and min of the top scores.
var max = topScoresSampled
    |max('top')

var min = topScoresSampled
    |min('top')

// Join the max and min streams back together and calculate the gap.
max
    |join(min)
        .as('max', 'min')
    // Calculate the difference between the max and min scores.
    // Rename the max and min fields to more friendly names 'topFirst', 'topLast'.
    |eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
        .as('gap', 'topFirst', 'topLast')
    // Store the fields: gap, topFirst and topLast in InfluxDB.
    |influxDBOut()
        .database('game')
        .measurement('top_scores_gap')

由于我们将数据写回 InfluxDB,因此为我们的结果创建一个数据库 game

curl -G 'http://localhost:8086/query?' --data-urlencode 'q=CREATE DATABASE game'

如果您不想复制粘贴那么多,这是完整的任务 TICKscript :)

dbrp "game"."autogen"

// Define a result that contains the most recent score per player.
var topPlayerScores = stream
    |from()
        .measurement('scores')
        // Get the most recent score for each player per game.
        // Not likely that a player is playing two games but just in case.
        .groupBy('game', 'player')
    |window()
        // keep a buffer of the last 11s of scores
        // just in case a player score hasn't updated in a while
        .period(11s)
        // Emit the current score per player every second.
        .every(1s)
        // Align the window boundaries to be on the second.
        .align()
    |last('value')

// Calculate the top 15 scores per game
var topScores = topPlayerScores
    |groupBy('game')
    |top(15, 'last', 'player')

// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
   |httpOut('top_scores')

// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
    |sample(10s)

// Store top fifteen player scores in InfluxDB.
topScoresSampled
    |influxDBOut()
        .database('game')
        .measurement('top_scores')

// Calculate the max and min of the top scores.
var max = topScoresSampled
    |max('top')

var min = topScoresSampled
    |min('top')

// Join the max and min streams back together and calculate the gap.
max
    |join(min)
        .as('max', 'min')
    // calculate the difference between the max and min scores.
    |eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
        .as('gap', 'topFirst', 'topLast')
    // store the fields: gap, topFirst, and topLast in InfluxDB.
    |influxDBOut()
        .database('game')
        .measurement('top_scores_gap')

定义并启用我们的任务以查看其运行情况

kapacitor define top_scores -tick top_scores.tick
kapacitor enable top_scores

首先,让我们检查 HTTP 输出是否正常工作。

curl 'http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores'

您应该获得每个游戏前 15 名玩家及其分数的 JSON 结果。多次点击端点以查看分数是否每秒更新一次。

现在,让我们检查 InfluxDB 以查看我们的历史数据。

curl \
    -G 'http://localhost:8086/query?db=game' \
    --data-urlencode 'q=SELECT * FROM top_scores  WHERE time > now() - 5m GROUP BY game'

curl \
    -G 'http://localhost:8086/query?db=game' \
    --data-urlencode 'q=SELECT * FROM top_scores_gap WHERE time > now() - 5m GROUP BY game'

太棒了!艰苦的工作已经完成。剩下的就是配置游戏服务器以将分数更新发送到 Kapacitor,并更新观众仪表板以从 Kapacitor 拉取分数。


此页面是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像现在这样使用它,而无需对代码进行任何更改。

阅读更多

现已正式发布

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一款开源、高速、近实时数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 构建于 Core 的基础上,增加了高可用性、读取副本、增强的安全性以及数据压缩,从而实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看