实时游戏分数排行榜
如果您没有运行 Kapacitor 实例,请查看入门指南,以在 localhost 上启动并运行 Kapacitor。
今天我们是游戏开发者。我们托管了几个游戏服务器,每个服务器都运行着游戏代码的实例,每个游戏大约有 100 名玩家。
我们需要构建一个排行榜,以便观众可以实时查看玩家的分数。我们还希望拥有关于领导者的历史数据,以便对谁领先了多久等进行赛后分析。
我们将使用 Kapacitor 的流处理来为我们完成繁重的工作。游戏服务器可以在玩家分数更改时或至少每 10 秒(如果分数没有更改)发送 UDP 数据包。
设置
以下所有代码片段都可以在这里找到
我们的首要任务是配置 Kapacitor 以接收分数流。在这种情况下,分数更新过于频繁,无法将所有分数都存储在 InfluxDB 中,因此我们将直接将它们发送到 Kapacitor。与 InfluxDB 一样,您可以配置 UDP 监听器。将此配置部分添加到您的 Kapacitor 配置文件的末尾。
[[udp]]
enabled = true
bind-address = ":9100"
database = "game"
retention-policy = "autogen"
此配置告诉 Kapacitor 监听端口 9100
以接收线路协议格式的 UDP 数据包。它会将传入数据的作用域限定为 game.autogen
数据库和保留策略。启动 Kapacitor 并添加该配置。
这是一个简单的 bash 脚本,用于生成随机分数数据,以便我们可以在不干扰真实游戏服务器的情况下对其进行测试。
#!/bin/bash
# default options: can be overridden with corresponding arguments.
host=${1-localhost}
port=${2-9100}
games=${3-10}
players=${4-100}
games=$(seq $games)
players=$(seq $players)
# Spam score updates over UDP
while true
do
for game in $games
do
game="g$game"
for player in $players
do
player="p$player"
score=$(($RANDOM % 1000))
echo "scores,player=$player,game=$game value=$score" > /dev/udp/$host/$port
done
done
sleep 0.1
done
将上面的脚本放入文件 scores.sh
并运行它
chmod +x ./scores.sh
./scores.sh
现在我们正在用虚假分数数据垃圾邮件 Kapacitor。我们可以让它保持运行,因为 Kapacitor 将丢弃传入的数据,直到它有任务需要它。
定义 Kapacitor 任务
排行榜需要做什么?
- 获取每个游戏每个玩家的最新分数。
- 计算每个游戏的前 X 名玩家分数。
- 发布结果。
- 存储结果。
要完成第一步,我们需要缓冲传入的流并返回每个游戏每个玩家的最新分数更新。我们的TICKscript 将如下所示
var topPlayerScores = stream
|from()
.measurement('scores')
// Get the most recent score for each player per game.
// Not likely that a player is playing two games but just in case.
.groupBy('game', 'player')
|window()
// keep a buffer of the last 11s of scores
// just in case a player score hasn't updated in a while
.period(11s)
// Emit the current score per player every second.
.every(1s)
// Align the window boundaries to be on the second.
.align()
|last('value')
将此脚本放在名为 top_scores.tick
的文件中。
现在我们的 topPlayerScores
变量包含每个玩家的最新分数。接下来,要计算每个游戏的最高分数,我们只需要按游戏分组并运行另一个 map reduce 作业。让我们保留每个游戏的前 15 个分数。将以下行添加到 top_scores.tick
文件中。
// Calculate the top 15 scores per game
var topScores = topPlayerScores
|groupBy('game')
|top(15, 'last', 'player')
现在 topScores
变量包含每个游戏的前 15 名玩家的分数。这就是我们需要构建排行榜的全部内容。Kapacitor 可以通过HTTPOutNode 通过 HTTP 公开分数。我们将我们的任务称为 top_scores
;通过以下添加,最新的分数将在 http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores
上可用。
// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
|httpOut('top_scores')
最后,我们希望随着时间的推移存储最高分数,以便我们可以进行深入分析,以确保最佳游戏体验。但是我们不想每秒都存储分数,因为数据仍然太多。首先,我们将对数据进行采样,并且仅每 10 秒存储一次分数。此外,由于我们已经拥有所有数据的流,因此让我们提前进行一些基本分析。目前,我们将仅进行基本差距分析,其中我们将存储最高分玩家和第 15 名玩家之间的差距。将以下行添加到 top_scores.tick
以完成我们的任务。
// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
|sample(10s)
// Store top fifteen player scores in InfluxDB.
topScoresSampled
|influxDBOut()
.database('game')
.measurement('top_scores')
// Calculate the max and min of the top scores.
var max = topScoresSampled
|max('top')
var min = topScoresSampled
|min('top')
// Join the max and min streams back together and calculate the gap.
max
|join(min)
.as('max', 'min')
// Calculate the difference between the max and min scores.
// Rename the max and min fields to more friendly names 'topFirst', 'topLast'.
|eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
.as('gap', 'topFirst', 'topLast')
// Store the fields: gap, topFirst and topLast in InfluxDB.
|influxDBOut()
.database('game')
.measurement('top_scores_gap')
由于我们将数据写回 InfluxDB,因此为我们的结果创建一个数据库 game
。
curl -G 'http://localhost:8086/query?' --data-urlencode 'q=CREATE DATABASE game'
如果您不想复制粘贴那么多,这是完整的任务 TICKscript :)
dbrp "game"."autogen"
// Define a result that contains the most recent score per player.
var topPlayerScores = stream
|from()
.measurement('scores')
// Get the most recent score for each player per game.
// Not likely that a player is playing two games but just in case.
.groupBy('game', 'player')
|window()
// keep a buffer of the last 11s of scores
// just in case a player score hasn't updated in a while
.period(11s)
// Emit the current score per player every second.
.every(1s)
// Align the window boundaries to be on the second.
.align()
|last('value')
// Calculate the top 15 scores per game
var topScores = topPlayerScores
|groupBy('game')
|top(15, 'last', 'player')
// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
|httpOut('top_scores')
// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
|sample(10s)
// Store top fifteen player scores in InfluxDB.
topScoresSampled
|influxDBOut()
.database('game')
.measurement('top_scores')
// Calculate the max and min of the top scores.
var max = topScoresSampled
|max('top')
var min = topScoresSampled
|min('top')
// Join the max and min streams back together and calculate the gap.
max
|join(min)
.as('max', 'min')
// calculate the difference between the max and min scores.
|eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
.as('gap', 'topFirst', 'topLast')
// store the fields: gap, topFirst, and topLast in InfluxDB.
|influxDBOut()
.database('game')
.measurement('top_scores_gap')
定义并启用我们的任务以查看其运行情况
kapacitor define top_scores -tick top_scores.tick
kapacitor enable top_scores
首先,让我们检查 HTTP 输出是否正常工作。
curl 'http://localhost:9092/kapacitor/v1/tasks/top_scores/top_scores'
您应该获得每个游戏前 15 名玩家及其分数的 JSON 结果。多次点击端点以查看分数是否每秒更新一次。
现在,让我们检查 InfluxDB 以查看我们的历史数据。
curl \
-G 'http://localhost:8086/query?db=game' \
--data-urlencode 'q=SELECT * FROM top_scores WHERE time > now() - 5m GROUP BY game'
curl \
-G 'http://localhost:8086/query?db=game' \
--data-urlencode 'q=SELECT * FROM top_scores_gap WHERE time > now() - 5m GROUP BY game'
太棒了!艰苦的工作已经完成。剩下的就是配置游戏服务器以将分数更新发送到 Kapacitor,并更新观众仪表板以从 Kapacitor 拉取分数。
此页面是否对您有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一份子!我们欢迎并鼓励您对 Kapacitor 和本文档提出反馈和错误报告。要获得支持,请使用以下资源