From 0ca6a5ae271a4703da44f99fd2e8c067c6374238 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 5 Jun 2022 21:46:07 +0800 Subject: [PATCH] refactor json index --- cmake/cmake.options | 2 +- include/libs/index/index.h | 3 +- source/dnode/vnode/inc/vnode.h | 19 ++++----- source/dnode/vnode/src/inc/meta.h | 11 +++--- source/dnode/vnode/src/inc/vnodeInt.h | 3 +- source/dnode/vnode/src/meta/metaOpen.c | 6 --- source/dnode/vnode/src/meta/metaQuery.c | 4 -- source/dnode/vnode/src/meta/metaTable.c | 24 +----------- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +++ source/libs/executor/src/executorimpl.c | 19 ++++----- source/libs/index/src/indexFilter.c | 52 ++++++++++--------------- 11 files changed, 59 insertions(+), 90 deletions(-) diff --git a/cmake/cmake.options b/cmake/cmake.options index ab3c5ac1ad..62816a80d5 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -146,6 +146,6 @@ option( option( BUILD_WITH_INVERTEDINDEX "If use invertedIndex" - OFF + ON ) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 180c7e7216..bd601f1d9f 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -193,8 +193,9 @@ void indexInit(); /* index filter */ typedef struct SIndexMetaArg { - void* metaHandle; void* metaEx; + void* idx; + void* ivtIdx; uint64_t suid; } SIndexMetaArg; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7bd2eadd4b..8c7a78a5af 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -56,7 +56,7 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int32_t vnodePreprocessQueryMsg(SVnode * pVnode, SRpcMsg * pMsg); +int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); @@ -86,7 +86,7 @@ typedef struct SMetaFltParam { tb_uid_t suid; int16_t cid; int16_t type; - char *val; + char * val; bool reverse; int (*filterFunc)(void *a, void *b, int16_t type); @@ -121,7 +121,8 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo bool isTsdbCacheLastRow(tsdbReaderT *pReader); int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list); int32_t tsdbGetCtbIdList(SMeta *pMeta, int64_t suid, SArray *list); -void *tsdbGetIdx(SMeta *pMeta); +void * tsdbGetIdx(SMeta *pMeta); +void * tsdbGetIvtIdx(SMeta *pMeta); int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); @@ -197,7 +198,7 @@ struct SMetaEntry { int64_t version; int8_t type; tb_uid_t uid; - char *name; + char * name; union { struct { SSchemaWrapper schemaRow; @@ -225,17 +226,17 @@ struct SMetaEntry { struct SMetaReader { int32_t flags; - SMeta *pMeta; + SMeta * pMeta; SDecoder coder; SMetaEntry me; - void *pBuf; + void * pBuf; int32_t szBuf; }; struct SMTbCursor { - TBC *pDbc; - void *pKey; - void *pVal; + TBC * pDbc; + void * pKey; + void * pVal; int32_t kLen; int32_t vLen; SMetaReader mr; diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index b610676c19..63a8e7ec68 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -69,12 +69,11 @@ struct SMeta { TTB* pUidIdx; TTB* pNameIdx; TTB* pCtbIdx; -#ifdef USE_INVERTED_INDEX + // ivt idx and idx void* pTagIvtIdx; -#else - TTB* pTagIdx; -#endif - TTB* pTtlIdx; + TTB* pTagIdx; + TTB* pTtlIdx; + TTB* pSmaIdx; SMetaIdx* pIdx; }; @@ -117,7 +116,7 @@ typedef struct { } SSmaIdxKey; // metaTable ================== -int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int32_t nTagData, int8_t type, tb_uid_t uid, +int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int32_t nTagData, int8_t type, tb_uid_t uid, STagIdxKey** ppTagIdxKey, int32_t* nTagIdxKey); #ifndef META_REFACT diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d3b5f29aac..b35f6e3abe 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -87,7 +87,7 @@ int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* p int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); -int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp *pMetaRsp); +int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); int metaGetTableEntryByName(SMetaReader* pReader, const char* name); @@ -104,6 +104,7 @@ int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppRea int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader); int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData); void* metaGetIdx(SMeta* pMeta); +void* metaGetIvtIdx(SMeta* pMeta); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 86637d2850..95510c25e5 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -93,7 +93,6 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { } // open pTagIdx -#ifdef USE_INVERTED_INDEX // TODO(yihaoDeng), refactor later char indexFullPath[128] = {0}; sprintf(indexFullPath, "%s/%s", pMeta->path, "invert"); @@ -104,13 +103,11 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { goto _err; } -#else ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx); if (ret < 0) { metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } -#endif // open pTtlIdx ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx); @@ -141,11 +138,8 @@ _err: if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); -#ifdef USE_INVERTED_INDEX if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); -#else if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); -#endif if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx); if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index fc7d1c3c92..21a55d6463 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -603,9 +603,6 @@ typedef struct { } SIdxCursor; int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { -#ifdef USE_INVERTED_INDEX - return -1; -#else SIdxCursor *pCursor = NULL; int32_t ret = 0, valid = 0; @@ -681,5 +678,4 @@ END: taosMemoryFree(pCursor); return ret; -#endif } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 2d84ea3de9..4f4a940bc7 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -890,27 +890,12 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len; return metaSaveJsonVarToIdx(pMeta, pCtbEntry, pTagColumn); } - - // update tag index -#ifdef USE_INVERTED_INDEX - tb_uid_t suid = pCtbEntry->ctbEntry.suid; - tb_uid_t tuid = pCtbEntry->uid; - - SIndexMultiTerm *tmGroup = indexMultiTermCreate(); - - SIndexTerm *tm = indexTermCreate(suid, ADD_VALUE, pTagColumn->type, pTagColumn->name, sizeof(pTagColumn->name), - pTagData, pTagData == NULL ? 0 : strlen(pTagData)); - indexMultiTermAdd(tmGroup, tm); - int ret = indexPut((SIndex *)pMeta->pTagIvtIdx, tmGroup, tuid); - indexMultiTermDestroy(tmGroup); -#else if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) { return -1; } tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn); metaDestroyTagIdxKey(pTagIdxKey); -#endif tDecoderClear(&dc); tdbFree(pData); return 0; @@ -995,10 +980,5 @@ _err: return -1; } // refactor later -void *metaGetIdx(SMeta *pMeta) { -#ifdef USE_INVERTED_INDEX - return pMeta->pTagIvtIdx; -#else - return pMeta->pTagIdx; -#endif -} +void *metaGetIdx(SMeta *pMeta) { return pMeta->pTagIdx; } +void *metaGetIvtIdx(SMeta *pMeta) { return pMeta->pTagIvtIdx; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5f2ea80078..b2c077e9f5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2829,6 +2829,12 @@ void* tsdbGetIdx(SMeta* pMeta) { } return metaGetIdx(pMeta); } +void* tsdbGetIvtIdx(SMeta* pMeta) { + if (pMeta == NULL) { + return NULL; + } + return metaGetIvtIdx(pMeta); +} int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 07f4251340..bee0ce7f89 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4052,12 +4052,12 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p goto _error; } - pInfo->limit = *pLimit; - pInfo->slimit = *pSlimit; - pInfo->curOffset = pLimit->offset; + pInfo->limit = *pLimit; + pInfo->slimit = *pSlimit; + pInfo->curOffset = pLimit->offset; pInfo->curSOffset = pSlimit->offset; pInfo->binfo.pRes = pResBlock; - pInfo->pFilterNode= pCondition; + pInfo->pFilterNode = pCondition; int32_t numOfCols = num; int32_t numOfRows = 4096; @@ -4404,7 +4404,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STimeWindowAggSupp twSup = { - .waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN}; + .waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN}; tsdbReaderT pDataReader = NULL; if (pHandle->vnode) { pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond); @@ -4487,7 +4487,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset}; SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset}; - pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pProjPhyNode->node.pConditions, pTaskInfo); + pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, + pProjPhyNode->node.pConditions, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) { SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); @@ -4836,7 +4837,8 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa if (tableType == TSDB_SUPER_TABLE) { if (pTagCond) { - SIndexMetaArg metaArg = {.metaEx = metaHandle, .metaHandle = tsdbGetIdx(metaHandle), .suid = tableUid}; + SIndexMetaArg metaArg = { + .metaEx = metaHandle, .idx = tsdbGetIdx(metaHandle), .ivtIdx = tsdbGetIvtIdx(metaHandle), .suid = tableUid}; SArray* res = taosArrayInit(8, sizeof(uint64_t)); code = doFilterTag(pTagCond, &metaArg, res); @@ -5187,8 +5189,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } -int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, - size_t size) { +int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, size_t size) { pSup->keySize = sizeof(int64_t) + sizeof(TSKEY); pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize); pSup->pResultRows = taosArrayInit(1024, size); diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index cb605ca073..34251ab918 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -335,44 +335,34 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) { } static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { SIndexMetaArg *arg = &output->arg; -#ifdef USE_INVERTED_INDEX - SIndexTerm *tm = indexTermCreate(arg->suid, DEFAULT, right->colValType, left->colName, strlen(left->colName), - right->condValue, strlen(right->condValue)); - if (tm == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } + int ret = 0; - int ret = 0; EIndexQueryType qtype = 0; SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype)); - - SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST); - indexMultiTermQueryAdd(mtm, tm, qtype); if (left->colValType == TSDB_DATA_TYPE_JSON) { - ret = tIndexJsonSearch(arg->metaHandle, mtm, output->result); + SIndexTerm *tm = indexTermCreate(arg->suid, DEFAULT, right->colValType, left->colName, strlen(left->colName), + right->condValue, strlen(right->condValue)); + if (tm == NULL) { + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST); + indexMultiTermQueryAdd(mtm, tm, qtype); + ret = tIndexJsonSearch(arg->ivtIdx, mtm, output->result); } else { - ret = indexSearch(arg->metaHandle, mtm, output->result); + bool reverse; + Filter filterFunc = sifGetFilterFunc(qtype, &reverse); + + SMetaFltParam param = {.suid = arg->suid, + .cid = left->colId, + .type = left->colValType, + .val = right->condValue, + .reverse = reverse, + .filterFunc = filterFunc}; + + ret = metaFilteTableIds(arg->metaEx, ¶m, output->result); } - indexDebug("index filter data size: %d", (int)taosArrayGetSize(output->result)); - indexMultiTermQueryDestroy(mtm); return ret; -#else - EIndexQueryType qtype = 0; - SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype)); - bool reverse; - Filter filterFunc = sifGetFilterFunc(qtype, &reverse); - - SMetaFltParam param = {.suid = arg->suid, - .cid = left->colId, - .type = left->colValType, - .val = right->condValue, - .reverse = reverse, - .filterFunc = filterFunc}; - - int ret = metaFilteTableIds(arg->metaEx, ¶m, output->result); - return ret; -#endif - return 0; } static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {