使用 Kapacitor 进行自定义异常检测
每个人都有自己的异常检测算法,因此我们构建了 Kapacitor 以便轻松集成任何适合您领域的算法。Kapacitor 将这些自定义算法称为 UDF(用户定义函数)。本指南将引导您完成在 Kapacitor 中编写和使用自己的 UDF 所需的步骤。
如果您还没有完成,我们建议您在继续之前先阅读 Kapacitor 的入门指南。
3D 打印
如果您拥有或最近购买了 3D 打印机,您可能知道 3D 打印需要在特定温度下进行才能确保打印质量。打印也可能花费很长时间(有些可能超过 24 小时),因此您无法一直盯着温度图表来确保打印顺利进行。此外,如果打印早期出现问题,您需要确保及时停止打印,以便重新开始,而不是在继续打印糟糕的打印品上浪费材料。
由于 3D 打印的物理限制,打印机软件通常设计为将温度保持在一定的容差范围内。为了方便论证,我们假设您不信任软件能做好本职工作(或想创建自己的逻辑),并希望在温度达到异常水平时收到警报。
3D 打印涉及三种温度
- 热端温度(打印前熔化塑料的地方)。
- 打印床温度(打印部件的地方)。
- 环境空气温度(打印机周围的空气)。
这三种温度都会影响打印质量(有些比其他更重要),但我们希望跟踪所有这些温度。
为了使我们的异常检测算法保持简单,让我们为接收到的每个数据窗口计算一个p 值,然后发出一个包含该p 值的数据点。为了计算p 值,我们将使用Welch t 检验。对于零假设,我们将声明一个新窗口来自与历史窗口相同的总体。如果p 值足够低,我们可以拒绝零假设,并得出结论该窗口一定来自与历史数据总体不同的数据,或者说是一个异常。这是一个过度简化的方法,但我们正在学习如何编写 UDF,而不是统计学。
编写用户定义函数 (UDF)
现在我们对要做什么有了一个想法,让我们来了解 Kapacitor 希望如何与我们的进程进行通信。从UDF README中,我们了解到 Kapacitor 将启动一个名为agent的进程。agent负责描述它拥有的选项,然后用一组选项初始化自己。当数据被 UDF 接收时,agent执行其计算,然后将结果数据返回给 Kapacitor。所有这些通信都通过 STDIN 和 STDOUT 使用协议缓冲区进行。截至目前,Kapacitor 已经用 Go 和 Python 实现了一些 agent,它们负责通信细节并提供一个用于执行实际工作的接口。在本指南中,我们将使用 Python agent。
Handler 接口
这是 agent 的 Python handler 接口
# The Agent calls the appropriate methods on the Handler as requests are read off STDIN.
#
# Throwing an exception will cause the Agent to stop and an ErrorResponse to be sent.
# Some *Response objects (like SnapshotResponse) allow for returning their own error within the object itself.
# These types of errors will not stop the Agent and Kapacitor will deal with them appropriately.
#
# The Handler is called from a single thread, meaning methods will not be called concurrently.
#
# To write Points/Batches back to the Agent/Kapacitor use the Agent.write_response method, which is thread safe.
class Handler(object):
def info(self):
pass
def init(self, init_req):
pass
def snapshot(self):
pass
def restore(self, restore_req):
pass
def begin_batch(self):
pass
def point(self):
pass
def end_batch(self, end_req):
passinfo方法
让我们从info方法开始。当 Kapacitor 启动时,它会调用info并期望返回有关此 UDF 如何工作的信息。具体来说,Kapacitor 期望 UDF 所需的边缘类型和提供的边缘类型。
请记住:在 Kapacitor 中,数据通过流或批次传输,因此 UDF 必须声明它期望什么。
此外,UDF 可以接受某些选项,以便单独配置它们。info响应可以包含选项列表、它们的名称和期望的参数。
对于我们的示例 UDF,我们需要知道三件事
- 要操作的字段。
- 要保留的历史窗口大小。
- 使用的显著性水平或
alpha。
下面是我们 handler 的info方法的实现,它定义了可用的边缘类型和选项。
...
def info(self):
"""
Respond with which type of edges we want/provide and any options we have.
"""
response = udf_pb2.Response()
# We will consume batch edges aka windows of data.
response.info.wants = udf_pb2.BATCH
# We will produce single points of data aka stream.
response.info.provides = udf_pb2.STREAM
# Here we can define options for the UDF.
# Define which field we should process.
response.info.options['field'].valueTypes.append(udf_pb2.STRING)
# Since we will be computing a moving average let's make the size configurable.
# Define an option 'size' that takes one integer argument.
response.info.options['size'].valueTypes.append(udf_pb2.INT)
# We need to know the alpha level so that we can ignore bad windows.
# Define an option 'alpha' that takes one double valued argument.
response.info.options['alpha'].valueTypes.append(udf_pb2.DOUBLE)
return response
...当 Kapacitor 启动时,它会启动我们的 UDF 进程并请求info数据,然后关闭进程。Kapacitor 将记住每个 UDF 的这些信息。这样,Kapacitor 可以在 UDF 在任务中执行之前了解其可用选项。
init方法
接下来,让我们实现init方法,该方法在任务开始执行时被调用。init方法接收一组选定的选项,然后用于相应地配置 handler。作为响应,我们指示init请求是否成功,如果不成功,则提供任何错误消息(如果选项无效)。
...
def init(self, init_req):
"""
Given a list of options initialize this instance of the handler
"""
success = True
msg = ''
size = 0
for opt in init_req.options:
if opt.name == 'field':
self._field = opt.values[0].stringValue
elif opt.name == 'size':
size = opt.values[0].intValue
elif opt.name == 'alpha':
self._alpha = opt.values[0].doubleValue
if size <= 1:
success = False
msg += ' must supply window size > 1'
if self._field == '':
success = False
msg += ' must supply a field name'
if self._alpha == 0:
success = False
msg += ' must supply an alpha value'
# Initialize our historical window
# We will define MovingStats in the next step
self._history = MovingStats(size)
response = udf_pb2.Response()
response.init.success = success
response.init.error = msg[1:]
return response
...当任务开始时,Kapacitor 会为 UDF 启动一个新进程,并调用init,传递 TICKscript 中指定的任何选项。初始化后,进程将保持运行,Kapacitor 将开始发送到达的数据。
Batch和Point方法
我们的任务需要一个batch边缘,这意味着它期望以批次或窗口的形式接收数据。要将一批数据发送到 UDF 进程,Kapacitor 首先调用begin_batch方法,该方法指示所有后续点都属于一个批次。批次完成后,将使用有关批次的元数据调用end_batch方法。
总的来说,这就是我们的 UDF 代码将对每个begin_batch、point和end_batch调用所做的事情。
begin_batch:标记一个新批次的开始并为其初始化一个结构。point:存储该点。end_batch:执行t 检验,然后更新历史数据。
完整的 UDF 脚本
下面是完整的 UDF 实现,包括我们的info、init和批处理方法(以及我们所需的所有其他内容)。
from kapacitor.udf.agent import Agent, Handler
from scipy import stats
import math
from kapacitor.udf import udf_pb2
import sys
class TTestHandler(Handler):
"""
Keep a rolling window of historically normal data
When a new window arrives use a two-sided t-test to determine
if the new window is statistically significantly different.
"""
def __init__(self, agent):
self._agent = agent
self._field = ''
self._history = None
self._batch = None
self._alpha = 0.0
def info(self):
"""
Respond with which type of edges we want/provide and any options we have.
"""
response = udf_pb2.Response()
# We will consume batch edges aka windows of data.
response.info.wants = udf_pb2.BATCH
# We will produce single points of data aka stream.
response.info.provides = udf_pb2.STREAM
# Here we can define options for the UDF.
# Define which field we should process
response.info.options['field'].valueTypes.append(udf_pb2.STRING)
# Since we will be computing a moving average let's make the size configurable.
# Define an option 'size' that takes one integer argument.
response.info.options['size'].valueTypes.append(udf_pb2.INT)
# We need to know the alpha level so that we can ignore bad windows
# Define an option 'alpha' that takes one double argument.
response.info.options['alpha'].valueTypes.append(udf_pb2.DOUBLE)
return response
def init(self, init_req):
"""
Given a list of options initialize this instance of the handler
"""
success = True
msg = ''
size = 0
for opt in init_req.options:
if opt.name == 'field':
self._field = opt.values[0].stringValue
elif opt.name == 'size':
size = opt.values[0].intValue
elif opt.name == 'alpha':
self._alpha = opt.values[0].doubleValue
if size <= 1:
success = False
msg += ' must supply window size > 1'
if self._field == '':
success = False
msg += ' must supply a field name'
if self._alpha == 0:
success = False
msg += ' must supply an alpha value'
# Initialize our historical window
self._history = MovingStats(size)
response = udf_pb2.Response()
response.init.success = success
response.init.error = msg[1:]
return response
def begin_batch(self, begin_req):
# create new window for batch
self._batch = MovingStats(-1)
def point(self, point):
self._batch.update(point.fieldsDouble[self._field])
def end_batch(self, batch_meta):
pvalue = 1.0
if self._history.n != 0:
# Perform Welch's t test
t, pvalue = stats.ttest_ind_from_stats(
self._history.mean, self._history.stddev(), self._history.n,
self._batch.mean, self._batch.stddev(), self._batch.n,
equal_var=False)
# Send pvalue point back to Kapacitor
response = udf_pb2.Response()
response.point.time = batch_meta.tmax
response.point.name = batch_meta.name
response.point.group = batch_meta.group
response.point.tags.update(batch_meta.tags)
response.point.fieldsDouble["t"] = t
response.point.fieldsDouble["pvalue"] = pvalue
self._agent.write_response(response)
# Update historical stats with batch, but only if it was normal.
if pvalue > self._alpha:
for value in self._batch._window:
self._history.update(value)
class MovingStats(object):
"""
Calculate the moving mean and variance of a window.
Uses Welford's Algorithm.
"""
def __init__(self, size):
"""
Create new MovingStats object.
Size can be -1, infinite size or > 1 meaning static size
"""
self.size = size
if not (self.size == -1 or self.size > 1):
raise Exception("size must be -1 or > 1")
self._window = []
self.n = 0.0
self.mean = 0.0
self._s = 0.0
def stddev(self):
"""
Return the standard deviation
"""
if self.n == 1:
return 0.0
return math.sqrt(self._s / (self.n - 1))
def update(self, value):
# update stats for new value
self.n += 1.0
diff = (value - self.mean)
self.mean += diff / self.n
self._s += diff * (value - self.mean)
if self.n == self.size + 1:
# update stats for removing old value
old = self._window.pop(0)
oldM = (self.n * self.mean - old)/(self.n - 1)
self._s -= (old - self.mean) * (old - oldM)
self.mean = oldM
self.n -= 1
self._window.append(value)
if __name__ == '__main__':
# Create an agent
agent = Agent()
# Create a handler and pass it an agent so it can write points
h = TTestHandler(agent)
# Set the handler on the agent
agent.handler = h
# Anything printed to STDERR from a UDF process gets captured into the Kapacitor logs.
print("Starting agent for TTestHandler", file=sys.stderr)
agent.start()
agent.wait()
print("Agent finished", file=sys.stderr)这信息量很大,但现在我们已经准备好配置 Kapacitor 来运行我们的代码了。确保已安装scipy($ pip3 install scipy)。创建一个临时目录以完成本指南的其余部分。
mkdir /tmp/kapacitor_udf
cd /tmp/kapacitor_udf将上面的 UDF Python 脚本保存到/tmp/kapacitor_udf/ttest.py。
为我们的 UDF 配置 Kapacitor
将此代码片段添加到您的 Kapacitor 配置文件中(通常位于/etc/kapacitor/kapacitor.conf)
[udf]
[udf.functions]
[udf.functions.tTest]
# Run python
prog = "/usr/bin/python3"
# Pass args to python
# -u for unbuffered STDIN and STDOUT
# and the path to the script
args = ["-u", "/tmp/kapacitor_udf/ttest.py"]
# If the python process is unresponsive for 10s kill it
timeout = "10s"
# Define env vars for the process, in this case the PYTHONPATH
[udf.functions.tTest.env]
PYTHONPATH = "/tmp/kapacitor_udf/kapacitor/udf/agent/py"在配置中,我们将函数调用为tTest。我们也将这样在 TICKscript 中引用它。
请注意,我们的 Python 脚本导入了Agent对象,并在配置中设置了PYTHONPATH。将 Kapacitor 源克隆到临时目录,以便我们可以将PYTHONPATH指向必需的 Python 代码。这通常是多余的,因为它只有两个 Python 文件,但这样更容易遵循。
git clone https://github.com/influxdata/kapacitor.git /tmp/kapacitor_udf/kapacitor运行带 UDF 的 Kapacitor
重新启动 Kapacitor 守护进程,以确保所有配置都正确无误。
service kapacitor restart检查日志(/var/log/kapacitor/或journalctl -f -n 256 -u kapacitor.service),确保您看到“正在监听信号”一行并且没有发生任何错误。如果您没有看到该行,则表示 UDF 进程已挂起且无响应。它应该在超时后被终止,所以请给它一些时间来正确停止。停止后,您可以修复任何错误并重试。
TICKscript
如果一切都已正确启动,那么是时候编写我们的 TICKscript 来使用tTest UDF 方法了。
dbrp "printer"."autogen"
// This TICKscript monitors the three temperatures for a 3d printing job,
// and triggers alerts if the temperatures start to experience abnormal behavior.
// Define our desired significance level.
var alpha = 0.001
// Select the temperatures measurements
var data = stream
|from()
.measurement('temperatures')
|window()
.period(5m)
.every(5m)
data
//Run our tTest UDF on the hotend temperature
@tTest()
// specify the hotend field
.field('hotend')
// Keep a 1h rolling window
.size(3600)
// pass in the alpha value
.alpha(alpha)
|alert()
.id('hotend')
.crit(lambda: "pvalue" < alpha)
.log('/tmp/kapacitor_udf/hotend_failure.log')
// Do the same for the bed and air temperature.
data
@tTest()
.field('bed')
.size(3600)
.alpha(alpha)
|alert()
.id('bed')
.crit(lambda: "pvalue" < alpha)
.log('/tmp/kapacitor_udf/bed_failure.log')
data
@tTest()
.field('air')
.size(3600)
.alpha(alpha)
|alert()
.id('air')
.crit(lambda: "pvalue" < alpha)
.log('/tmp/kapacitor_udf/air_failure.log')请注意,我们调用了tTest三次。这意味着 Kapacitor 将启动三个不同的 Python 进程,并将各自的 init 选项传递给每个进程。
将此脚本保存为/tmp/kapacitor_udf/print_temps.tick并定义 Kapacitor 任务。
kapacitor define print_temps -tick print_temps.tick确保任务已启用。
kapacitor enable print_temps然后列出任务。
kapacitor list tasks
ID Type Status Executing Databases and Retention Policies
print_temps stream enabled true ["printer"."autogen"]生成测试数据
为了模拟我们的打印机进行测试,我们将编写一个简单的 Python 脚本来生成温度。该脚本生成围绕目标温度正态分布的随机温度。在指定的时间,温度的变化和偏移量会发生变化,从而产生异常。
不用太担心这里的细节。使用真实数据来测试我们的 TICKscript 和 UDF 会更好,但这样更快(而且比 3D 打印机便宜得多)。
#!/usr/bin/env python
from numpy import random
from datetime import timedelta, datetime
import sys
import time
import requests
# Target temperatures in C
hotend_t = 220
bed_t = 90
air_t = 70
# Connection info
write_url = 'https://:9092/write?db=printer&rp=autogen&precision=s'
measurement = 'temperatures'
def temp(target, sigma):
"""
Pick a random temperature from a normal distribution
centered on target temperature.
"""
return random.normal(target, sigma)
def main():
hotend_sigma = 0
bed_sigma = 0
air_sigma = 0
hotend_offset = 0
bed_offset = 0
air_offset = 0
# Define some anomalies by changing sigma at certain times
# list of sigma values to start at a specified iteration
hotend_anomalies =[
(0, 0.5, 0), # normal sigma
(3600, 3.0, -1.5), # at one hour the hotend goes bad
(3900, 0.5, 0), # 5 minutes later recovers
]
bed_anomalies =[
(0, 1.0, 0), # normal sigma
(28800, 5.0, 2.0), # at 8 hours the bed goes bad
(29700, 1.0, 0), # 15 minutes later recovers
]
air_anomalies = [
(0, 3.0, 0), # normal sigma
(10800, 5.0, 0), # at 3 hours air starts to fluctuate more
(43200, 15.0, -5.0), # at 12 hours air goes really bad
(45000, 5.0, 0), # 30 minutes later recovers
(72000, 3.0, 0), # at 20 hours goes back to normal
]
# Start from 2016-01-01 00:00:00 UTC
# This makes it easy to reason about the data later
now = datetime(2016, 1, 1)
second = timedelta(seconds=1)
epoch = datetime(1970,1,1)
# 24 hours of temperatures once per second
points = []
for i in range(60*60*24+2):
# update sigma values
if len(hotend_anomalies) > 0 and i == hotend_anomalies[0][0]:
hotend_sigma = hotend_anomalies[0][1]
hotend_offset = hotend_anomalies[0][2]
hotend_anomalies = hotend_anomalies[1:]
if len(bed_anomalies) > 0 and i == bed_anomalies[0][0]:
bed_sigma = bed_anomalies[0][1]
bed_offset = bed_anomalies[0][2]
bed_anomalies = bed_anomalies[1:]
if len(air_anomalies) > 0 and i == air_anomalies[0][0]:
air_sigma = air_anomalies[0][1]
air_offset = air_anomalies[0][2]
air_anomalies = air_anomalies[1:]
# generate temps
hotend = temp(hotend_t+hotend_offset, hotend_sigma)
bed = temp(bed_t+bed_offset, bed_sigma)
air = temp(air_t+air_offset, air_sigma)
points.append("%s hotend=%f,bed=%f,air=%f %d" % (
measurement,
hotend,
bed,
air,
(now - epoch).total_seconds(),
))
now += second
# Write data to Kapacitor
r = requests.post(write_url, data='\n'.join(points))
if r.status_code != 204:
print >> sys.stderr, r.text
return 1
return 0
if __name__ == '__main__':
exit(main())将上面的脚本保存为/tmp/kapacitor_udf/printer_data.py。
此 Python 脚本有两个 Python 依赖项:
requests和numpy。它们可以通过pip或您的包管理器轻松安装。
此时,我们已经准备好了一个任务和一个生成带有异常的假数据的脚本。现在我们可以创建一个假数据的录制,以便轻松迭代任务。
# Start the recording in the background
kapacitor record stream -task print_temps -duration 24h -no-wait
# List recordings to find the ID
kapacitor list recordings
ID Type Status Size Date
7bd3ced5-5e95-4a67-a0e1-f00860b1af47 stream running 0 B 04 May 16 11:34 MDT
# Copy the ID and store it in a variable
rid=7bd3ced5-5e95-4a67-a0e1-f00860b1af47
# Run our python script to generate data
chmod +x ./printer_data.py
./printer_data.py我们可以通过列出录制信息来验证它是否有效。我们的录制大小为1.6MB,所以您的录制大小也应该接近这个值。
$ kapacitor list recordings $rid
ID Type Status Size Date
7bd3ced5-5e95-4a67-a0e1-f00860b1af47 stream finished 1.6 MB 04 May 16 11:44 MDT检测异常
最后,让我们运行 play 来测试我们的任务,看看它是如何工作的。
kapacitor replay -task print_temps -recording $rid -rec-time检查各个日志文件,看看算法是否捕捉到了异常。
cat /tmp/kapacitor_udf/{hotend,bed,air}_failure.log根据上面的printer_data.py脚本,应该在以下时间出现异常:
- 1 小时:热端
- 8 小时:打印床
- 12 小时:空气
也可能出现一些误报,但由于我们希望它能与真实数据(而不是我们干净的假数据)一起工作,因此在此阶段进行调整并没有太大帮助。
好了,就是这样。我们现在可以在打印温度偏离正常值时收到警报。希望您现在对 Kapacitor UDF 的工作原理有了更好的理解,并有一个良好的工作示例作为进一步使用 UDF 的起点。
框架已经就位,现在去连接一个适合您领域的真实异常检测算法吧!
扩展示例
有几件事情留给读者作为练习。
快照/恢复:Kapacitor 将定期快照您的 UDF 进程的状态,以便在进程重启时可以恢复。此处的示例具有
snapshot和restore方法的实现。作为练习,请为TTestHandlerhandler 实现它们。将算法从 t 检验更改为您领域更合适的算法。
numpy和scipy都有大量的算法。info请求返回的选项可以包含多个参数。修改field选项以接受三个字段名,并将TTestHandler更改为为每个字段维护历史数据和批次,而不是仅为一个字段。这样,只需要运行一个 ttest.py 进程。
此页面是否有帮助?
感谢您的反馈!
支持和反馈
感谢您成为我们社区的一员!我们欢迎并鼓励您对 Kapacitor 和本文档提供反馈和错误报告。要获取支持,请使用以下资源: