• 隐藏侧边栏
  • 展开分类目录
  • 关注微信公众号
  • 我的GitHub
  • QQ:1753970025
Chen Jiehua

Supervisor Event Listener 

目录

Supervisor是一个在类Unix操作系统上提供给用户管理大量进程的开源软件,supervisor具有自动启动和重启进程等许多功能,在管理多个进程时十分方便。然而,有时候对于进程崩溃或者重启,以及内存占用大小等都希望能够通知我们,supervisor的event listener刚好提供了这个功能……

简介

参考官方文档:

Events are an advanced feature of Supervisor introduced in version 3.0. You don’t need to understand events if you simply want to use Supervisor as a mechanism to restart crashed processes or as a system to manually control process state. You do need to understand events if you want to use Supervisor as part of a process monitoring/notification framework.

Listeners & Notifications

Supervisor提供了一种特殊的方式,让我们可以编写某些特定的程序(在supervisor中以子进程的方式运行,其实也就是用supervisor管理该程序)作为Event Listener来订阅 Event Notifications。

Event notifications分成不同的类型,我们编写的Event listener可以订阅其中的一个或者多个类型。在supervisor运行时,即是没有event listener,supervisor也会持续不断的发送对应的event notifications;如果某个event listener订阅了某一类型的event,当supervisor发送该类型的notifications时,对应的listener就会收到相应的通知。

配置

在supervisor的配置中,跟普通进程的配置类似,用 [eventlistener:xxx] 替代 [program:xxx] 即可。不过有部分参数不能使用,因为event listener不具备“capture mode”,如:

  • stdout_capture_maxbytes
  • stderr_capture_maxbytes
[eventlistener:memmon]
command=memmon -a 200MB -m bob@example.com
events=TICK_60

Event Listener

那么我们应该如何编写一个自己的event listenser呢?

Supervisor通过 stdin 直接发送相关的数据给event listener,listenner处理完成后从stdout输出执行结果(ok 或者 fail),Event Listener在通过stdout输出时应该以unbuffered或者flush的形式直接输出结果给supervisor。

Header Tokens

Supervisor首先发送一个Header Tokens给event listener:

KeyDescriptionExample
verThe event system protocol version3.0
serverThe identifier of the supervisord sending the event (see config file [supervisord] section identifier value.
serialAn integer assigned to each event. No two events generated during the lifetime of a supervisord process will have the same serial number. The value is useful for functional testing and detecting event ordering anomalies.30
poolThe name of the event listener pool which generated this event.myeventpool
pooolserialAn integer assigned to each event by the eventlistener pool which it is being sent from. No two events generated by the same eventlister pool during the lifetime of a supervisord process will have the same poolserial number. This value can be used to detect event ordering anomalies.30
eventnameThe specific event type name (see Event Types)TICK_5
lenAn integer indicating the number of bytes in the event payload, aka the PAYLOAD_LENGTH

例如:

ver:3.0 server:supervisor serial:21 pool:listener poolserial:10 eventname:PROCESS_COMMUNICATION_STDOUT len:54

接着,我们编写的event listener继续从stdin读取 PAYLOAD_LENGTH字节的内容:

# PROCESS_COMMUNICATION_STDOUT event notification
processname:foo groupname:bar pid:123
This is the data that was sent between the tags

Listener States

在event listener处理的过程中,supervisor根据lisener输出的结果判断该listener的状态:

NameDescription
ACKNOWLEDGEDThe event listener has acknowledged (accepted or rejected) an event send.
READYEvent notificatons may be sent to this event listener
BUSYEvent notifications may not be sent to this event listener.

当一个event listener第一次启动时,状态为 ACKNOWLEDGED;输出 READY\n 之后,其状态变成为 READY,这时候该listener开始接受supervisor的notifications(或者说supervisor才将notifications发送给该event listener);接受到一个notification之后,其状态为 BUSY,直到listener执行完毕输出(OK or FAIL,格式:RESULT 2\nOK,2为结果“OK”的长度),这时listener重新进入状态 ACKNOWLEDGED。

Notification Protocol

当我们在配置文件中定义了一个 [eventlistener:x] 的时候,实际上是定义了一个listener pool,我们可以通过 numprocs 来设置 pool 中listener的数量。

只有一个listener的状态为 READY 的情况下,supervisor才会发送notification给它;如果该listener的状态不是 READY,则supervisor会将notification发送给同一个pool的其他listener。

当listener处理结果为 fail 时,则supervisor会隔一段时间后重新给该listener发送该notification。

Example

import sys

def write_stdout(s):
    # only eventlistener protocol messages may be sent to stdout
    sys.stdout.write(s)
    sys.stdout.flush()

def write_stderr(s):
    sys.stderr.write(s)
    sys.stderr.flush()

def main():
    while 1:
        # transition from ACKNOWLEDGED to READY
        write_stdout('READY\n')

        # read header line and print it to stderr
        line = sys.stdin.readline()
        write_stderr(line)

        # read event payload and print it to stderr
        headers = dict([ x.split(':') for x in line.split() ])
        data = sys.stdin.read(int(headers['len']))
        write_stderr(data)

        # transition from READY to ACKNOWLEDGED
        write_stdout('RESULT 2\nOK')

if __name__ == '__main__':
    main()

Superlance

Superlance是一个用来监视和控制supervisor中进程的插件包,通过它可以很轻松的实现对supervisor中进程状态、内存占用等的监控,并发送邮件给指定用户。

$ pip install superlance

参考官方文档,这里我们以 memmon 为例,在进程占用内存过大时自动重启:

[eventlistener:memmon]
command=memmon -c -p memkiller=80MB -s "/usr/sbin/sendmail -t -i -F memmon -f memmon@example.com" -m aaa@example.com
events=TICK_60

简单写一个测试程序:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import falcon
from wsgiref import simple_server


class MainResource:

    def on_get(self, req, resp):
        resp.status = falcon.HTTP_200
        resp.body = "done"

        biglist = []
        for i in range(1000000):
            a = str(time.time()*1000000) + str(i)
            biglist.append(a)

        print "done"
        while True:
            time.sleep(1)

app = falcon.API()
main = MainResource()
app.add_route("/", main)


if __name__ == "__main__":
    httpd = simple_server.make_server("127.0.0.1", 8999, app)
    httpd.serve_forever()

然后在supervisor中添加该进程:

[program:memkiller]
directory=/home/jachua/project/jiehua/memkiller
command=python main.py
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/home/jachua/log/supervisor/memkiller

当我们访问 localhost:8999 时,可以看到内存占用超过 100MB,并能够在邮箱中收到相应的邮件。

码字很辛苦,转载请注明来自ChenJiehua《Supervisor Event Listener》

评论