Supervisor Event Listener
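Supervisor is open-source software for managing large numbers of processes on Unix-like operating systems. It can start and restart processes automatically, among many other features, which makes managing multiple processes convenient. Sometimes, however, we also want to be notified when a process crashes or restarts, or when its memory usage grows too large. Supervisor's event listener mechanism provides exactly this.
Introduction
From the official documentation: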
Events are an advanced feature of Supervisor introduced in version 3.0. You don’t need to understand events if you simply want to use Supervisor as a mechanism to restart crashed processes or as a system to manually control process state. You do need to understand events if you want to use Supervisor as part of a process monitoring/notification framework.
Listeners & Notifications
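Supervisor provides a special mechanism that lets us write a program (run as a subprocess of supervisor, that is, managed by supervisor like any other program) that acts as an event listener and subscribes to event notifications.
Event notifications are divided into types, and an event listener can subscribe to one or more of them. While supervisor is running, it keeps emitting event notifications even if no event listener is configured. If a listener has subscribed to a given event type, it receives a notification whenever supervisor emits an event of that type.
Configuration
In the supervisor configuration, an event listener is configured much like an ordinary process: use [eventlistener:xxx] in place of [program:xxx]. Some parameters are not available, because event listeners have no "capture mode", for example: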
- stdout_capture_maxbytes
- stderr_capture_maxbytes

    [eventlistener:memmon]
    command=memmon -a 200MB -m bob@example.com
    events=TICK_60
Event Listener
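So how do we write an event listener of our own?
Supervisor sends the relevant data to the event listener directly over stdin. When the listener has finished processing, it writes the result (OK or FAIL) to stdout. The listener should write to stdout unbuffered, or flush after each write, so that supervisor receives the result immediately.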
Header Tokens
Supervisor first sends the event listener a header line made up of the following tokens:
Key | Description | Example |
---|---|---|
ver | The event system protocol version | 3.0 |
server | The identifier of the supervisord sending the event (see the identifier value in the [supervisord] section of the config file). | |
serial | An integer assigned to each event. No two events generated during the lifetime of a supervisord process will have the same serial number. The value is useful for functional testing and detecting event ordering anomalies. | 30 |
pool | The name of the event listener pool which generated this event. | myeventpool |
poolserial | An integer assigned to each event by the eventlistener pool which it is being sent from. No two events generated by the same eventlistener pool during the lifetime of a supervisord process will have the same poolserial number. This value can be used to detect event ordering anomalies. | 30 |
eventname | The specific event type name (see Event Types) | TICK_5 |
len | An integer indicating the number of bytes in the event payload, aka the PAYLOAD_LENGTH | |
For example:
ver:3.0 server:supervisor serial:21 pool:listener poolserial:10 eventname:PROCESS_COMMUNICATION_STDOUT len:54
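Because the header is a single line of space-separated key:value tokens, it can be parsed with a couple of splits. The following is a minimal sketch; the function name parse_header is an illustrative choice and not part of Supervisor.

```python
def parse_header(line):
    """Parse one event header line into a dict of string tokens."""
    # Each token looks like "key:value"; split on the first colon only.
    return dict(token.split(":", 1) for token in line.split())


header = parse_header(
    "ver:3.0 server:supervisor serial:21 pool:listener "
    "poolserial:10 eventname:PROCESS_COMMUNICATION_STDOUT len:54\n"
)
# The len token says how many payload bytes follow the header line.
payload_length = int(header["len"])
print(header["eventname"], payload_length)
```

All values come back as strings, so numeric tokens such as len and serial need an explicit int() before use.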
Next, the event listener reads a further PAYLOAD_LENGTH bytes from stdin. For a PROCESS_COMMUNICATION_STDOUT event notification, the payload looks like this:

    processname:foo groupname:bar pid:123
    This is the data that was sent between the tags
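Reading the payload and separating its key:value first line from the data that follows might look like the sketch below. The helper names read_event and split_payload are hypothetical, io.StringIO stands in for stdin, and the len token is computed from the sample payload so that the example is self-consistent. It also assumes an ASCII payload, so that characters and bytes coincide.

```python
import io


def read_event(stream):
    """Read one event (header line plus payload) from a text stream."""
    header = dict(token.split(":", 1) for token in stream.readline().split())
    # Assumption: ASCII payload, so reading len characters reads len bytes.
    payload = stream.read(int(header["len"]))
    return header, payload


def split_payload(payload):
    """Split a payload into its key:value first line and the data after it."""
    first_line, _, data = payload.partition("\n")
    fields = dict(token.split(":", 1) for token in first_line.split())
    return fields, data


payload_text = (
    "processname:foo groupname:bar pid:123\n"
    "This is the data that was sent between the tags"
)
header_line = (
    "ver:3.0 eventname:PROCESS_COMMUNICATION_STDOUT len:%d\n" % len(payload_text)
)
fake_stdin = io.StringIO(header_line + payload_text)  # stand-in for sys.stdin

header, payload = read_event(fake_stdin)
fields, data = split_payload(payload)
print(fields["processname"], fields["pid"])
print(data)
```

Not every event type carries a data section after the first line; for those, split_payload simply returns an empty string as the data.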
Listener States
While an event listener is running, supervisor determines the listener's state from what the listener writes to its output:
Name | Description |
---|---|
ACKNOWLEDGED | The event listener has acknowledged (accepted or rejected) an event send. |
READY | Event notifications may be sent to this event listener. |
BUSY | Event notifications may not be sent to this event listener. |
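When an event listener first starts, it is in the ACKNOWLEDGED state. After it writes READY\n to stdout, its state becomes READY and supervisor may begin sending it notifications. Once it receives a notification, its state becomes BUSY, and it stays BUSY until it finishes processing and writes a result (OK or FAIL) in the form RESULT 2\nOK, where 2 is the length of the result body "OK". The listener then returns to the ACKNOWLEDGED state.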
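The result format and the state cycle described above can be sketched as follows. The function result_message and the TRANSITIONS table are illustrative names only; the table is a reading aid and not how supervisor stores listener state internally.

```python
def result_message(body):
    """Build a result message: 'RESULT <length of body>', a newline, then the body."""
    return "RESULT %d\n%s" % (len(body), body)


# (current state, what happens) -> next state; labels are descriptive only.
TRANSITIONS = {
    ("ACKNOWLEDGED", "listener writes READY"): "READY",
    ("READY", "supervisor sends event"): "BUSY",
    ("BUSY", "listener writes RESULT"): "ACKNOWLEDGED",
}

state = "ACKNOWLEDGED"  # state of a freshly started listener
for action in (
    "listener writes READY",
    "supervisor sends event",
    "listener writes RESULT",
):
    state = TRANSITIONS[(state, action)]
    print(action, "->", state)

print(repr(result_message("OK")))
print(repr(result_message("FAIL")))
```

The length prefix covers only the body, so a FAIL result is written as RESULT 4 followed by a newline and FAIL.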
Notification Protocol
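When we define an [eventlistener:x] section in the configuration file, we are actually defining a listener pool; the numprocs setting controls how many listeners the pool contains.
Supervisor sends a notification to a listener only when that listener is in the READY state. If the listener is not READY, supervisor sends the notification to another listener in the same pool.
When a listener reports FAIL for a notification, supervisor sends that notification again after a while.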
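To make the dispatch rules concrete, here is a toy model of a listener pool. It is only a sketch of the behavior described above, not supervisor's implementation: the ToyPool class, its method names, and the listener names are invented for illustration, and the buffering of events when no listener is READY is a simplification.

```python
from collections import deque


class ToyPool:
    """Toy model of an event listener pool (not supervisor's real code)."""

    def __init__(self, names):
        # Assumption: every listener has already written READY.
        self.states = {name: "READY" for name in names}
        self.buffer = deque()  # events waiting to be sent or re-sent

    def dispatch(self, event):
        """Send the event to the first READY listener, else buffer it."""
        for name, state in self.states.items():
            if state == "READY":
                self.states[name] = "BUSY"
                return name
        self.buffer.append(event)
        return None

    def finish(self, name, event, ok):
        """Record a listener's result; a FAIL puts the event back in the buffer."""
        self.states[name] = "ACKNOWLEDGED"
        if not ok:
            self.buffer.append(event)


pool = ToyPool(["listener_0", "listener_1"])
first = pool.dispatch("event-1")    # goes to listener_0
second = pool.dispatch("event-2")   # listener_0 is BUSY, so listener_1 gets it
third = pool.dispatch("event-3")    # nobody is READY: buffered
pool.finish("listener_0", "event-1", ok=False)  # FAIL: event-1 is queued again
print(first, second, third, list(pool.buffer))
```

In a real deployment the pool size comes from numprocs in the [eventlistener:x] section, and each listener becomes eligible again only after it writes READY.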
Example

    import sys

    def write_stdout(s):
        # only eventlistener protocol messages may be sent to stdout
        sys.stdout.write(s)
        sys.stdout.flush()

    def write_stderr(s):
        sys.stderr.write(s)
        sys.stderr.flush()

    def main():
        while 1:
            # transition from ACKNOWLEDGED to READY
            write_stdout('READY\n')

            # read header line and print it to stderr
            line = sys.stdin.readline()
            write_stderr(line)

            # read event payload and print it to stderr
            headers = dict([ x.split(':') for x in line.split() ])
            data = sys.stdin.read(int(headers['len']))
            write_stderr(data)

            # transition from READY to ACKNOWLEDGED
            write_stdout('RESULT 2\nOK')

    if __name__ == '__main__':
        main()
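The example always answers OK. A listener that does real work will usually want to answer FAIL when its handler raises, so that supervisor sends the event again. The sketch below shows one way to structure a single READY / event / RESULT cycle. The function handle_one_event and its handler callback are hypothetical names, and the streams are passed in as parameters so the cycle can be exercised with io.StringIO instead of a running supervisord.

```python
import io


def handle_one_event(stdin, stdout, handler):
    """Run one READY -> event -> RESULT cycle and return the result body."""
    stdout.write("READY\n")
    stdout.flush()  # supervisor must see READY before it sends an event

    header = dict(token.split(":", 1) for token in stdin.readline().split())
    payload = stdin.read(int(header["len"]))

    try:
        handler(header, payload)
        body = "OK"
    except Exception:
        body = "FAIL"  # ask supervisor to send the event again later

    stdout.write("RESULT %d\n%s" % (len(body), body))
    stdout.flush()
    return body


def failing_handler(header, payload):
    raise RuntimeError("could not process event")


sample_payload = "when:1201063880"
sample_event = "ver:3.0 eventname:TICK_60 len:%d\n%s" % (
    len(sample_payload),
    sample_payload,
)

ok_out = io.StringIO()
ok_result = handle_one_event(
    io.StringIO(sample_event), ok_out, lambda header, payload: None
)

fail_out = io.StringIO()
fail_result = handle_one_event(io.StringIO(sample_event), fail_out, failing_handler)

print(repr(ok_out.getvalue()))
print(repr(fail_out.getvalue()))
```

In a real listener the same function would be called in a loop with sys.stdin and sys.stdout, and any diagnostic output would go to stderr, since stdout is reserved for protocol messages.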
Superlance
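Superlance is a package of plugin utilities for monitoring and controlling processes that run under supervisor. It makes it easy to monitor things such as process state and memory usage, and to send email to a designated user.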
$ pip install superlance
Following the official documentation, we use memmon as the example here, restarting a process automatically when it uses too much memory:

    [eventlistener:memmon]
    command=memmon -c -p memkiller=80MB -s "/usr/sbin/sendmail -t -i -F memmon -f memmon@example.com" -m aaa@example.com
    events=TICK_60
Here is a simple test program:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-

    import time
    import falcon
    from wsgiref import simple_server


    class MainResource:
        def on_get(self, req, resp):
            resp.status = falcon.HTTP_200
            resp.body = "done"
            # Build a large list so the process's memory usage grows.
            biglist = []
            for i in range(1000000):
                a = str(time.time()*1000000) + str(i)
                biglist.append(a)
            print("done")
            # Never return, so the list stays alive and memory stays high.
            while True:
                time.sleep(1)

    app = falcon.API()
    main = MainResource()
    app.add_route("/", main)

    if __name__ == "__main__":
        httpd = simple_server.make_server("127.0.0.1", 8999, app)
        httpd.serve_forever()
Then add the process to supervisor:

    [program:memkiller]
    directory=/home/jachua/project/jiehua/memkiller
    command=python main.py
    autostart=true
    autorestart=true
    redirect_stderr=true
    stdout_logfile=/home/jachua/log/supervisor/memkiller
When we visit localhost:8999, the process's memory usage climbs past 100MB, and the corresponding email arrives in the mailbox.