文档文档

使用 Kapacitor 的自定义异常检测

每个人都有自己的异常检测算法,因此我们构建了 Kapacitor,以便轻松地与适合您领域的任何算法集成。Kapacitor 将这些自定义算法称为用户自定义函数 (UDF)。本指南将逐步介绍在 Kapacitor 中编写和使用您自己的 UDF 的必要步骤。

如果您尚未这样做,我们建议在继续之前,先按照 Kapacitor 入门指南 进行操作。

3D 打印

如果您拥有或最近购买了 3D 打印机,您可能知道 3D 打印需要环境处于特定温度才能确保打印质量。打印也可能需要很长时间(有些可能超过 24 小时),因此您不能一直观看温度图,以确保打印顺利进行。此外,如果打印早期出现问题,您需要确保停止它,以便您可以重新启动它,而不是浪费材料继续不良打印。

由于 3D 打印的物理限制,打印机软件通常设计为将温度保持在一定公差范围内。为了论证,假设您不信任该软件来完成其工作(或想要创建自己的软件),并且希望在温度达到异常水平时收到警报。

3D 打印涉及三个温度

  1. 热端(塑料在打印前熔化的地方)的温度。
  2. 床(正在打印部件的地方)的温度。
  3. 环境空气(打印机周围的空气)的温度。

所有这三个温度都会影响打印质量(有些比其他温度更重要),但我们希望确保跟踪所有这些温度。

为了使我们的异常检测算法保持简单,让我们为我们收到的每个数据窗口计算一个 p-value,然后发出一个包含该 p-value 的数据点。为了计算 p-value,我们将使用 Welch's t 检验。对于零假设,我们将声明新窗口与历史窗口来自同一总体。如果 p-value 降得足够低,我们可以拒绝零假设,并得出结论,窗口必须来自与历史数据总体不同的事物,或者异常。这是一种过度简化的方法,但我们正在学习如何编写 UDF,而不是统计学。

编写用户自定义函数 (UDF)

现在我们对想要做什么有了一个概念,让我们了解 Kapacitor 希望如何与我们的进程通信。从 UDF README 中,我们了解到 Kapacitor 将生成一个名为 agent 的进程。agent 负责描述它有哪些选项,然后使用一组选项初始化自身。当 UDF 接收到数据时,agent 执行其计算,然后将结果数据返回给 Kapacitor。所有这些通信都通过 STDIN 和 STDOUT 使用协议缓冲区进行。截至撰写本文时,Kapacitor 已经实现了 Go 和 Python 中的代理,它们负责通信细节,并公开了一个用于执行实际工作的接口。在本指南中,我们将使用 Python 代理。

Handler 接口

这是代理的 Python handler 接口

# The Agent calls the appropriate methods on the Handler as requests are read off STDIN.
#
# Throwing an exception will cause the Agent to stop and an ErrorResponse to be sent.
# Some *Response objects (like SnapshotResponse) allow for returning their own error within the object itself.
# These types of errors will not stop the Agent and Kapacitor will deal with them appropriately.
#
# The Handler is called from a single thread, meaning methods will not be called concurrently.
#
# To write Points/Batches back to the Agent/Kapacitor use the Agent.write_response method, which is thread safe.
class Handler(object):
    def info(self):
        pass
    def init(self, init_req):
        pass
    def snapshot(self):
        pass
    def restore(self, restore_req):
        pass
    def begin_batch(self):
        pass
    def point(self):
        pass
    def end_batch(self, end_req):
        pass

Info 方法

让我们从 info 方法开始。当 Kapacitor 启动时,它将调用 info 并期望返回一些关于此 UDF 行为的信息。具体来说,Kapacitor 期望 UDF 想要的边缘类型和提供的边缘类型。

记住:在 Kapacitor 中,数据以流或批处理形式传输,因此 UDF 必须声明它期望什么。

此外,UDF 可以接受某些选项,以便可以单独配置它们。info 响应可以包含选项列表、它们的名称和期望的参数。

对于我们的示例 UDF,我们需要知道三件事

  1. 要操作的字段。
  2. 要保留的历史窗口的大小。
  3. 正在使用的显着性水平或 alpha

下面是我们处理程序的 info 方法的实现,该方法定义了边缘类型和可用选项

...
    def info(self):
        """
        Respond with which type of edges we want/provide and any options we have.
        """
        response = udf_pb2.Response()

        # We will consume batch edges aka windows of data.
        response.info.wants = udf_pb2.BATCH
        # We will produce single points of data aka stream.
        response.info.provides = udf_pb2.STREAM

        # Here we can define options for the UDF.
        # Define which field we should process.
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)

        # Since we will be computing a moving average let's make the size configurable.
        # Define an option 'size' that takes one integer argument.
        response.info.options['size'].valueTypes.append(udf_pb2.INT)

        # We need to know the alpha level so that we can ignore bad windows.
        # Define an option 'alpha' that takes one double valued argument.
        response.info.options['alpha'].valueTypes.append(udf_pb2.DOUBLE)

        return response
...

当 Kapacitor 启动时,它将生成我们的 UDF 进程并请求 info 数据,然后关闭该进程。Kapacitor 将记住每个 UDF 的此信息。这样,Kapacitor 可以在任务内部执行 UDF 之前了解给定 UDF 的可用选项。

Init 方法

接下来,让我们实现 init 方法,该方法在任务开始执行时调用一次。init 方法接收选择的选项列表,然后使用这些选项来适当配置处理程序。作为响应,我们指示 init 请求是否成功,如果未成功,则指示选项无效的任何错误消息。

...
    def init(self, init_req):
        """
        Given a list of options initialize this instance of the handler
        """
        success = True
        msg = ''
        size = 0
        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
            elif opt.name == 'size':
                size = opt.values[0].intValue
            elif opt.name == 'alpha':
                self._alpha = opt.values[0].doubleValue

        if size <= 1:
            success = False
            msg += ' must supply window size > 1'
        if self._field == '':
            success = False
            msg += ' must supply a field name'
        if self._alpha == 0:
            success = False
            msg += ' must supply an alpha value'

        # Initialize our historical window
        # We will define MovingStats in the next step
        self._history = MovingStats(size)

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]

        return response
...

当任务启动时,Kapacitor 会为 UDF 生成一个新进程并调用 init,传递来自 TICKscript 的任何指定选项。一旦初始化,该进程将保持运行,并且 Kapacitor 将在数据到达时开始发送数据。

Batch 和 Point 方法

我们的任务想要一个 batch 边缘,这意味着它期望以批处理或窗口形式获取数据。为了将一批数据发送到 UDF 进程,Kapacitor 首先调用 begin_batch 方法,该方法指示所有后续点都属于一个批处理。一旦批处理完成,将使用有关批处理的一些元数据调用 end_batch 方法。

在高层次上,这是我们的 UDF 代码将为每个 begin_batchpointend_batch 调用执行的操作

  • begin_batch:标记新批处理的开始并初始化其结构
  • point:存储点
  • end_batch:执行 t 检验,然后更新历史数据

完整的 UDF 脚本

接下来是完整的 UDF 实现,其中包含我们的 infoinit 和批处理方法(以及我们需要的所有其他内容)。


from kapacitor.udf.agent import Agent, Handler
from scipy import stats
import math
from kapacitor.udf import udf_pb2
import sys

class TTestHandler(Handler):
    """
    Keep a rolling window of historically normal data
    When a new window arrives use a two-sided t-test to determine
    if the new window is statistically significantly different.
    """
    def __init__(self, agent):
        self._agent = agent

        self._field = ''
        self._history = None

        self._batch = None

        self._alpha = 0.0

    def info(self):
        """
        Respond with which type of edges we want/provide and any options we have.
        """
        response = udf_pb2.Response()
        # We will consume batch edges aka windows of data.
        response.info.wants = udf_pb2.BATCH
        # We will produce single points of data aka stream.
        response.info.provides = udf_pb2.STREAM

        # Here we can define options for the UDF.
        # Define which field we should process
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)

        # Since we will be computing a moving average let's make the size configurable.
        # Define an option 'size' that takes one integer argument.
        response.info.options['size'].valueTypes.append(udf_pb2.INT)

        # We need to know the alpha level so that we can ignore bad windows
        # Define an option 'alpha' that takes one double argument.
        response.info.options['alpha'].valueTypes.append(udf_pb2.DOUBLE)

        return response

    def init(self, init_req):
        """
        Given a list of options initialize this instance of the handler
        """
        success = True
        msg = ''
        size = 0
        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
            elif opt.name == 'size':
                size = opt.values[0].intValue
            elif opt.name == 'alpha':
                self._alpha = opt.values[0].doubleValue

        if size <= 1:
            success = False
            msg += ' must supply window size > 1'
        if self._field == '':
            success = False
            msg += ' must supply a field name'
        if self._alpha == 0:
            success = False
            msg += ' must supply an alpha value'

        # Initialize our historical window
        self._history = MovingStats(size)

        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg[1:]

        return response

    def begin_batch(self, begin_req):
        # create new window for batch
        self._batch = MovingStats(-1)

    def point(self, point):
        self._batch.update(point.fieldsDouble[self._field])

    def end_batch(self, batch_meta):
        pvalue = 1.0
        if self._history.n != 0:
            # Perform Welch's t test
            t, pvalue = stats.ttest_ind_from_stats(
                    self._history.mean, self._history.stddev(), self._history.n,
                    self._batch.mean, self._batch.stddev(), self._batch.n,
                    equal_var=False)


            # Send pvalue point back to Kapacitor
            response = udf_pb2.Response()
            response.point.time = batch_meta.tmax
            response.point.name = batch_meta.name
            response.point.group = batch_meta.group
            response.point.tags.update(batch_meta.tags)
            response.point.fieldsDouble["t"] = t
            response.point.fieldsDouble["pvalue"] = pvalue
            self._agent.write_response(response)

        # Update historical stats with batch, but only if it was normal.
        if pvalue > self._alpha:
            for value in self._batch._window:
                self._history.update(value)


class MovingStats(object):
    """
    Calculate the moving mean and variance of a window.
    Uses Welford's Algorithm.
    """
    def __init__(self, size):
        """
        Create new MovingStats object.
        Size can be -1, infinite size or > 1 meaning static size
        """
        self.size = size
        if not (self.size == -1 or self.size > 1):
            raise Exception("size must be -1 or > 1")


        self._window = []
        self.n = 0.0
        self.mean = 0.0
        self._s = 0.0

    def stddev(self):
        """
        Return the standard deviation
        """
        if self.n == 1:
            return 0.0
        return math.sqrt(self._s / (self.n - 1))

    def update(self, value):

        # update stats for new value
        self.n += 1.0
        diff = (value - self.mean)
        self.mean += diff / self.n
        self._s += diff * (value - self.mean)

        if self.n == self.size + 1:
            # update stats for removing old value
            old = self._window.pop(0)
            oldM = (self.n * self.mean - old)/(self.n - 1)
            self._s -= (old - self.mean) * (old - oldM)
            self.mean = oldM
            self.n -= 1

        self._window.append(value)

if __name__ == '__main__':
    # Create an agent
    agent = Agent()

    # Create a handler and pass it an agent so it can write points
    h = TTestHandler(agent)

    # Set the handler on the agent
    agent.handler = h

    # Anything printed to STDERR from a UDF process gets captured into the Kapacitor logs.
    print >> sys.stderr, "Starting agent for TTestHandler"
    agent.start()
    agent.wait()
    print >> sys.stderr, "Agent finished"

内容很多,但现在我们准备配置 Kapacitor 以运行我们的代码。创建一个临时目录以完成本指南的其余部分

mkdir /tmp/kapacitor_udf
cd /tmp/kapacitor_udf

将上述 UDF python 脚本保存到 /tmp/kapacitor_udf/ttest.py

为我们的 UDF 配置 Kapacitor

将此代码段添加到您的 Kapacitor 配置文件(通常位于 /etc/kapacitor/kapacitor.conf

[udf]
[udf.functions]
    [udf.functions.tTest]
        # Run python
        prog = "/usr/bin/python2"
        # Pass args to python
        # -u for unbuffered STDIN and STDOUT
        # and the path to the script
        args = ["-u", "/tmp/kapacitor_udf/ttest.py"]
        # If the python process is unresponsive for 10s kill it
        timeout = "10s"
        # Define env vars for the process, in this case the PYTHONPATH
        [udf.functions.tTest.env]
            PYTHONPATH = "/tmp/kapacitor_udf/kapacitor/udf/agent/py"

在配置中,我们将该函数称为 tTest。这也是我们将在 TICKscript 中引用它的方式。

请注意,我们的 Python 脚本导入了 Agent 对象,并且我们在配置中设置了 PYTHONPATH。将 Kapacitor 源代码克隆到临时目录中,以便我们可以将 PYTHONPATH 指向必要的 python 代码。这通常是多余的,因为它只是两个 Python 文件,但它使其易于遵循

git clone https://github.com/influxdata/kapacitor.git /tmp/kapacitor_udf/kapacitor

使用 UDF 运行 Kapacitor

重新启动 Kapacitor 守护程序,以确保一切配置正确

service kapacitor restart

检查日志 (/var/log/kapacitor/) 以确保您看到 Listening for signals 行,并且没有发生错误。如果您没有看到该行,那是因为 UDF 进程挂起且未响应。它应该在超时后被终止,所以请稍等片刻以使其正常停止。停止后,您可以修复任何错误并重试。

TICKscript

如果一切都正确启动,那么现在是编写我们的 TICKscript 以使用 tTest UDF 方法的时候了

dbrp "printer"."autogen"

// This TICKscript monitors the three temperatures for a 3d printing job,
// and triggers alerts if the temperatures start to experience abnormal behavior.

// Define our desired significance level.
var alpha = 0.001

// Select the temperatures measurements
var data = stream
    |from()
        .measurement('temperatures')
    |window()
        .period(5m)
        .every(5m)

data
    //Run our tTest UDF on the hotend temperature
    @tTest()
        // specify the hotend field
        .field('hotend')
        // Keep a 1h rolling window
        .size(3600)
        // pass in the alpha value
        .alpha(alpha)
    |alert()
        .id('hotend')
        .crit(lambda: "pvalue" < alpha)
        .log('/tmp/kapacitor_udf/hotend_failure.log')

// Do the same for the bed and air temperature.
data
    @tTest()
        .field('bed')
        .size(3600)
        .alpha(alpha)
    |alert()
        .id('bed')
        .crit(lambda: "pvalue" < alpha)
        .log('/tmp/kapacitor_udf/bed_failure.log')

data
    @tTest()
        .field('air')
        .size(3600)
        .alpha(alpha)
    |alert()
        .id('air')
        .crit(lambda: "pvalue" < alpha)
        .log('/tmp/kapacitor_udf/air_failure.log')

请注意,我们调用了 tTest 三次。这意味着 Kapacitor 将生成三个不同的 Python 进程,并将相应的 init 选项传递给每个进程。

将此脚本另存为 /tmp/kapacitor_udf/print_temps.tick 并定义 Kapacitor 任务

kapacitor define print_temps -tick print_temps.tick

生成测试数据

为了模拟我们的打印机进行测试,我们将编写一个简单的 Python 脚本来生成温度。此脚本生成随机温度,这些温度通常围绕目标温度分布。在指定的时间,温度的变化和偏移会发生变化,从而产生异常。

不要太担心这里的细节。最好使用真实数据来测试我们的 TICKscript 和 UDF,但这更快(并且比 3D 打印机便宜得多)。

#!/usr/bin/python2

from numpy import random
from datetime import timedelta, datetime
import sys
import time
import requests


# Target temperatures in C
hotend_t = 220
bed_t = 90
air_t = 70

# Connection info
write_url = 'http://localhost:9092/write?db=printer&rp=autogen&precision=s'
measurement = 'temperatures'

def temp(target, sigma):
    """
    Pick a random temperature from a normal distribution
    centered on target temperature.
    """
    return random.normal(target, sigma)

def main():
    hotend_sigma = 0
    bed_sigma = 0
    air_sigma = 0
    hotend_offset = 0
    bed_offset = 0
    air_offset = 0

    # Define some anomalies by changing sigma at certain times
    # list of sigma values to start at a specified iteration
    hotend_anomalies =[
        (0, 0.5, 0), # normal sigma
        (3600, 3.0, -1.5), # at one hour the hotend goes bad
        (3900, 0.5, 0), # 5 minutes later recovers
    ]
    bed_anomalies =[
        (0, 1.0, 0), # normal sigma
        (28800, 5.0, 2.0), # at 8 hours the bed goes bad
        (29700, 1.0, 0), # 15 minutes later recovers
    ]
    air_anomalies = [
        (0, 3.0, 0), # normal sigma
        (10800, 5.0, 0), # at 3 hours air starts to fluctuate more
        (43200, 15.0, -5.0), # at 12 hours air goes really bad
        (45000, 5.0, 0), # 30 minutes later recovers
        (72000, 3.0, 0), # at 20 hours goes back to normal
    ]

    # Start from 2016-01-01 00:00:00 UTC
    # This makes it easy to reason about the data later
    now = datetime(2016, 1, 1)
    second = timedelta(seconds=1)
    epoch = datetime(1970,1,1)

    # 24 hours of temperatures once per second
    points = []
    for i in range(60*60*24+2):
        # update sigma values
        if len(hotend_anomalies) > 0 and i == hotend_anomalies[0][0]:
            hotend_sigma = hotend_anomalies[0][1]
            hotend_offset = hotend_anomalies[0][2]
            hotend_anomalies = hotend_anomalies[1:]

        if len(bed_anomalies) > 0 and i == bed_anomalies[0][0]:
            bed_sigma = bed_anomalies[0][1]
            bed_offset = bed_anomalies[0][2]
            bed_anomalies = bed_anomalies[1:]

        if len(air_anomalies) > 0 and i == air_anomalies[0][0]:
            air_sigma = air_anomalies[0][1]
            air_offset = air_anomalies[0][2]
            air_anomalies = air_anomalies[1:]

        # generate temps
        hotend = temp(hotend_t+hotend_offset, hotend_sigma)
        bed = temp(bed_t+bed_offset, bed_sigma)
        air = temp(air_t+air_offset, air_sigma)
        points.append("%s hotend=%f,bed=%f,air=%f %d" % (
            measurement,
            hotend,
            bed,
            air,
            (now - epoch).total_seconds(),
        ))
        now += second

    # Write data to Kapacitor
    r = requests.post(write_url, data='\n'.join(points))
    if r.status_code != 204:
        print >> sys.stderr, r.text
        return 1
    return 0

if __name__ == '__main__':
    exit(main())

将上述脚本另存为 /tmp/kapacitor_udf/printer_data.py

此 Python 脚本有两个 Python 依赖项:requestsnumpy。它们可以轻松地通过 pip 或您的软件包管理器安装。

此时,我们有一个准备就绪的任务和一个脚本来生成一些带有异常的虚假数据。现在我们可以创建虚假数据的记录,以便我们可以轻松地迭代任务

# Start the recording in the background
kapacitor record stream -task print_temps -duration 24h -no-wait
# Grab the ID from the output and store it in a var
rid=7bd3ced5-5e95-4a67-a0e1-f00860b1af47
# Run our python script to generate data
chmod +x ./printer_data.py
./printer_data.py

我们可以通过列出有关记录的信息来验证它是否有效。我们的记录大小为 1.6MB,因此您的记录大小应该接近该大小

$ kapacitor list recordings $rid
ID                                      Type    Status    Size      Date
7bd3ced5-5e95-4a67-a0e1-f00860b1af47    stream  finished  1.6 MB    04 May 16 11:44 MDT

检测异常

最后,让我们针对我们的任务运行播放,看看它是如何工作的

kapacitor replay -task print_temps -recording $rid -rec-time

检查各种日志文件,看看算法是否捕获到异常

cat /tmp/kapacitor_udf/{hotend,bed,air}_failure.log

根据上面的 printer_data.py 脚本,应该在以下位置出现异常

  • 1 小时:热端
  • 8 小时:床
  • 12 小时:空气

可能也会有一些误报,但是,由于我们希望它与真实数据(而不是我们漂亮的干净的虚假数据)一起工作,因此此时进行调整没有太大帮助。

好了,我们成功了。现在,当我们的打印温度偏离常态时,我们可以收到警报。希望您现在对 Kapacitor UDF 的工作原理有了更好的了解,并且有一个良好的工作示例作为进一步使用 UDF 的起点。

框架已经到位,现在去插入一个适用于您领域的真实异常检测算法!

扩展示例

我们留下了一些内容作为读者的练习

  1. 快照/恢复:Kapacitor 将定期快照您的 UDF 进程的状态,以便在进程重新启动时可以恢复它。 此处 的示例具有 snapshotrestore 方法的实现。为 TTestHandler 处理程序实现它们作为练习。

  2. 将算法从 t 检验更改为更适合您领域的内容。numpyscipy 都拥有丰富的算法。

  3. info 请求返回的选项可以包含多个参数。修改 field 选项以接受三个字段名称,并更改 TTestHandler 以维护每个字段而不是仅一个字段的历史数据和批处理。这样,只需要运行一个 ttest.py 进程。


此页是否对您有帮助?

感谢您的反馈!


Flux 的未来

Flux 即将进入维护模式。您可以继续像当前一样使用它,而无需对代码进行任何更改。

阅读更多

现已全面上市

InfluxDB 3 Core 和 Enterprise

快速启动。更快扩展。

获取更新

InfluxDB 3 Core 是一个开源、高速、最新的数据引擎,可实时收集和处理数据,并将其持久化到本地磁盘或对象存储。InfluxDB 3 Enterprise 基于 Core 的基础构建,增加了高可用性、读取副本、增强的安全性以及数据压缩,以实现更快的查询和优化的存储。InfluxDB 3 Enterprise 的免费层可供非商业家庭或业余爱好者使用。

有关更多信息,请查看