最新消息:

CoreSeek Python数据源的基类

python admin 2874浏览 0评论

上次在《Ubuntu上Coreseek+PHP的安装》一文中提到过，我个人建议Coreseek最好采用Python作为数据源，这样灵活性要大得多。这次就分享一下我写的一个CoreSeek Python数据源基类。

这个基类的优势在于：特别适合“分库分表”的MySQL，支持用多个进程直接并发读取各个库表，性能很高。而且对于Python 2.6以下、没有multiprocessing（多进程）模块的用户，这个基类会自动改用线程来模拟进程，对使用者完全透明！

该库已经在生产环境中使用。

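依赖：需要安装MySQLdb（MySQL-python）包；另外还需要一个DBConfig模块（代码中的 `from DBConfig import DBConfig`，本文未给出）。其 `DBConfig().getDBTableConfig(DBName)` 应返回表配置列表，每一项是包含 tableName、host、user、passwd、dbName 键的字典，且表名的最后两位须为十六进制的分表编号。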

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# abs class for CoreSeek indexer data source
#
#   By Litrin J. 2011-07
#

import datetime, time, types
import sys
try:
    from multiprocessing import Process, Queue
except:
    from threading import Thread as Process
    from Queue import Queue
try:
    from Queue import Empty  # Python 2: raised by Queue.Queue and multiprocessing.Queue
except ImportError:
    from queue import Empty  # Python 3

import MySQLdb

from DBConfig import DBConfig

class CoreSeek(object):
    """
    Abstract base class for a CoreSeek Python data source.

    A subclass names the database (DBName), the columns to select (Field),
    the index schema (Scheme / FieldOrder) and may override buildIndex() to
    post-process each row.  Connected() queues one configuration per table
    of DBName and starts up to MaxProcess workers (processes, or threads
    when multiprocessing is unavailable) that query those tables and push
    rows onto a shared queue; NextDocument() hands the rows out one by one.
    """

    DBName = ""
    # Field list; "as" aliases are supported, e.g. 'id as uid' exposes the
    # `id` column under the key "uid".  Keys are lower-cased.
    Field = []
    # Raw SQL condition placed after WHERE.
    WhereCause = "TRUE"
    # Column holding the unique id within one table; when set, the document
    # id becomes <that id> * 1000 + <table number>.
    UniqId = None
    Scheme = []
    FieldOrder = []

    # Raw SQL expression placed after GROUP BY ("" for none).
    SQLGroupBy = ""
    # Page size for LIMIT-based paging of large tables; 0 (or less) loads a
    # table with a single query.
    SQLLimit = 0
    # Debug switch: echo SQL statements and rows to stdout.
    Debug = False

    # Upper bound on the number of concurrent loader workers.
    MaxProcess = 2

    # private state; class-level defaults, replaced per instance at runtime
    __data = []
    __curItem = []
    __uniqNumber = 0
    __tableList = []

    __processPool = []

    def __init__(self, conf):
        """Store *conf*; the abstract base itself cannot be instantiated."""
        if self.__class__ == CoreSeek:
            raise NotImplementedError("Cannot create object of class CoreSeek!")

        self.conf = conf

    def __getattr__(self, key):
        """Expose the fields of the current row as attributes.

        Only called for names not found normally.  When UniqId is None,
        ``id`` is the running document counter (1 for the first row).
        Unknown names raise AttributeError (not KeyError) so hasattr(),
        getattr() with a default, copy and pickle behave correctly.
        """
        if self.UniqId is None and 'id' == key:
            return self.__uniqNumber
        try:
            return self.__curItem[key]
        except (KeyError, IndexError, TypeError):
            raise AttributeError(key)

    def __iter__(self):
        """Yield the current row dict for every remaining document."""
        while self.NextDocument():
            yield self.__curItem

    def __str__(self):
        return str(self.__curItem)

    def GetScheme(self):
        return self.Scheme

    def GetFieldOrder(self):
        return self.FieldOrder

    def Connected(self):
        """Queue every table of DBName and start the loader workers.

        One ``None`` stop marker per worker is queued after the table
        configurations, so each worker can block on the queue (no racy
        empty()/get() pair) and still terminate.
        """
        tableList = DBConfig().getDBTableConfig(self.DBName)
        if len(tableList) < self.MaxProcess:
            self.MaxProcess = len(tableList)

        self.__tableList = Queue()
        for dConfig in tableList:
            self.__tableList.put(dConfig)
        for i in range(self.MaxProcess):
            self.__tableList.put(None)

        self.__data = Queue()
        # Per-instance pool: a class-level list would be shared by every
        # instance and keep growing across Connected() calls.
        self.__processPool = []
        for i in range(self.MaxProcess):
            worker = Process(target=self.getTableData)
            # start(), not run(): run() executes the target synchronously in
            # this process and returns None, so nothing ran concurrently.
            worker.start()
            self.__processPool.append(worker)

    def NextDocument(self, err=None):
        """Advance to the next row produced by the workers.

        Returns True when a row is available (via attribute access or
        str(self)), or False once every worker has finished and the data
        queue is drained.  Calling it again after False keeps returning
        False.  ``err`` is accepted for interface compatibility and unused.
        """
        if self.__data is None:
            return False

        while True:
            try:
                self.__curItem = self.__data.get_nowait()
                break
            except Empty:
                pass

            # Check liveness *before* the timed wait: a worker hands over
            # everything it queued before it exits, so if none was alive
            # before the wait and the wait times out, no rows remain.
            bWorking = False
            for worker in self.__processPool:
                if self._isWorkerAlive(worker):
                    bWorking = True
                    break
            try:
                self.__curItem = self.__data.get(True, 0.05)
                break
            except Empty:
                if not bWorking:
                    self._releaseWorkers()
                    return False

        self.__uniqNumber += 1
        if self.Debug:
            print(self)
        return True

    @staticmethod
    def _isWorkerAlive(worker):
        """Return True if *worker* (a Process or Thread) is still running."""
        if worker is None:
            return False
        isAlive = getattr(worker, "is_alive", None)
        if isAlive is None:
            isAlive = worker.isAlive  # threading.Thread before Python 2.6
        return isAlive()

    def _releaseWorkers(self):
        """Join the finished workers and drop the work/data queues."""
        for worker in self.__processPool:
            worker.join()
        self.__processPool = []
        self.__tableList = None
        self.__data = None

    def getTableData(self):
        """Worker entry point: load queued tables until a stop marker arrives.

        Used as the target of each worker started by Connected(); it blocks
        on the shared table queue and returns on the ``None`` stop marker
        that Connected() queues once per worker.  Returns True if this
        worker loaded at least one table, else False.
        """
        bLoaded = False
        while True:
            dConfig = self.__tableList.get()
            if dConfig is None:
                return bLoaded
            self._loadTable(dConfig)
            bLoaded = True

    def _loadTable(self, dConfig):
        """Load one table, paging with LIMIT when SQLLimit > 0; return rows loaded."""
        sSQL = self.getSQL(dConfig["tableName"])
        iPageSize = self.SQLLimit
        if iPageSize <= 0:
            return self.doLoadData(dConfig, sSQL)

        # NOTE(review): LIMIT paging without ORDER BY assumes MySQL returns
        # rows in a stable order across queries -- confirm for these tables.
        iLoaded = 0
        iOffset = 0
        while True:
            sLimitSQL = sSQL + " LIMIT " + str(iOffset) + ", " + str(iPageSize)
            iCount = self.doLoadData(dConfig, sLimitSQL)
            iLoaded += iCount
            if iCount < iPageSize:
                return iLoaded
            iOffset += iPageSize

    def doLoadData(self, dConfig, sSQL):
        """Run *sSQL* on the table described by *dConfig* and queue each row.

        Returns the number of rows fetched, or 0 when the query fails (the
        error is reported on stderr).  The connection is always closed.
        """
        # The last two characters of the table name, read as hex (0-255),
        # identify the shard; they are folded into the document id below.
        tableNumber = int(dConfig["tableName"][-2:], 16)

        mysqlHandle = MySQLdb.connect(host=dConfig["host"], user=dConfig["user"], passwd=dConfig["passwd"], db=dConfig["dbName"], charset="UTF8")
        try:
            mysqlCursor = mysqlHandle.cursor()

            if self.Debug:
                print("SQL: " + sSQL)

            try:
                mysqlCursor.execute(sSQL)
                lResultList = mysqlCursor.fetchall()
            except Exception:
                # Best effort: a failing table/page is skipped, but reported.
                sys.stderr.write("CoreSeek: query failed (%s): %s\n" % (sSQL, sys.exc_info()[1]))
                return 0

            sUniqKey = None
            if self.UniqId is not None:
                sUniqKey = self.UniqId.lower()

            for lRow in lResultList:
                dRow = self.getNamedDict(lRow)
                if sUniqKey is not None:
                    dRow["tablerecord"] = [tableNumber, dRow[sUniqKey]]
                    # tableNumber < 1000, so ids stay unique across shards.
                    dRow["id"] = int(dRow[sUniqKey]) * 1000 + tableNumber

                dRow = self.buildIndex(dRow)

                self.__data.put(dRow)

            return len(lResultList)
        finally:
            mysqlHandle.close()

    def getSQL(self, sTableName):
        """Build the SELECT statement for one table (Field, WhereCause, SQLGroupBy)."""
        SQL = "SELECT %s FROM %s WHERE %s " % (", ".join(self.Field), sTableName, self.WhereCause)

        if self.SQLGroupBy != "":
            SQL += "GROUP BY %s " % self.SQLGroupBy
        return SQL

    def getNamedDict(self, lRow):
        """Map one result row (a sequence in Field order) to a dict.

        Keys are the lower-cased field names, or the alias after the last
        " as " when present.  Text values are encoded to UTF-8 byte strings,
        datetime values become Unix timestamps (via time.mktime, i.e. local
        time) and everything else -- including byte strings -- is stored
        unchanged.
        """
        try:
            textType = unicode  # Python 2
        except NameError:
            textType = str      # Python 3

        result = {}
        for i, sFieldName in enumerate(self.Field):
            sDictKey = sFieldName.lower()
            # rfind, not find: "CAST(x AS CHAR) as name" contains " as "
            # before the real alias.
            iAlias = sDictKey.rfind(" as ")
            if iAlias > 0:
                sDictKey = sDictKey[iAlias + 4:].strip()

            value = lRow[i]
            if isinstance(value, textType):
                value = value.encode("utf-8")
            elif isinstance(value, datetime.datetime):
                value = int(time.mktime(value.timetuple()))
            result[sDictKey] = value

        return result

    def buildIndex(self, dRow):
        """Hook for subclasses: post-process one row dict and return it.

        The default returns the row unchanged (echoing it when Debug is on).
        """
        if self.Debug:
            print(dRow)

        return dRow

class CoreSeekUtility:
    """Static helpers for preparing text for CoreSeek indexing."""

    @staticmethod
    def literallyCut(string, sChar=" ", CharSet="utf-8"):
        """Insert *sChar* between every character of the encoded *string*.

        *string* is decoded with *CharSet*, split per character (so
        multi-byte characters stay whole), re-joined with *sChar* and
        encoded back with *CharSet*.  Inputs of at most one character are
        returned unchanged.
        """
        uString = string.decode(CharSet)
        if len(uString) <= 1:
            return string

        if not isinstance(sChar, type(uString)):
            # Decode a byte-string separator explicitly; otherwise join()
            # would decode it as ASCII (Python 2) or reject it (Python 3).
            sChar = sChar.decode(CharSet)

        return sChar.join(uString).encode(CharSet)

    @staticmethod
    def dictIndex(dRecord):
        """Render *dRecord* as "KEY=value " pairs with upper-cased keys.

        Every pair, including the last, is followed by one space; an empty
        dict gives "".
        """
        # Read each value with its original key; only the emitted name is
        # upper-cased (indexing with the upper-cased key raised KeyError).
        return "".join(key.upper() + "=" + str(value) + " "
                       for key, value in dRecord.items())
数据源示例（假设上面的基类保存为CoreSeek.py）：
from CoreSeek import CoreSeek, CoreSeekUtility
import time

class Topic(CoreSeek):
    """Data source for the sharded Topic tables: one document per row."""

    # (name, properties) pairs: the docid, the two full-text fields built in
    # buildIndex(), then the attributes copied straight from the row.
    Scheme = [
        ('id', {'docid': True, }),
        ('index', {'type': 'text', }),
        ('index_uid', {'type': 'text', }),

        ('topicid', {'type': 'integer'}),
        ('type', {'type': 'integer'}),
        ('privacy', {'type': 'integer'}),
        ('body', {'type': 'string'}),
        ('title', {'type': 'string'}),
        ('uid', {'type': 'string'}),
        ('description', {'type': 'string'}),
    ]

    FieldOrder = [('index', 'index_uid')]
    # Selected columns; row keys are their lower-cased names (topicId -> topicid).
    Field = ["topicId", "uid", "title", 'body', 'privacy', 'description', 'type']

    DBName = "Topic"

    def buildIndex(self, dRow):
        """Derive the full-text fields of one row and return the row."""
        uid = dRow['uid']
        dRow['index_uid'] = uid
        headline = dRow['title']
        dRow['index'] = "%s %s" % (headline, dRow['description'])
        return dRow

class TopicDelta(Topic):
    """Delta source: Topic rows whose createTime is within the last hour."""

    # The cut-off (now minus one hour, as a Unix timestamp) is computed once,
    # when this class body runs at import time, not per query.
    # NOTE(review): assumes createTime stores Unix timestamps -- confirm.
    WhereCause = "createTime > %s " % (time.time() - 60 * 60)

if __name__ == "__main__":
    # Build the full Topic source and print every row it produces.
    source = Topic({})
    source.Connected()

    for document in source:
        print(document)

转载请注明:爱开源 » CoreSeek Python数据源的基类

您必须 登录 才能发表评论!