您好,登录后才能下订单哦!
Sphinx作为一款优秀的全文检索开源软件确实是很不错,最近工作需要,要求在其上进行二次开发,第一次接触这样一款开源软件,兴奋和紧张心情难免,作为一个刚毕业的应届生,看了一周的源代码,现在奉上一篇博文来对其Indexer部分代码进行分析,共各位及自己做一个参考,其中只代表个人的一些粗浅看法,如果不对,请各位大神一定要指正,这样才能提高,谢谢!
Indexer作为Sphinx的重要组成部分之一,其主要作用是将数据源进行索引化操作,其中涉及到了索引结构的问题,我会单独开一篇博文去详细讲解这个问题,下面我们开始进行代码分析。
//Indexer.cpp —— int main ( int argc, char ** argv )
这一部分主要是命令行下参数的读入,我们这里采用的是Indexer.exe -c d:/csft_mysql.conf -All命令,这里,argc = 4,即有3个参数,argv[1] :-c就是读取指定config文档,argv[2]: d: / csft_mysql.conf就配置文件config的位置,argv[3]就代表对config文件中指定的所有数据源进行索引构建
//Do argv[i] 解析命令行 const char * sOptConfig = NULL; bool bMerge = false; CSphVector<CSphFilterSettings> dMergeDstFilters; CSphVector<const char *> dIndexes; bool bIndexAll = false; bool bMergeKillLists = false; int i; for ( i=1; i<argc; i++ )//依次处理命令行中的每一个部分 { if ( ( !strcmp ( argv[i], "--config" ) || !strcmp ( argv[i], "-c" ) ) && (i+1)<argc ) { sOptConfig = argv[++i]; if ( !sphIsReadable ( sOptConfig ) ) sphDie ( "config file '%s' does not exist or is not readable", sOptConfig ); } else if ( strcasecmp ( argv[i], "--merge" )==0 && (i+2)<argc ) { bMerge = true; dIndexes.Add ( argv[i+1] ); dIndexes.Add ( argv[i+2] ); i += 2; } else if ( bMerge && strcasecmp ( argv[i], "--merge-dst-range" )==0 && (i+3)<argc ) { dMergeDstFilters.Resize ( dMergeDstFilters.GetLength()+1 ); dMergeDstFilters.Last().m_eType = SPH_FILTER_RANGE; dMergeDstFilters.Last().m_sAttrName = argv[i+1]; dMergeDstFilters.Last().m_uMinValue = (SphAttr_t) strtoull ( argv[i+2], NULL, 10 ); dMergeDstFilters.Last().m_uMaxValue = (SphAttr_t) strtoull ( argv[i+3], NULL, 10 ); i += 3; } else if ( strcasecmp ( argv[i], "--buildstops" )==0 && (i+2)<argc ) { g_sBuildStops = argv[i+1]; g_iTopStops = atoi ( argv[i+2] ); if ( g_iTopStops<=0 ) break; i += 2; } else if ( strcasecmp ( argv[i], "--rotate" )==0 ) { g_bRotate = true; } else if ( strcasecmp ( argv[i], "--buildfreqs" )==0 ) { g_bBuildFreqs = true; } else if ( strcasecmp ( argv[i], "--quiet" )==0 ) { g_bQuiet = true;//这里quiet就是只输出error信息其余信息都不输出 sphSetQuiet ( true ); } else if ( strcasecmp ( argv[i], "--noprogress" )==0 ) { g_bProgress = false; } else if ( strcasecmp ( argv[i], "--all" )==0 ) { bIndexAll = true;//表明所有数据源都要进行索引构建 } else if ( strcasecmp ( argv[i], "--merge-killlists" )==0 ) { bMergeKillLists = true; } else if ( sphIsAlpha(argv[i][0]) ) { dIndexes.Add ( argv[i] ); } else { break; } } if ( !g_bQuiet ) fprintf ( stdout, SPHINX_BANNER );//如果不是quiet模式则输出欢迎信息 if ( !isatty ( fileno(stdout) ) ) g_bProgress = false; if ( i!=argc || argc<2 ) { if ( argc>1 ) { fprintf ( stdout, "ERROR: malformed or unknown option near '%s'.\n", argv[i] ); } else { fprintf ( stdout, "Usage: indexer [OPTIONS] [indexname1 [indexname2 [...]]]\n" "\n" "Options are:\n" "--config <file>\t\tread configuration from specified file\n" "\t\t\t(default is csft.conf)\n" "--all\t\t\treindex all configured indexes\n" "--quiet\t\t\tbe quiet, only print errors\n" "--noprogress\t\tdo not display progress\n" "\t\t\t(automatically on if output is not to a tty)\n" #if !USE_WINDOWS "--rotate\t\tsend SIGHUP to searchd when indexing is over\n" "\t\t\tto rotate updated indexes automatically\n" #endif "--buildstops <output.txt> <N>\n" "\t\t\tbuild top N stopwords and write them to given file\n" "--buildfreqs\t\tstore words frequencies to output.txt\n" "\t\t\t(used with --buildstops only)\n" "--merge <dst-index> <src-index>\n" "\t\t\tmerge 'src-index' into 'dst-index'\n" "\t\t\t'dst-index' will receive merge result\n" "\t\t\t'src-index' will not be modified\n" "--merge-dst-range <attr> <min> <max>\n" "\t\t\tfilter 'dst-index' on merge, keep only those documents\n" "\t\t\twhere 'attr' is between 'min' and 'max' (inclusive)\n" "--merge-killlists" "\t\t\tmerge src and dst killlists instead of applying src killlist to dst" "\n" "Examples:\n" "indexer --quiet myidx1\treindex 'myidx1' defined in 'csft.conf'\n" "indexer --all\t\treindex all indexes defined in 'csft.conf'\n" ); } return 1; }
下一步则是进行Config文件的读取工作,这涉及到两个重要的类,在后面的索引及查询过程中会多次使用它们,一个是CSphConfigParser,一个是CSphConfig。首先看下CSphConfigParser的结构:
//Sphinxutils.h
class CSphConfigParser { public: CSphConfig m_tConf; public: CSphConfigParser (); bool Parse ( const char * sFileName, const char * pBuffer = NULL ); protected: CSphString m_sFileName; int m_iLine; CSphString m_sSectionType; CSphString m_sSectionName; char m_sError [ 1024 ]; int m_iWarnings; static const int WARNS_THRESH = 5; protected: bool IsPlainSection ( const char * sKey ); bool IsNamedSection ( const char * sKey ); bool AddSection ( const char * sType, const char * sSection ); void AddKey ( const char * sKey, char * sValue ); bool ValidateKey ( const char * sKey ); #if !USE_WINDOWS bool TryToExec ( char * pBuffer, char * pEnd, const char * szFilename, CSphVector<char> & dResult ); #endif char * GetBufferString ( char * szDest, int iMax, const char * & szSource ); };
它是解析Config文档的主要的数据结构,其中存储Config文档信息的就是我们这里提到的第二个数据结构,它同样也是CSphConfigParser中的一个成员变量,即CSphConfig,我们可以看到CSphConfig实际上是一个哈希表,通过代码发现它是一个拥有256个键值对的哈希表,后面我们会讲到,通过CSphConfigParser类函数,将Config文件解析,读取到某一Config名字插入CsphConfig哈希表中的Key值,读取到该Config对应的值插入到Value中,方便后面构建索引时使用。
//Sphinxutils.h
/// config (hash of section types) typedef SmallStringHash_T < CSphConfigType > CSphConfig;
说完了这两个数据结构我们来看下Indexer是如何读取Config信息的,其中主要是通过一个sphLoadConfig函数完成读取操作,将相关Config信息以键值对的形式存入cp.m_tConf中,然后检查重要的参数是否读入且存在,例如Source相关信息,数据源是否被读入,Sphinx中Mysql默认Source对应值为mysql,Indexer,即全局Index定义中是否定义了mem_limit的值,即索引过程中最大缓存限制,等等。
//Indexer.cpp —— int main ( int argc, char ** argv )
/////////////// // load config /////////////// CSphConfigParser cp; CSphConfig & hConf = cp.m_tConf; sOptConfig = sphLoadConfig ( sOptConfig, g_bQuiet, cp ); if ( !hConf ( "source" ) ) sphDie ( "no indexes found in config file '%s'", sOptConfig ); g_iMemLimit = 0; if ( hConf("indexer") && hConf["indexer"]("indexer") ) { CSphConfigSection & hIndexer = hConf["indexer"]["indexer"]; g_iMemLimit = hIndexer.GetSize ( "mem_limit", 0 ); g_iMaxXmlpipe2Field = hIndexer.GetSize ( "max_xmlpipe2_field", 2*1024*1024 ); g_iWriteBuffer = hIndexer.GetSize ( "write_buffer", 1024*1024 ); sphSetThrottling ( hIndexer.GetInt ( "max_iops", 0 ), hIndexer.GetSize ( "max_iosize", 0 ) ); }
这其中,主要解析函数为CSphConfig中的Parser函数,其里面比较复杂,大意就是按照字符流读取Config文档,遇到配置信息及其值就存储到CSphconfig这个哈希表中
//Sphinxutils.h
const char * sphLoadConfig ( const char * sOptConfig, bool bQuiet, CSphConfigParser & cp ) { // fallback to defaults if there was no explicit config specified while ( !sOptConfig ) { #ifdef SYSCONFDIR sOptConfig = SYSCONFDIR "/csft.conf"; if ( sphIsReadable(sOptConfig) ) break; #endif sOptConfig = "./csft.conf"; if ( sphIsReadable(sOptConfig) ) break; sOptConfig = NULL; break; } if ( !sOptConfig ) sphDie ( "no readable config file (looked in " #ifdef SYSCONFDIR SYSCONFDIR "/csft.conf, " #endif "./csft.conf)" ); if ( !bQuiet ) fprintf ( stdout, "using config file '%s'...\n", sOptConfig ); // load config if ( !cp.Parse ( sOptConfig ) )//Parser为实际解析函数 sphDie ( "failed to parse config file '%s'", sOptConfig ); CSphConfig & hConf = cp.m_tConf; if ( !hConf ( "index" ) ) sphDie ( "no indexes found in config file '%s'", sOptConfig ); return sOptConfig; }
当我们顺利的读取完Config信息后,我们进入构建索引阶段,前面我们提到了第三个参数,我们选用的是ALL即为所有的数据源构建索引,故bMerge(合并索引)为false,bIndexALL为true,我们开始为每一数据源构建索引,程序会开始在类型为CSphConfig的hConf哈希表中搜索Key为index的值,即需要构建的索引,然后取出该索引的名称,结合数据源Source信息构建索引,执行DoIndex函数。
//Indexer.cpp —— int main ( int argc, char ** argv )
///////////////////// // index each index //////////////////// sphStartIOStats (); bool bIndexedOk = false; // if any of the indexes are ok if ( bMerge ) { if ( dIndexes.GetLength()!=2 ) sphDie ( "there must be 2 indexes to merge specified" ); if ( !hConf["index"](dIndexes[0]) ) sphDie ( "no merge destination index '%s'", dIndexes[0] ); if ( !hConf["index"](dIndexes[1]) ) sphDie ( "no merge source index '%s'", dIndexes[1] ); bIndexedOk = DoMerge ( hConf["index"][dIndexes[0]], dIndexes[0], hConf["index"][dIndexes[1]], dIndexes[1], dMergeDstFilters, g_bRotate, bMergeKillLists ); } else if ( bIndexAll ) { hConf["index"].IterateStart (); while ( hConf["index"].IterateNext() ) bIndexedOk |= DoIndex ( hConf["index"].IterateGet (), hConf["index"].IterateGetKey().cstr(), hConf["source"] );//在这里构建索引,核心函数为DoIndex } else { ARRAY_FOREACH ( i, dIndexes ) { if ( !hConf["index"](dIndexes[i]) ) fprintf ( stdout, "WARNING: no such index '%s', skipping.\n", dIndexes[i] ); else bIndexedOk |= DoIndex ( hConf["index"][dIndexes[i]], dIndexes[i], hConf["source"] ); } } sphShutdownWordforms (); const CSphIOStats & tStats = sphStopIOStats (); if ( !g_bQuiet ) { ReportIOStats ( "reads", tStats.m_iReadOps, tStats.m_iReadTime, tStats.m_iReadBytes ); ReportIOStats ( "writes", tStats.m_iWriteOps, tStats.m_iWriteTime, tStats.m_iWriteBytes ); }
DoIndex为整个Indexer中最核心的函数,下面我们来详细分析下.
//Indexer.cpp ——DoIndex(const CSphConfigSection & hIndex, const char * sIndexName, const CSphConfigType & hSource)
首先判断数据源类型是否为分布式,是否采用quiet模式只输出error信息
if ( hIndex("type") && hIndex["type"]=="distributed" ) { if ( !g_bQuiet ) { fprintf ( stdout, "distributed index '%s' can not be directly indexed; skipping.\n", sIndexName ); fflush ( stdout ); } return false; } if ( !g_bQuiet ) { fprintf ( stdout, "indexing index '%s'...\n", sIndexName ); fflush ( stdout ); }
然后检查hIndex信息中的配置信息是否齐全正确
// check config if ( !hIndex("path") ) { fprintf ( stdout, "ERROR: index '%s': key 'path' not found.\n", sIndexName ); return false; } if ( ( hIndex.GetInt ( "min_prefix_len", 0 ) > 0 || hIndex.GetInt ( "min_infix_len", 0 ) > 0 ) && hIndex.GetInt ( "enable_star" ) == 0 ) { const char * szMorph = hIndex.GetStr ( "morphology", "" ); if ( szMorph && *szMorph && strcmp ( szMorph, "none" ) ) { fprintf ( stdout, "ERROR: index '%s': infixes and morphology are enabled, enable_star=0\n", sIndexName ); return false; } }
接着开始准备分词器,其中主要就是初始化一些参数,例如在sphConfTokenizer中会根据Config配置文件中设置的charsetType类型选择合适的编码解析字符,以及采用何种中文分词器来对中文进行分词操作。然后就是初始化参数后创建分析器实例指定分词所用的词典的地址位置等
/////////////////// // spawn tokenizer /////////////////// CSphString sError; CSphTokenizerSettings tTokSettings; if ( !sphConfTokenizer ( hIndex, tTokSettings, sError ) ) sphDie ( "index '%s': %s", sIndexName, sError.cstr() ); ISphTokenizer * pTokenizer = ISphTokenizer::Create ( tTokSettings, sError ); if ( !pTokenizer ) sphDie ( "index '%s': %s", sIndexName, sError.cstr() ); CSphDict * pDict = NULL; CSphDictSettings tDictSettings; if ( !g_sBuildStops ) { ISphTokenizer * pTokenFilter = NULL; sphConfDictionary ( hIndex, tDictSettings ); pDict = sphCreateDictionaryCRC ( tDictSettings, pTokenizer, sError ); if ( !pDict ) sphDie ( "index '%s': %s", sIndexName, sError.cstr() ); if ( !sError.IsEmpty () ) fprintf ( stdout, "WARNING: index '%s': %s\n", sIndexName, sError.cstr() ); pTokenFilter = ISphTokenizer::CreateTokenFilter ( pTokenizer, pDict->GetMultiWordforms () ); pTokenizer = pTokenFilter ? pTokenFilter : pTokenizer; }
然后是前缀后缀索引设置(这个地方研究的不细致,先把代码贴出来,待补充)
// prefix/infix indexing int iPrefix = hIndex("min_prefix_len") ? hIndex["min_prefix_len"].intval() : 0; int iInfix = hIndex("min_infix_len") ? hIndex["min_infix_len"].intval() : 0; iPrefix = Max ( iPrefix, 0 ); iInfix = Max ( iInfix, 0 ); CSphString sPrefixFields, sInfixFields; if ( hIndex.Exists ( "prefix_fields" ) ) sPrefixFields = hIndex ["prefix_fields"].cstr (); if ( hIndex.Exists ( "infix_fields" ) ) sInfixFields = hIndex ["infix_fields"].cstr (); if ( iPrefix == 0 && !sPrefixFields.IsEmpty () ) fprintf ( stdout, "WARNING: min_prefix_len = 0. prefix_fields are ignored\n" ); if ( iInfix == 0 && !sInfixFields.IsEmpty () ) fprintf ( stdout, "WARNING: min_infix_len = 0. infix_fields are ignored\n" );
然后设置boundary信息(词组边界符列表,此列表控制哪些字符被视作分隔不同词组的边界,每到一个这样的边界,其后面的词的“位置”值都会被加入一个额外的增量,可以借此用近似搜索符来模拟词组搜索。)
// boundary bool bInplaceEnable = hIndex.GetInt ( "inplace_enable", 0 ) != 0; int iHitGap = hIndex.GetSize ( "inplace_hit_gap", 0 ); int iDocinfoGap = hIndex.GetSize ( "inplace_docinfo_gap", 0 ); float fRelocFactor = hIndex.GetFloat ( "inplace_reloc_factor", 0.1f ); float fWriteFactor = hIndex.GetFloat ( "inplace_write_factor", 0.1f ); if ( bInplaceEnable ) { if ( fRelocFactor < 0.01f || fRelocFactor > 0.9f ) { fprintf ( stdout, "WARNING: inplace_reloc_factor must be 0.01 to 0.9, clamped\n" ); fRelocFactor = Min ( Max ( fRelocFactor, 0.01f ), 0.9f ); } if ( fWriteFactor < 0.01f || fWriteFactor > 0.9f ) { fprintf ( stdout, "WARNING: inplace_write_factor must be 0.01 to 0.9, clamped\n" ); fWriteFactor = Min ( Max ( fWriteFactor, 0.01f ), 0.9f ); } if ( fWriteFactor+fRelocFactor > 1.0f ) { fprintf ( stdout, "WARNING: inplace_write_factor+inplace_reloc_factor must be less than 0.9, scaled\n" ); float fScale = 0.9f/(fWriteFactor+fRelocFactor); fRelocFactor *= fScale; fWriteFactor *= fScale; } }
接下来准备数据源,其实发现Indexer在准备这些工作时很繁琐,一遍又一遍的检查相关配置信息是否完全,前面检查了后面还查,可能是出于严谨的考虑吧,这里提一下dSource是一个CSphSource的数组,每一个CSphSource类型的pSource对应一个数据源,因为配置信息中可能会存在多个数据源,所以会有多个pSource。程序会在hIndex中搜索Key值为Source的键值对,提取出对应的值作为pSourceName ,在本例中,我们只有配置文件中的一个Source即mysql。我们看一下CSphSource类型结构。其中包含有三个大部分,第一大部分存储文本分词后的word信息,每一个word(也许是字也许是词)对应一个WordHit,这个WordHit描述该word的相关信息,唯一标示该word。其中WordHit中又包含三部分,分别为word的文档ID,表示该word属于哪一篇文档;word的ID,表示该word在字典中的对应ID;Word的位置,表示该word在文档中的偏移量。第二大部分存储Source中文档的相关信息,其中亦包含了三部分,分别问文档ID;文档中列的数目,以及列对应的指针。第三大部分存储的就是doc中的属性字段信息。
/// generic data source class CSphSource : public CSphSourceSettings { public: CSphVector<CSphWordHit> m_dHits; ///< current document split into words CSphDocInfo m_tDocInfo; ///< current document info CSphVector<CSphString> m_dStrAttrs;///< current document string attrs
123
// parse all sources CSphVector<CSphSource*> dSources; bool bGotAttrs = false; bool bSpawnFailed = false; for ( CSphVariant * pSourceName = hIndex("source"); pSourceName; pSourceName = pSourceName->m_pNext ) { if ( !hSources ( pSourceName->cstr() ) ) { fprintf ( stdout, "ERROR: index '%s': source '%s' not found.\n", sIndexName, pSourceName->cstr() ); continue; } const CSphConfigSection & hSource = hSources [ pSourceName->cstr() ]; CSphSource * pSource = SpawnSource ( hSource, pSourceName->cstr(), pTokenizer->IsUtf8 () );//通过SpawnSource完成对于数据源的解析,其中包括了属性列,需要构建索引列等相关信息 if ( !pSource ) { bSpawnFailed = true; continue; } if ( pSource->HasAttrsConfigured() ) bGotAttrs = true;//判断数据源中是否有指定属性项 pSource->SetupFieldMatch ( sPrefixFields.cstr (), sInfixFields.cstr () ); pSource->SetTokenizer ( pTokenizer );//为每一个Source准备一个分词器 dSources.Add ( pSource );//将解析好的某个Source加入Source数组中去,因为可能存在多个Source }
Source信息准备好后,开始准备Index的构建工作,首先检测该Index是否被使用,即是否被上锁,其次通过CSphIndexSettings类型的tSettings对创建好的pIndex进行初始化,主要是一些索引构建的信息,例如缓存大小,Boudary大小,停用词初始化,分词器初始化等等。准备完相关信息后,重要的就是Build函数,这是索引构建的核心函数,我们来仔细分析
// do index CSphIndex * pIndex = sphCreateIndexPhrase ( sIndexPath.cstr() ); assert ( pIndex ); // check lock file if ( !pIndex->Lock() ) { fprintf ( stdout, "FATAL: %s, will not index. Try --rotate option.\n", pIndex->GetLastError().cstr() ); exit ( 1 ); } CSphIndexSettings tSettings; sphConfIndex ( hIndex, tSettings ); if ( tSettings.m_bIndexExactWords && !tDictSettings.HasMorphology () ) { tSettings.m_bIndexExactWords = false; fprintf ( stdout, "WARNING: index '%s': no morphology, index_exact_words=1 has no effect, ignoring\n", sIndexName ); } if ( bGotAttrs && tSettings.m_eDocinfo==SPH_DOCINFO_NONE ) { fprintf ( stdout, "FATAL: index '%s': got attributes, but docinfo is 'none' (fix your config file).\n", sIndexName ); exit ( 1 ); } pIndex->SetProgressCallback ( ShowProgress ); if ( bInplaceEnable ) pIndex->SetInplaceSettings ( iHitGap, iDocinfoGap, fRelocFactor, fWriteFactor ); pIndex->SetTokenizer ( pTokenizer ); pIndex->SetDictionary ( pDict ); pIndex->Setup ( tSettings ); bOK = pIndex->Build ( dSources, g_iMemLimit, g_iWriteBuffer )!=0;//Build函数是索引构建的重点,所有的核心操作都在其中 if ( bOK && g_bRotate ) { sIndexPath.SetSprintf ( "%s.new", hIndex["path"].cstr() ); bOK = pIndex->Rename ( sIndexPath.cstr() ); } if ( !bOK ) fprintf ( stdout, "ERROR: index '%s': %s.\n", sIndexName, pIndex->GetLastError().cstr() ); pIndex->Unlock ();
对于Build函数而言,它是单次处理一个数据源并为此构建索引信息,
//sphinx.cpp Build ( const CSphVector<CSphSource*> & dSources, int iMemoryLimit, int iWriteBuffer )
首先是准备Source,还是把dSource中的每一个pSource检查下是否都存在,词典是否都准备好,各种初始化是否都齐备
// setup sources ARRAY_FOREACH ( iSource, dSources ) { CSphSource * pSource = dSources[iSource]; assert ( pSource ); pSource->SetDict ( m_pDict ); pSource->Setup ( m_tSettings ); }
链接第一个数据源,获取数据源的Schema信息,就是数据源的Doc中哪些是属性,哪些列是要构建索引的信息
// connect 1st source and fetch its schema if ( !dSources[0]->Connect ( m_sLastError ) || !dSources[0]->IterateHitsStart ( m_sLastError ) || !dSources[0]->UpdateSchema ( &m_tSchema, m_sLastError ) ) { return 0; }
后面就是初始化一些存储结构,其中重点说下缓存出来的几个临时文件分别的作用。结尾时tmp0的存储的是被上锁的Index,有些Index正在被查询使用故上锁。tmp1,即对应将来生成的spp文件,存储词汇的位置信息,包含该词所在的文档ID,该词所在词典对应的ID,以及该词在本文档中的位置信息。tmp2,即对应将来生成的spa文件存储的是文档信息,包含了DocID以及DocInfo信息。tmp7对应的是多值查询,感兴趣的可以度娘,这是一种查询方式,这里不做过多解释
// create temp files CSphAutofile fdLock ( GetIndexFileName("tmp0"), SPH_O_NEW, m_sLastError, true ); CSphAutofile fdHits ( GetIndexFileName ( m_bInplaceSettings ? "spp" : "tmp1" ), SPH_O_NEW, m_sLastError, !m_bInplaceSettings ); CSphAutofile fdDocinfos ( GetIndexFileName ( m_bInplaceSettings ? "spa" : "tmp2" ), SPH_O_NEW, m_sLastError, !m_bInplaceSettings ); CSphAutofile fdTmpFieldMVAs ( GetIndexFileName("tmp7"), SPH_O_NEW, m_sLastError, true ); CSphWriter tOrdWriter; CSphString sRawOrdinalsFile = GetIndexFileName("tmp4");
下面具体处理每一个Source取出的每一个文档,主要是通过这个IterateHitsNext实现的
// fetch documents for ( ;; ) { // get next doc, and handle errors if ( !pSource->IterateHitsNext ( m_sLastError ) ) return 0;
具体到该函数可以看到,该函数主要是有两部分组成,即提取索引列(NextDocument),针对该索引列构建索引(BuildHits)
bool CSphSource_Document::IterateHitsNext ( CSphString & sError ) { assert ( m_pTokenizer ); PROFILE ( src_document ); BYTE ** dFields = NextDocument ( sError );//从数据源中提取需要构建索引的列 if ( m_tDocInfo.m_iDocID==0 ) return true; if ( !dFields ) return false; m_tStats.m_iTotalDocuments++; m_dHits.Reserve ( 1024 ); m_dHits.Resize ( 0 ); BuildHits ( dFields, -1, 0 );//针对提取出的需要索引的列构建索引 return true; }
具体看一下NexDocument的操作,通过Sql.h中的API——sqlFetchRow,取出一条记录,验证该记录是否合法
// get next non-zero-id row do { // try to get next row bool bGotRow = SqlFetchRow ();//首先尝试能否正常取出一条记录 // when the party's over... while ( !bGotRow )//如果取不出来这条记录,再继续思考原因 { // is that an error? if ( SqlIsError() ) { sError.SetSprintf ( "sql_fetch_row: %s", SqlError() ); m_tDocInfo.m_iDocID = 1; // 0 means legal eof return NULL; } // maybe we can do next step yet? if ( !RunQueryStep ( m_tParams.m_sQuery.cstr(), sError ) ) { // if there's a message, there's an error // otherwise, we're just over if ( !sError.IsEmpty() ) { m_tDocInfo.m_iDocID = 1; // 0 means legal eof return NULL; } } else { // step went fine; try to fetch bGotRow = SqlFetchRow (); continue; } SqlDismi***esult (); // ok, we're over ARRAY_FOREACH ( i, m_tParams.m_dQueryPost ) { if ( !SqlQuery ( m_tParams.m_dQueryPost[i].cstr() ) ) { sphWarn ( "sql_query_post[%d]: error=%s, query=%s", i, SqlError(), m_tParams.m_dQueryPost[i].cstr() ); break; } SqlDismi***esult (); } m_tDocInfo.m_iDocID = 0; // 0 means legal eof return NULL; } // get him!//成功取得后 m_tDocInfo.m_iDocID = VerifyID ( sphToDocid ( SqlColumn(0) ) );//判断ID是否为0,是否越界 m_uMaxFetchedID = Max ( m_uMaxFetchedID, m_tDocInfo.m_iDocID ); } while ( !m_tDocInfo.m_iDocID );
将条记录按照Schema分成Feild部分,即需要构建索引的部分,以及Attribute部分,即排序需要用到的属性部分
ARRAY_FOREACH ( i, m_tSchema.m_dFields ) { #if USE_ZLIB if ( m_dUnpack[i] != SPH_UNPACK_NONE ) { m_dFields[i] = (BYTE*) SqlUnpackColumn ( i, m_dUnpack[i] ); continue; } #endif m_dFields[i] = (BYTE*) SqlColumn ( m_tSchema.m_dFields[i].m_iIndex ); } int iFieldMVA = 0; for ( int i=0; i<m_tSchema.GetAttrsCount(); i++ ) { const CSphColumnInfo & tAttr = m_tSchema.GetAttr(i); // shortcut if ( tAttr.m_eAttrType & SPH_ATTR_MULTI ) { m_tDocInfo.SetAttr ( tAttr.m_tLocator, 0 ); if ( tAttr.m_eSrc == SPH_ATTRSRC_FIELD ) ParseFieldMVA ( m_dFieldMVAs, iFieldMVA++, SqlColumn ( tAttr.m_iIndex ) ); continue; } switch ( tAttr.m_eAttrType ) { case SPH_ATTR_ORDINAL: // memorize string, fixup NULLs m_dStrAttrs[i] = SqlColumn ( tAttr.m_iIndex ); if ( !m_dStrAttrs[i].cstr() ) m_dStrAttrs[i] = ""; m_tDocInfo.SetAttr ( tAttr.m_tLocator, 0 ); break; case SPH_ATTR_FLOAT: m_tDocInfo.SetAttrFloat ( tAttr.m_tLocator, sphToFloat ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe? break; case SPH_ATTR_BIGINT: m_tDocInfo.SetAttr ( tAttr.m_tLocator, sphToInt64 ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe? break; default: // just store as uint by default m_tDocInfo.SetAttr ( tAttr.m_tLocator, sphToDword ( SqlColumn ( tAttr.m_iIndex ) ) ); // FIXME? report conversion errors maybe? break; } } return m_dFields;
提取出相关数据后,针对每一条需要索引的item开始构建索引,进入BuildHit函数,首先先初始化相关参数,准备分词器缓存
ARRAY_FOREACH ( iField, m_tSchema.m_dFields ) { //BYTE * sField = dFields[iField]; BYTE * sField = GetField(dFields, iField);//取出索引字段 if ( !sField ) continue; if ( m_bStripHTML ) m_pStripper->Strip ( sField ); int iFieldBytes = (int) strlen ( (char*)sField ); m_tStats.m_iTotalBytes += iFieldBytes; m_pTokenizer->SetBuffer ( sField, iFieldBytes );//设置分词器缓存,实际上就是索引字段大小,准备针对索引字段进行分词 BYTE * sWord; int iPos = HIT_PACK(iField,0); int iLastStep = 1; bool bPrefixField = m_tSchema.m_dFields[iField].m_eWordpart == SPH_WORDPART_PREFIX; bool bInfixMode = m_iMinInfixLen > 0; BYTE sBuf [ 16+3*SPH_MAX_WORD_LEN ];
然后开始分词,分词的过程在这里不具体讲了,这不属于Sphinx的主要涉足领域,当我们把iField即要索引的字段放入分词器中依次解析,然后将分出的词赋值给sWord,将sWord的位置计算后赋值给ipos
// index words only while ( ( sWord = m_pTokenizer->GetToken() )!=NULL ) { iPos += iLastStep + m_pTokenizer->GetOvershortCount()*m_iOvershortStep; if ( m_pTokenizer->GetBoundary() ) iPos = Max ( iPos+m_iBoundaryStep, 1 ); iLastStep = 1; if ( bGlobalPartialMatch ) { int iBytes = strlen ( (const char*)sWord ); memcpy ( sBuf + 1, sWord, iBytes ); sBuf [0] = MAGIC_WORD_HEAD; sBuf [iBytes + 1] = '\0'; SphWordID_t iWord = m_pDict->GetWordIDWithMarkers ( sBuf ); if ( iWord ) { CSphWordHit & tHit = m_dHits.Add (); tHit.m_iDocID = m_tDocInfo.m_iDocID; tHit.m_iWordID = iWord; tHit.m_iWordPos = iPos; } }
将分词后的sWord去词典中查找它对应的词ID,这样我们就收集全了这个词的所有详细信息,创建一个类型为CSphWordHit类型的tHit,其中存储了该sWord所在的DocID,在词典中对应的词ID,以及在文档中词的位置信息Pos
SphWordID_t iWord = m_pDict->GetWordID ( sWord ); if ( iWord ) { CSphWordHit & tHit = m_dHits.Add ();//将tHit放入dHit中去 tHit.m_iDocID = m_tDocInfo.m_iDocID; tHit.m_iWordID = iWord; tHit.m_iWordPos = iPos; } else { iLastStep = m_iStopwordStep; }
处理完该词后,如果是中文的话还会进一步去判断其是否有近义词出现,其主要的函数为GetThesaurus,这里要简单说明下采用的MMSEG分词法,比如我们分词得到了中华,那么它还会继续从词典中去找是否存在其扩展词段(这里姑且翻译成近义词)如中华人民,×××,然后也会把他也存入进去(对于MMSEG的中文分词方法还有待进一步研究,这我只能照着代码念了),最后将所有的sWord的信息tHit都放入到m_dHits中去,形成我们的词索引spp索引
// zh_cn only GetThesaurus { int iBytes = strlen ( (const char*)sWord ); const BYTE* tbuf_ptr = m_pTokenizer->GetThesaurus(sWord, iBytes); if(tbuf_ptr) { while(*tbuf_ptr) { size_t len = strlen((const char*)tbuf_ptr); SphWordID_t iWord = m_pDict->GetWordID ( tbuf_ptr ,len , true); if ( iWord ) { CSphWordHit & tHit = m_dHits.Add (); tHit.m_iDocID = m_tDocInfo.m_iDocID; tHit.m_iWordID = iWord; tHit.m_iWordPos = iPos; //tHit.m_iBytePos = iBytePos; //tHit.m_iByteLen = iByteLen; //iLastStep = m_pTokenizer->TokenIsBlended() ? 0 : 1; //needs move this? } tbuf_ptr += len + 1; //move next } } //end if buf }//end GetThesaurus
当该iField索引字段全部都索引完成后,在dHit中添加结束标记
// mark trailing hit if ( m_dHits.GetLength() ) m_dHits.Last().m_iWordPos |= HIT_FIELD_END;
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。