实时游戏分数排行榜
如果您还没有运行的 Kapacitor 实例,请查看 入门指南 以在本地启动 Kapacitor。
今天我们是游戏开发者。我们运行了几个游戏服务器,每个服务器运行一个游戏实例,每个游戏大约有百名玩家。
我们需要构建一个排行榜,以便观众可以实时查看玩家的分数。我们还希望有关于领先者的历史数据,以便在赛后进行分析,了解谁领先了多久等。
我们将使用 Kapacitor 的流处理来完成繁重的工作。游戏服务器可以在玩家分数发生变化时发送 UDP 数据包,或者在分数未发生变化的情况下至少每 10 秒发送一次。
设置
下面所有的代码片段都可以在 这里 找到。
我们的首要任务是配置 Kapacitor 以接收分数流。在这种情况下,分数更新过于频繁,无法全部存储在 InfluxDB 中,因此我们将直接将它们发送到 Kapacitor。与 InfluxDB 一样,您可以配置一个 UDP 监听器。将此配置部分添加到您的 Kapacitor 配置文件的末尾。
[[udp]]
enabled = true
bind-address = ":9100"
database = "game"
retention-policy = "autogen"此配置告诉 Kapacitor 在端口 9100 上监听 line protocol 格式的 UDP 数据包。它会将传入数据的作用域限定在 game.autogen 数据库和保留策略内。使用添加了此配置的 Kapacitor 启动。
这是一个简单的 bash 脚本,用于生成随机分数数据,以便我们可以在不干扰实际游戏服务器的情况下进行测试。
#!/bin/bash
# default options: can be overridden with corresponding arguments.
host=${1-localhost}
port=${2-9100}
games=${3-10}
players=${4-100}
games=$(seq $games)
players=$(seq $players)
# Spam score updates over UDP
while true
do
for game in $games
do
game="g$game"
for player in $players
do
player="p$player"
score=$(($RANDOM % 1000))
echo "scores,player=$player,game=$game value=$score" > /dev/udp/$host/$port
done
done
sleep 0.1
done将上面的脚本放入一个名为 scores.sh 的文件中并运行它。
chmod +x ./scores.sh
./scores.sh现在我们正在用我们的虚假分数数据“轰炸”Kapacitor。我们可以让它一直运行,因为 Kapacitor 会在没有任务需要它时丢弃传入的数据。
定义 Kapacitor 任务
排行榜需要做什么?
- 获取每个玩家在每个游戏中的最新分数。
- 计算每个游戏排名前 X 的玩家分数。
- 发布结果。
- 存储结果。
要完成第一步,我们需要缓冲传入的流,并返回每个玩家在每个游戏中的最新分数更新。我们的 TICKscript 将如下所示:
var topPlayerScores = stream
|from()
.measurement('scores')
// Get the most recent score for each player per game.
// Not likely that a player is playing two games but just in case.
.groupBy('game', 'player')
|window()
// keep a buffer of the last 11s of scores
// just in case a player score hasn't updated in a while
.period(11s)
// Emit the current score per player every second.
.every(1s)
// Align the window boundaries to be on the second.
.align()
|last('value')将此脚本放入一个名为 top_scores.tick 的文件中。
现在我们的 topPlayerScores 变量包含了每个玩家的最新分数。接下来,要计算每个游戏的最高分数,我们只需要按游戏分组并运行另一个 map reduce 作业。我们保留每个游戏排名前 15 的分数。将以下几行添加到 top_scores.tick 文件中。
// Calculate the top 15 scores per game
var topScores = topPlayerScores
|groupBy('game')
|top(15, 'last', 'player')topScores 变量现在包含了每个游戏排名前 15 名玩家的分数。构建排行榜所需的一切都已准备就绪。Kapacitor 可以通过 HTTPOutNode 通过 HTTP 公开分数。我们将任务命名为 top_scores;通过以下添加,最新的分数将可以在 https://:9092/kapacitor/v1/tasks/top_scores/top_scores 处获得。
// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// https://:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
|httpOut('top_scores')最后,我们希望随着时间的推移存储最高分数,以便进行深入分析以确保最佳游戏玩法。但我们不想每秒都存储分数,因为数据量仍然太大。首先,我们将对数据进行采样,每 10 秒存储一次分数。此外,让我们提前做一些基本的分析,因为我们已经有了所有数据的流。目前,我们只会做基本的差距分析,即存储最高玩家和第 15 名玩家之间的差距。将以下几行添加到 top_scores.tick 以完成我们的任务。
// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
|sample(10s)
// Store top fifteen player scores in InfluxDB.
topScoresSampled
|influxDBOut()
.database('game')
.measurement('top_scores')
// Calculate the max and min of the top scores.
var max = topScoresSampled
|max('top')
var min = topScoresSampled
|min('top')
// Join the max and min streams back together and calculate the gap.
max
|join(min)
.as('max', 'min')
// Calculate the difference between the max and min scores.
// Rename the max and min fields to more friendly names 'topFirst', 'topLast'.
|eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
.as('gap', 'topFirst', 'topLast')
// Store the fields: gap, topFirst and topLast in InfluxDB.
|influxDBOut()
.database('game')
.measurement('top_scores_gap')由于我们将数据写回 InfluxDB,请创建一个名为 game 的数据库来存储我们的结果。
curl -G 'https://:8086/query?' --data-urlencode 'q=CREATE DATABASE game'如果您不想复制粘贴太多,这里是完整的任务 TICKscript:
dbrp "game"."autogen"
// Define a result that contains the most recent score per player.
var topPlayerScores = stream
|from()
.measurement('scores')
// Get the most recent score for each player per game.
// Not likely that a player is playing two games but just in case.
.groupBy('game', 'player')
|window()
// keep a buffer of the last 11s of scores
// just in case a player score hasn't updated in a while
.period(11s)
// Emit the current score per player every second.
.every(1s)
// Align the window boundaries to be on the second.
.align()
|last('value')
// Calculate the top 15 scores per game
var topScores = topPlayerScores
|groupBy('game')
|top(15, 'last', 'player')
// Expose top scores over the HTTP API at the 'top_scores' endpoint.
// Now your app can just request the top scores from Kapacitor
// and always get the most recent result.
//
// https://:9092/kapacitor/v1/tasks/top_scores/top_scores
topScores
|httpOut('top_scores')
// Sample the top scores and keep a score once every 10s
var topScoresSampled = topScores
|sample(10s)
// Store top fifteen player scores in InfluxDB.
topScoresSampled
|influxDBOut()
.database('game')
.measurement('top_scores')
// Calculate the max and min of the top scores.
var max = topScoresSampled
|max('top')
var min = topScoresSampled
|min('top')
// Join the max and min streams back together and calculate the gap.
max
|join(min)
.as('max', 'min')
// calculate the difference between the max and min scores.
|eval(lambda: "max.max" - "min.min", lambda: "max.max", lambda: "min.min")
.as('gap', 'topFirst', 'topLast')
// store the fields: gap, topFirst, and topLast in InfluxDB.
|influxDBOut()
.database('game')
.measurement('top_scores_gap')定义并启用我们的任务以使其生效。
kapacitor define top_scores -tick top_scores.tick
kapacitor enable top_scores首先,让我们检查 HTTP 输出是否正常工作。
curl 'https://:9092/kapacitor/v1/tasks/top_scores/top_scores'您应该会看到一个 JSON 格式的结果,其中包含每个游戏排名前 15 的玩家及其分数。多次访问该端点,您会看到分数每秒更新一次。
现在,让我们检查 InfluxDB 以查看我们的历史数据。
curl \
-G 'https://:8086/query?db=game' \
--data-urlencode 'q=SELECT * FROM top_scores WHERE time > now() - 5m GROUP BY game'
curl \
-G 'https://:8086/query?db=game' \
--data-urlencode 'q=SELECT * FROM top_scores_gap WHERE time > now() - 5m GROUP BY game'太棒了!繁重的工作已经完成。剩下的是配置游戏服务器向 Kapacitor 发送分数更新,并更新观众仪表板以从 Kapacitor 拉取分数。
此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 Kapacitor 和本文档提供反馈和错误报告。要获取支持,请使用以下资源: