记得在zipsite项目中,对于文本文件本身就没有序列化的问题,但对于富媒体文件的序列化,我采用的方法是pickle。后来我忽然觉得使用pickle是可以实现简单的分布任务处理的。
废话不表,上代码:
首先是一个“有限事务机”,讲白一点就是所有的任务都必须通过executeNextStep()方法按step1,step2, finish的顺序执行。但增加了两个魔术方法,稍后再说。
process.py
import pickle class process: _nextStep = None _lock = False name = None def __init__(self, name): self.name = name self.setNextStep('step 1') def setNextStep(self, nextStep): if self._lock : return self._lock = True if nextStep == 'step 1' : self._nextStepHandle = self.step1 elif nextStep == 'step 2' : self._nextStepHandle = self.step2 elif nextStep == 'finish' : self._nextStepHandle = self.finish else: self._nextStepHandle = None return self._nextStep = nextStep self._lock = False def step1(self): print "Processing step 1, %s " % self.name self.setNextStep('step 2') def step2(self): print "Processing step 2, %s " % self.name self.setNextStep('finish') def finish(self): print "Processing finished" self.setNextStep(None) def executeNextStep(self, *arge, **args): if self._nextStepHandle is not None: self._nextStepHandle(*arge, **args) def __getstate__(self): return self._nextStep def __setstate__(self, state): self.setNextStep(state) if __name__ == '__main__': p = process('Litrin') p.executeNextStep() p.executeNextStep() p.executeNextStep()
执行结果为:
Processing step 1, Litrin
Processing step 2, Litrin
Processing finished
服务器端,Server.py
# Echo server program import socket import sys import pickle from process import process HOST = None # Symbolic name meaning all available interfaces PORT = 50007 # Arbitrary non-privileged port s = None for res in socket.getaddrinfo(HOST, PORT, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE): af, socktype, proto, canonname, sa = res try: s = socket.socket(af, socktype, proto) except socket.error as msg: s = None continue try: s.bind(sa) s.listen(1) except socket.error as msg: s.close() s = None continue break if s is None: print 'could not open socket' sys.exit(1) while True: conn, addr = s.accept() while 1: data = conn.recv(64) if not data: break Object = pickle.loads(data) Object.executeNextStep() conn.send(pickle.dumps(Object)) conn.close()
客户端,client.py
import socket import time import pickle from process import process class Client(object): handle = None host = '127.0.0.1' port = 50007 def __init__(self): if self.handle is None: self.connect() def __del__(self): if self.handle is not None: self.close() def close(self): self.handle.close() def connect(self): address = (self.host, self.port) self.handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM) #try: self.handle.connect(address) #except: # self.error("can't connect to %s" % self.hostName) # exit() def send(self): p = process('litrin') p.executeNextStep() tcpMessage = pickle.dumps(p) self.handle.send(tcpMessage) tcpRespones = '' while True: d = self.handle.recv(16) tcpRespones += d if len(d) < 16: break # except: # self.error("can't get date from %s" % self.hostName) # return False self.respones = pickle.loads(tcpRespones) self.respones.executeNextStep() self.respones.executeNextStep() if __name__ == '__main__': a = Client() a.connect() a.send() a.close()
执行之后服务器端后,再执行客户端显示:
Processing step 1, Litrin
Processing finished
而服务器端显示:
Processing step 2, Litrin
这也就意味着process对象在两台主机上完成了分布事务管理。
pickle是python提供的一个对象序列化工具,简单的来说就是会将一个对象用字符序列化以方便保存和传输。pickle依赖两个魔术方法:
- __getstate__(self):返回对象的当前状态描述
- __setstate__(self, state): 将状态导回到对象中
理论上说,并不一定需要服务器端的代码和客户端的代码保持一致就可以实现分布式处理,感觉对于很多需要加密验证的方法上这一招很有效。
以上仅仅只是个Demo,现实中由于网络的传输存在不确定因素,对传输的文本进行校验回调是需要考虑的,同时由于pickle的方式往往会依次向父类递归,状况也会复杂很多。
转载请注明:爱开源 » Python实现简单分布式处理