公司项目中需要使用长链接方式的获取后端数据库——主要是Redis的实时数据。
由于项目本身是PHP的初次看到这个项目,首先想到的是Apache + mod_php的方式,配合php的ob_start()方式直接调用,就如同我之前的一篇东西所说的那样。可问题不这么简单:
- 系统是nginx + php-fpm方式,php-fpm“hold不住”过多的Http请求,而nginx需要调整响应时间。
- 用户数量很多,Apache的消耗很大。本身功能点很小,实现成本不合算。
说到并发,Apache采用的方式是大量的fork进程,通过“人多力量大”的方式应对多个请求,这样的基于进程(线程)模型的并发,一旦调用sleep,进程只是休眠而已,仍然占用着内存,仍然需要进程调度,资源始终得不到释放,资源自然无法得到控制。这些年nginx的流行大多都是因为nginx采用的epoll方式有效的解决这个问题——直接挂掉进程,然后再指定时间内重新启用。这就是传说中的“非阻塞”(asynchronous IO AIO)。
多并发、轻量级应用,我首先想到的是Python,加上“非阻塞”关键字,得到的结果就是Tornado。
tornado原本是FriendFeed引擎的开源版本,本身就是为解决“C10K”问题而生的。
稍微研究了一下,coding也很简便:
#!/usr/bin/env python #By Litrin J #http://www.litrin.net/ import tornado.httpserver import tornado.ioloop import tornado.options import tornado.web import time, redis, hashlib,random from tornado.options import define, options define("port", default=8888, help="run on the given port", type=int) class LongPolling(tornado.web.RequestHandler): minWaitTime = 15 maxWaitTime = 900 RedisHost = "172.18.194.98" RedisPrefix = "Q/MSG%s" SignSalt = "salt" StopCode = "{'stop':1}" @tornado.web.asynchronous def get(self): sUid = self.get_argument("uid", None) sSign = self.get_argument("sign", None) sJsonCall = self.get_argument("jsonCallback", None) if (sUid is None or sSign is None or self.checkSign(sUid, sSign) == False): raise tornado.web.HTTPError(404) self.clear() else: self.doLongPolling(callback=self.onWaitting) def checkSign(self, sUid, sSign): return True def doLongPolling(self, callback): #Check if the client close the connection if self.request.connection.stream.closed(): self.clear() sKey = self.RedisPrefix % self.uid res = redis.Redis(self.RedisHost) sMessage = res.rpop(sKey) del res #close the redis at 1st time callback(sMessage) def onWaitting(self, sMessage): if (sMessage is not None): self.onRespones(sMessage) else: iNextPollingTime = time.time() + self.minWaitTime if self.minWaitTime < self.maxWaitTime: self.minWaitTime *= 2 #Can't useing time.sleep for no-bloking mode tornado.ioloop.IOLoop.instance().add_timeout( iNextPollingTime , lambda: self.doLongPolling(callback=self.onWaitting) ) else: self.onRespones(self.StopCode) def onRespones(self, sMessage): sJsonCall = self.get_argument("jsonCallback", None) if sJsonCall is not None: sMessage= sJsonCall + "( " + sMessage.decode('utf-8') + ")" self.write(sMessage) self.finish() def main(): tornado.options.parse_command_line() application = tornado.web.Application([ (r"/", LongPolling), ]) http_server = tornado.httpserver.HTTPServer(application) http_server.listen(options.port) tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": main()
一旦建立http连接,系统会每隔一段时间去轮询用户的队列中是否有数据,有数据时才向客户端发送请求,否则就是“长时间小菊花”状态。很简单,仅仅用了不到100行代码的样子。当然,为了安全因素和节省资源,代码里添加了很多限制性的操作,否则可能会更简洁高效。
需要注意的是,tornado的理念就是建立在“非阻塞”基础上的,你当然可以选择time.sleep(n),但传统上的time.sleep(n)方式的休眠会导致IO阻塞,故只能采用tornado.ioloop.IOLoop.instance().add_timeout 的方法回调来回调去,将下次启动的时间转给系统。话说回来,这是我第一次在Java或JS以外使用回调函数……
初步测试了一下,借助Redis本身就很强的并发能力在台式机上可以每秒钟完成4K次请求以上,并可以保持同时4K以上的访问不掉线,此时的CPU load仍然保持在1以下,单是测试的结果就足以让Apache+Php组合汗颜。
部署的时候采用了单IP多端口方式,服务器有4个核心,决定开4个端口对应,分别是8885~8888,修改
define("port", default=8888, help="run on the given port", type=int)
即可
前端nginx用负责负载分发:
upstream backend { server 127.0.0.1:8888; server 127.0.0.1:8887; server 127.0.0.1:8886; server 127.0.0.1:8885; } server{ listen 80; server_name message.test.com; keepalive_timeout 65; # proxy_read_timeout 2000; # sendfile on; tcp_nopush on; tcp_nodelay on; location / { proxy_pass_header Server; proxy_set_header Host $http_host; proxy_redirect off; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Scheme $scheme; proxy_pass http://backend; } }
需要注意的是,这样的httpSocket方式很可能被内核认为是SYN攻击,而且对于大量的keep alive而言,很可能超过核心限制,还要修改sysctl参数。
#系统最大打开文件数 fs.file-max = 201510 #TCP连接的最长时间 net.ipv4.tcp_keepalive_time = 1800 #出现问题的尝试次数 net.ipv4.tcp_keepalive_probes = 15 #检查是否连接的等待时间 net.ipv4.tcp_keepalive_intvl = 60 #不进行SYN cookies防御 net.ipv4.tcp_syncookies = 0
转载请注明:爱开源 » 非阻塞的Python web框架tornado