上次在《Ubuntu上Coreseek+PHP的安装》一文中提到过,我个人建议CoreSeek最好采用Python作为数据源,灵活性要大得多。这次就分享一下我写的一个CoreSeek Python数据源基类。
这个基类的优势在于:对于“分库分表”的MySQL,它支持多进程直接并发读库,性能很强。而且对于Python 2.6以下、没有multiprocessing模块的环境,基类会自动改用线程来模拟进程,对使用者完全透明!
该库已经在生产环境中使用。
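依赖:MySQLdb(MySQL-python)包,以及一个提供分库分表配置的DBConfig模块(本文未给出,其DBConfig().getDBTableConfig(DBName)需返回字典列表,每个字典包含host、user、passwd、dbName、tableName)。下面的基类代码保存为CoreSeek.py: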
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Abstract base class for CoreSeek indexer data sources (Python)
#
# By Litrin J. 2011-07
#
import datetime
import time
import types
try:
    from multiprocessing import Process, Queue
except ImportError:
    # Python < 2.6: fall back to threads, which expose the same interface
    from threading import Thread as Process
    from Queue import Queue
try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2

import MySQLdb

from DBConfig import DBConfig
class CoreSeek(object):
    '''
    Abstract base class for CoreSeek Python data sources that read rows from
    a set of (sharded) MySQL tables.

    A subclass sets the configuration attributes below (DBName, Field,
    Scheme, ...) and may override buildIndex() to post-process each row.
    The CoreSeek indexer is expected to call GetScheme(), GetFieldOrder(),
    Connected() and then NextDocument() until it returns False, reading the
    current document's fields as attributes of the instance.

    Connected() starts up to MaxProcess workers (processes, or threads when
    multiprocessing is unavailable). Each worker takes table descriptions
    from a shared queue, queries that table and pushes the resulting row
    dicts onto a result queue that NextDocument() drains.
    '''
    # Key passed to DBConfig().getDBTableConfig() to look up the tables.
    DBName = ""
    # Columns to select. "as" aliases are supported, e.g. 'id as uid' exposes
    # the `id` column under the key `uid`. Result keys are lower-cased.
    Field = []
    # SQL condition placed after WHERE (name kept for compatibility).
    WhereCause = "TRUE"
    # Column holding a row id that is unique within one table. When set, the
    # document id becomes <row id> * 1000 + <table number>, the table number
    # being the last two characters of the table name read as hex. When None,
    # documents are numbered 1, 2, 3, ... in the order they are consumed.
    UniqId = None
    # Returned as-is by GetScheme() / GetFieldOrder().
    Scheme = []
    FieldOrder = []
    # Optional GROUP BY expression.
    SQLGroupBy = ""
    # Page size for big tables: > 0 reads each table with repeated LIMIT
    # queries, <= 0 reads each table with a single query.
    SQLLimit = 0
    # When True, print every SQL statement and every document.
    Debug = False
    # Maximum number of concurrent loader workers.
    MaxProcess = 2

    # Private state. Real per-instance values are assigned in __init__() and
    # Connected(); these class-level defaults only keep __getattr__() from
    # recursing on instances whose __init__() did not run.
    __data = None
    __tableList = None
    __curItem = {}
    __uniqNumber = 0
    __processPool = ()
    # unicode on Python 2, str on Python 3
    __textType = type(u"")

    def __init__(self, conf):
        if self.__class__ == CoreSeek:
            raise NotImplementedError("Cannot create object of class CoreSeek!")
        self.conf = conf
        self.__data = None
        self.__tableList = None
        self.__curItem = {}
        self.__uniqNumber = 0
        # Per-instance list: a class-level list would be shared by every
        # instance and would keep growing on every Connected() call.
        self.__processPool = []

    def __del__(self):
        '''
        Intentionally empty; kept for subclasses that chain to it.

        NOTE: on Python 2 an object defining __del__ is not collected when it
        sits in a reference cycle, so _finishLoading() drops the worker
        objects (they reference self through their target).
        '''
        pass

    def __getattr__(self, key):
        '''
        Expose the fields of the current document as attributes.

        Only called for names that normal lookup did not find. With UniqId
        unset, `id` is the running document counter. Unknown names raise
        AttributeError, so hasattr() and getattr(obj, name, default) work.
        '''
        if self.UniqId is None and key == 'id':
            return self.__uniqNumber
        try:
            return self.__curItem[key]
        except (KeyError, TypeError):
            raise AttributeError(key)

    def __iter__(self):
        '''Yield each document dict in turn (advances via NextDocument()).'''
        while self.NextDocument():
            yield self.__curItem

    def __str__(self):
        return str(self.__curItem)

    def GetScheme(self):
        return self.Scheme

    def GetFieldOrder(self):
        return self.FieldOrder

    def Connected(self):
        '''
        Look up the tables of DBName and start the loader workers.

        Every table description is queued once, followed by one None
        sentinel per worker; a worker stops when it takes a sentinel.
        '''
        tableList = DBConfig().getDBTableConfig(self.DBName)
        self.MaxProcess = min(self.MaxProcess, len(tableList))

        self.__tableList = Queue()
        for dConfig in tableList:
            self.__tableList.put(dConfig)
        for _ in range(self.MaxProcess):
            self.__tableList.put(None)
        self.__data = Queue()

        self.__processPool = []
        workers = []
        for _ in range(self.MaxProcess):
            # start(), not run(): run() executed getTableData() synchronously
            # in this process, so nothing was loaded concurrently and the
            # pool only ever held None.
            worker = Process(target=self.getTableData)
            # Do not let an abandoned worker keep the interpreter alive.
            worker.daemon = True
            worker.start()
            workers.append(worker)
        # Assigned after all workers started, so a worker never receives
        # (e.g. through pickling under "spawn") its sibling Process objects.
        self.__processPool = workers

    def NextDocument(self, err=None):
        '''
        Advance to the next document.

        Returns True when a document is available (readable as attributes of
        this object) and False once every worker has finished and all rows
        have been consumed; later calls keep returning False.

        Raises:
            RuntimeError: once, when a worker process exited abnormally, so an
                incomplete index does not go unnoticed.

        `err` is accepted for interface compatibility and unused.
        '''
        if self.__data is None:  # not connected yet, or already exhausted
            return False
        while True:
            # Sample the workers *before* polling. If none was running then,
            # everything they produced is already in the queue, so an empty
            # poll is conclusive.
            running = any(self._isRunning(worker)
                          for worker in self.__processPool)
            try:
                item = self.__data.get(True, 0.01)
            except Empty:
                if running:
                    continue
                self._finishLoading()
                return False
            self.__curItem = item
            self.__uniqNumber += 1
            if self.Debug:
                print(self)
            return True

    @staticmethod
    def _isRunning(worker):
        '''Return True while a worker process/thread is still alive.'''
        # threading.Thread only gained is_alive() in Python 2.6.
        isAlive = getattr(worker, "is_alive", None) or getattr(worker, "isAlive")
        return isAlive()

    def _finishLoading(self):
        '''
        Reap the workers and release the queues after the last document.

        Raises RuntimeError if any worker reports a non-zero exit code
        (threads have no exit code and are never reported).
        '''
        workers, self.__processPool = self.__processPool, []
        self.__data = None
        self.__tableList = None
        failed = 0
        for worker in workers:
            worker.join()
            if getattr(worker, "exitcode", None):
                failed += 1
        if failed:
            raise RuntimeError("CoreSeek: %d of %d loader worker(s) failed; "
                               "the loaded data is incomplete" % (failed, len(workers)))

    def getTableData(self):
        '''
        Worker body: load tables from the shared table queue until a None
        sentinel is taken.

        Blocks on the queue, so it must only run on a queue filled by
        Connected(). Returns True if at least one table was loaded.
        '''
        loadedAny = False
        while True:
            dConfig = self.__tableList.get()
            if dConfig is None:
                return loadedAny
            self._loadTable(dConfig)
            loadedAny = True

    def _loadTable(self, dConfig):
        '''Queue all rows of one table; return the number of rows read.'''
        sSQL = self.getSQL(dConfig["tableName"])
        pageSize = self.SQLLimit
        if pageSize <= 0:
            return self.doLoadData(dConfig, sSQL)
        # NOTE(review): LIMIT paging without ORDER BY relies on MySQL
        # returning rows in the same order for every page; consider ordering
        # by UniqId.
        total = 0
        offset = 0
        while True:
            count = self.doLoadData(dConfig, "%s LIMIT %d, %d" % (sSQL, offset, pageSize))
            total += count
            if count < pageSize:  # short (or failed) page: table exhausted
                return total
            offset += pageSize

    def doLoadData(self, dConfig, sSQL):
        '''
        Run sSQL on the database described by dConfig and queue every row.

        Each row is converted by getNamedDict(), given `tablerecord` and `id`
        when UniqId is set, passed through buildIndex() and put on the result
        queue. Returns the number of rows read. A failing query is reported on
        stderr and counts as 0 rows, so loading goes on with the next
        table/page.
        '''
        import sys
        mysqlHandle = MySQLdb.connect(host=dConfig["host"], user=dConfig["user"],
                                      passwd=dConfig["passwd"], db=dConfig["dbName"],
                                      charset="UTF8")
        try:
            mysqlCursor = mysqlHandle.cursor()
            if self.Debug:
                print("SQL: " + sSQL)
            try:
                mysqlCursor.execute(sSQL)
                lResultList = mysqlCursor.fetchall()
            except Exception:
                sys.stderr.write("CoreSeek: query failed on table %s: %s\n"
                                 % (dConfig["tableName"], sys.exc_info()[1]))
                return 0
        finally:
            # Close on every path; the error path used to leak the connection.
            mysqlHandle.close()

        if self.UniqId is not None:
            uniqKey = self.UniqId.lower()
            tableNumber = int(dConfig["tableName"][-2:], 16)
        for lRow in lResultList:
            dRow = self.getNamedDict(lRow)
            if self.UniqId is not None:
                dRow["tablerecord"] = [tableNumber, dRow[uniqKey]]
                dRow["id"] = int(dRow[uniqKey]) * 1000 + tableNumber
            self.__data.put(self.buildIndex(dRow))
        return len(lResultList)

    def getSQL(self, sTableName):
        '''Build "SELECT <Field> FROM <table> WHERE <WhereCause> [GROUP BY ...]".'''
        SQL = "SELECT %s FROM %s WHERE %s " % (", ".join(self.Field), sTableName, self.WhereCause)
        if self.SQLGroupBy:
            SQL += "GROUP BY %s " % self.SQLGroupBy
        return SQL

    def getNamedDict(self, lRow):
        '''
        Map a result row (values in Field order) to a dict keyed by the
        lower-cased column name or alias.

        Text values are encoded to UTF-8, and datetime values become Unix
        timestamps (local time, via time.mktime). Other values, including
        byte strings, pass through unchanged.
        '''
        result = {}
        for i, sFieldName in enumerate(self.Field):
            sDictKey = sFieldName.lower()
            # Use the last " as " so expressions such as
            # "CAST(x AS CHAR) as title" map to "title".
            iAliasPos = sDictKey.rfind(" as ")
            if iAliasPos > 0:
                sDictKey = sDictKey[iAliasPos + 4:].strip()
            value = lRow[i]
            if isinstance(value, self.__textType):
                value = value.encode("utf-8")
            elif isinstance(value, datetime.datetime):
                value = int(time.mktime(value.timetuple()))
            # Byte strings are already encoded; calling .encode() on them in
            # Python 2 first decodes them as ASCII and fails on non-ASCII.
            result[sDictKey] = value
        return result

    def buildIndex(self, dRow):
        '''
        Hook for subclasses: adjust one row dict before it is queued and
        return the dict to index. The default returns the row unchanged
        (it previously returned a 1-tuple because of a *dRow signature).
        '''
        if self.Debug:
            print(dRow)
        return dRow
class CoreSeekUtility(object):
    '''Stateless string helpers for building CoreSeek documents.'''

    @staticmethod
    def literallyCut(string, sChar=" ", CharSet="utf-8"):
        '''
        Insert sChar between every character of `string`, e.g. "ab" -> "a b".

        `string` may be an encoded byte string (decoded with CharSet, and the
        result re-encoded with CharSet) or a text string (the result is
        returned as text). Strings of at most one character are returned
        unchanged.
        '''
        textType = type(u"")  # unicode on Python 2, str on Python 3
        isText = isinstance(string, textType)
        # Calling .decode() on text fails on Python 2 (implicit ASCII encode)
        # and does not exist on Python 3, so only bytes are decoded.
        uString = string if isText else string.decode(CharSet)
        if len(uString) <= 1:
            return string
        if not isinstance(sChar, textType):
            sChar = sChar.decode(CharSet)
        sCut = sChar.join(uString)  # iterating text yields its characters
        return sCut if isText else sCut.encode(CharSet)

    @staticmethod
    def dictIndex(dRecord):
        '''
        Render a record as "KEY=value " pairs, keys upper-cased and each pair
        followed by one space. Returns "" for an empty record.
        '''
        # Look values up with the original key; the previous version looked
        # them up with the upper-cased key and raised KeyError for
        # lower-case keys.
        return "".join([key.upper() + "=" + str(value) + " "
                        for key, value in dRecord.items()])
数据源示例(继承上面的CoreSeek基类):
from CoreSeek import CoreSeek, CoreSeekUtility
import time
class Topic(CoreSeek):
    '''Example data source for the sharded "Topic" tables.'''
    # (field, options) pairs returned by GetScheme(); 'id' is flagged as docid.
    Scheme = [
        ('id', {'docid': True, }),
        ('index', {'type': 'text', }),
        ('index_uid', {'type': 'text', }),
        ('topicid', {'type': 'integer'}),
        ('type', {'type': 'integer'}),
        ('privacy', {'type': 'integer'}),
        ('body', {'type': 'string'}),
        ('title', {'type': 'string'}),
        ('uid', {'type': 'string'}),
        ('description', {'type': 'string'}),
    ]
    # Returned by GetFieldOrder().
    FieldOrder = [('index', 'index_uid')]
    # Selected columns; they appear lower-cased as keys of each row dict.
    Field = ["topicId", "uid", "title", 'body', 'privacy', 'description', 'type']
    DBName = "Topic"

    def buildIndex(self, dRow):
        '''
        Fill the two text fields declared in Scheme: `index_uid` (a copy of
        uid) and `index` (title and description separated by a space).
        '''
        dRow['index_uid'] = dRow['uid']
        # NULL columns come back as None; index them as empty text instead of
        # the literal word "None".
        title = dRow['title']
        description = dRow['description']
        if title is None:
            title = ""
        if description is None:
            description = ""
        dRow['index'] = "%s %s" % (title, description)
        return dRow
class TopicDelta(Topic):
    # Delta source: the Topic source restricted to rows created during the
    # last hour. The cut-off is computed once, when this class body runs
    # (i.e. at import time).
    # NOTE(review): assumes createTime stores a Unix timestamp -- confirm.
    WhereCause = "createTime > %s " % (time.time() - 60 * 60)
if __name__ == "__main__":
    # Manual smoke test: pull every document from the Topic source and print it.
    conf = dict()
    source = Topic(conf)
    source.Connected()
    for _ in source:  # each step calls source.NextDocument()
        print(source)
转载请注明:爱开源 » CoreSeek Python数据源的基类