上次在《Ubuntu上Coreseek+PHP的安装》一文中提到过，我个人建议Coreseek最好采用Python作为数据源，因为这样灵活性要大得多。这次就分享一下我写的一个CoreSeek Python数据源基类。
这个基类的优势在于：对于“分库分表”的MySQL，它可以直接用多进程并发读库，性能很好。而且对于Python 2.6以下、没有multiprocessing模块的环境，这个基类会自动改用线程来代替进程，对使用者完全透明！
该库已经在生产环境中使用。
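依赖MySQLdb（MySQL-python）包。此外还需要自行提供一个DBConfig模块：DBConfig().getDBTableConfig(DBName)需返回各分表的配置列表，每一项包含host、user、passwd、dbName和tableName；若设置了UniqId，tableName需以两位十六进制的分表编号结尾（如topic_0a）。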
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Abstract base class for CoreSeek indexer Python data sources.
#
# By Litrin J. 2011-07
#
# Syntax is kept valid for Python 2.5+ (the threading fallback targets
# interpreters without multiprocessing) and also parses under Python 3.
#
from DBConfig import DBConfig
import MySQLdb
import datetime
import time
import types
import sys

try:
    from multiprocessing import Process, Queue
except ImportError:
    # Python < 2.6: emulate worker processes with threads (same start/join API).
    from threading import Thread as Process
    try:
        from Queue import Queue
    except ImportError:
        from queue import Queue

try:
    from Queue import Empty  # Python 2 module name
except ImportError:
    from queue import Empty  # Python 3 module name

try:
    _TEXT_TYPE = unicode  # Python 2 text type
except NameError:
    _TEXT_TYPE = str  # Python 3 text type

# Seconds NextDocument() waits on the result queue before re-checking workers.
_POLL_SECONDS = 0.1


class CoreSeek(object):
    """Abstract CoreSeek data source backed by (sharded) MySQL tables.

    Subclasses configure the class attributes below and may override
    buildIndex() to derive extra fields.  Connected() starts up to
    MaxProcess workers (processes, or threads when multiprocessing is
    unavailable).  Each worker takes table configs from a shared queue and
    pushes one dict per row onto a result queue, which NextDocument()
    drains.  Fields of the current row are readable as attributes
    (e.g. ``source.title``).
    """
    DBName = ""
    # Field list; "as" renaming works, e.g. 'id as uid' exposes `id` as uid.
    Field = []
    WhereCause = "TRUE"
    # Per-table unique id column.  When set, the document id becomes
    # UniqId * 1000 + shard number; when None, a running counter is used.
    UniqId = None
    Scheme = []
    FieldOrder = []
    SQLGroupBy = ""
    # For large tables: fetch SQLLimit rows per query (0 = one query per table).
    SQLLimit = 0
    # Debug switch: print every SQL statement and every document.
    Debug = False
    # Upper bound on concurrent workers (never more than the number of tables).
    MaxProcess = 2

    # Class-level fallbacks only.  __init__ and Connected() bind per-instance
    # values, so instances never share a mutable list or queue.
    __data = None
    __curItem = None
    __uniqNumber = 0
    __tableList = None
    __processPool = ()

    def __init__(self, conf):
        """Store conf.  Raises NotImplementedError for the base class itself."""
        if type(self) is CoreSeek:
            raise NotImplementedError("Cannot create object of class CoreSeek!")
        self.conf = conf
        self.__data = None
        self.__curItem = {}
        self.__uniqNumber = 0
        self.__tableList = None
        self.__processPool = []

    def __getattr__(self, key):
        """Expose fields of the current document as attributes.

        ``id`` falls back to the running document counter when UniqId is
        None.  Unknown names raise AttributeError, as the attribute protocol
        requires (hasattr(), copy and pickle rely on that).
        """
        if self.UniqId is None and 'id' == key:
            return self.__uniqNumber
        try:
            return self.__curItem[key]
        except (KeyError, TypeError):
            raise AttributeError(key)

    def __iter__(self):
        """Yield each remaining document dict."""
        while self.NextDocument():
            yield self.__curItem

    def __str__(self):
        return str(self.__curItem)

    def GetScheme(self):
        return self.Scheme

    def GetFieldOrder(self):
        return self.FieldOrder

    def Connected(self):
        """Queue every table config of DBName and start the loader workers."""
        tableList = DBConfig().getDBTableConfig(self.DBName)
        # Never start more workers than tables; start at least one if MaxProcess < 1.
        iWorkers = min(max(self.MaxProcess, 1), len(tableList))

        # Unbounded queue: all tables first, then one None stop marker per
        # worker.  Workers block on get(), so none can quit early while this
        # process is still flushing table configs into the queue.
        self.__tableList = Queue()
        for dConfig in tableList:
            self.__tableList.put(dConfig)
        for _ in range(iWorkers):
            self.__tableList.put(None)

        self.__data = Queue()
        self.__processPool = []
        for _ in range(iWorkers):
            # NOTE(review): a bound-method target relies on the fork start
            # method (POSIX); confirm before running on Windows.
            worker = Process(target=self.getTableData)
            worker.start()  # start(), not run(): run() executes in this process
            self.__processPool.append(worker)

    def NextDocument(self, err=None):
        """Advance to the next row.  Return False once every worker is done.

        Worker liveness is sampled *before* polling the result queue.  A
        worker that had already exited by then has flushed all of its rows,
        so an empty poll afterwards means nothing is left.
        """
        if self.__data is None:
            return False  # not connected yet, or already exhausted
        while True:
            bWorking = any(self._isAlive(worker) for worker in self.__processPool)
            try:
                dItem = self.__data.get(True, _POLL_SECONDS)
            except Empty:
                if bWorking:
                    continue
                self._finish()
                return False
            self.__curItem = dItem
            self.__uniqNumber += 1
            if self.Debug:
                print(self)
            return True

    @staticmethod
    def _isAlive(worker):
        """Return whether worker still runs (Thread spells it isAlive() before 2.6)."""
        checker = getattr(worker, "is_alive", None)
        if checker is None:
            checker = worker.isAlive
        return checker()

    def _finish(self):
        """Reap finished workers, warn about crashed ones, and drop the queues."""
        for worker in self.__processPool:
            worker.join()
            # Only multiprocessing workers have an exitcode; threads report nothing.
            if getattr(worker, "exitcode", 0):
                sys.stderr.write("CoreSeek: a loader worker exited with code %s; "
                                 "the index may be incomplete\n" % worker.exitcode)
        self.__processPool = []
        self.__tableList = None
        self.__data = None

    def getTableData(self):
        """Worker loop: load tables from the shared queue until a stop marker.

        Blocks until it receives a None stop marker (queued by Connected()).
        Returns True if at least one table was processed, else False.
        """
        bLoaded = False
        while True:
            dConfig = self.__tableList.get()
            if dConfig is None:
                return bLoaded
            self._loadTable(dConfig)
            bLoaded = True

    def _loadTable(self, dConfig):
        """Load one table, in SQLLimit-row pages when SQLLimit > 0.  Return the row count."""
        sSQL = self.getSQL(dConfig["tableName"])
        iPageSize = self.SQLLimit
        if iPageSize <= 0:
            return self.doLoadData(dConfig, sSQL)
        # NOTE(review): LIMIT paging without ORDER BY relies on MySQL returning
        # rows in the same order for every page, which MySQL does not guarantee.
        iLoaded = 0
        iOffset = 0
        while True:
            iCount = self.doLoadData(dConfig, sSQL + " LIMIT %d, %d" % (iOffset, iPageSize))
            iLoaded += iCount
            if iCount < iPageSize:  # short (or failed) page: table exhausted
                return iLoaded
            iOffset += iPageSize

    def doLoadData(self, dConfig, sSQL):
        """Run sSQL against dConfig's database and enqueue each row as a dict.

        Returns the number of rows fetched, or 0 if the query failed (the
        error is reported on stderr).  Connection errors propagate.
        """
        sUniqKey = None
        tableNumber = None
        if self.UniqId is not None:
            sUniqKey = self.UniqId.lower()
            # Shard number = last two hex digits of the table name (e.g. "..._0a" -> 10).
            tableNumber = int(dConfig["tableName"][-2:], 16)

        mysqlHandle = MySQLdb.connect(host=dConfig["host"], user=dConfig["user"],
                                      passwd=dConfig["passwd"], db=dConfig["dbName"],
                                      charset="UTF8")
        try:
            mysqlCursor = mysqlHandle.cursor()
            if self.Debug:
                print("SQL: " + sSQL)
            try:
                mysqlCursor.execute(sSQL)
                lResultList = mysqlCursor.fetchall()
            except MySQLdb.Error:
                sys.stderr.write("CoreSeek: query failed on table %s: %s\n"
                                 % (dConfig["tableName"], sys.exc_info()[1]))
                return 0
        finally:
            mysqlHandle.close()  # released even when the query fails

        for lRow in lResultList:
            dRow = self.getNamedDict(lRow)
            if sUniqKey is not None:
                dRow["tablerecord"] = [tableNumber, dRow[sUniqKey]]
                # tableNumber <= 0xff < 1000, so ids stay unique across shards.
                dRow["id"] = int(dRow[sUniqKey]) * 1000 + tableNumber
            self.__data.put(self.buildIndex(dRow))
        return len(lResultList)

    def getSQL(self, sTableName):
        """Build the SELECT for one table from Field, WhereCause and SQLGroupBy."""
        SQL = "SELECT %s FROM %s WHERE %s " % (", ".join(self.Field), sTableName, self.WhereCause)
        if self.SQLGroupBy != "":
            SQL += "GROUP BY %s " % self.SQLGroupBy
        return SQL

    def getNamedDict(self, lRow):
        """Map a result tuple onto lower-cased Field names (honouring "x as y").

        Text values are encoded to UTF-8 bytes, byte strings pass through
        unchanged, and datetimes become local Unix timestamps (int).
        """
        result = {}
        for i, sFieldName in enumerate(self.Field):
            value = lRow[i]
            sDictKey = sFieldName.lower()
            iAs = sDictKey.find(" as ")
            if iAs > 0:
                sDictKey = sDictKey[iAs + 4:].strip()
            if isinstance(value, _TEXT_TYPE):
                value = value.encode("utf-8")
            elif isinstance(value, datetime.datetime):
                value = int(time.mktime(value.timetuple()))
            result[sDictKey] = value
        return result

    def buildIndex(self, dRow):
        """Hook for subclasses to add or derive fields.  Return the (modified) row dict."""
        if self.Debug:
            print(dRow)
        return dRow


class CoreSeekUtility(object):
    """Stateless string helpers for building index fields."""

    @staticmethod
    def literallyCut(string, sChar=" ", CharSet="utf-8"):
        """Return the encoded string with sChar inserted between every character.

        Inputs that decode to fewer than two characters are returned unchanged.
        """
        uString = string.decode(CharSet)
        if len(uString) <= 1:
            return string
        if not isinstance(sChar, _TEXT_TYPE):
            # Decode a byte-string separator too, so a non-ASCII one joins cleanly.
            sChar = sChar.decode(CharSet)
        return sChar.join(uString).encode(CharSet)

    @staticmethod
    def dictIndex(dRecord):
        """Render dRecord as concatenated "KEY=value " pairs with upper-cased keys."""
        # Look values up with the original key; only the rendered name is upper-cased.
        return "".join([key.upper() + "=" + str(dRecord[key]) + " " for key in dRecord])
数据源示例：
from CoreSeek import CoreSeek, CoreSeekUtility
import time


class Topic(CoreSeek):
    """Example data source that indexes the sharded "Topic" tables."""
    Scheme = [
        ('id', {'docid': True, }),
        ('index', {'type': 'text', }),
        ('index_uid', {'type': 'text', }),
        ('topicid', {'type': 'integer'}),
        ('type', {'type': 'integer'}),
        ('privacy', {'type': 'integer'}),
        ('body', {'type': 'string'}),
        ('title', {'type': 'string'}),
        ('uid', {'type': 'string'}),
        ('description', {'type': 'string'}),
    ]
    FieldOrder = [('index', 'index_uid')]
    Field = ["topicId", "uid", "title", 'body', 'privacy', 'description', 'type']
    DBName = "Topic"
    # Derive document ids from topicId (topicId * 1000 + shard number) instead
    # of a per-run counter, so the same topic keeps the same id in the main
    # and the delta index.
    UniqId = "topicId"

    def buildIndex(self, dRow):
        """Add the full-text fields: uid as text, and title + description."""
        dRow['index_uid'] = dRow['uid']
        dRow['index'] = "%s %s" % (dRow['title'], dRow['description'])
        return dRow


class TopicDelta(Topic):
    """Delta source: rows whose createTime falls within the last 3600 seconds."""
    # NOTE(review): assumes createTime holds Unix timestamps; the cutoff is
    # computed once, when this module is imported.
    WhereCause = "createTime > %s " % (time.time() - 3600)


if __name__ == "__main__":
    conf = {}
    source = Topic(conf)
    source.Connected()
    while source.NextDocument():
        print(source)
转载请注明:爱开源 » CoreSeek Python数据源的基类