From c47371c877a523cf698f904bbf2506602193e32f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 27 May 2022 18:13:22 +0800 Subject: [PATCH 1/4] change filter interface --- include/libs/index/index.h | 2 +- source/libs/executor/src/executorimpl.c | 61 +++++++++++++------------ source/libs/index/src/indexFilter.c | 6 ++- 3 files changed, 37 insertions(+), 32 deletions(-) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 05db99db0f..890908b0ed 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -196,7 +196,7 @@ typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltS SIdxFltStatus idxGetFltStatus(SNode* pFilterNode); -int32_t doFilterTag(const SNode* pFilterNode, SArray* result); +int32_t doFilterTag(const SNode* pFilterNode, void* metaHandle, SArray* result); /* * destory index env * diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8c6c6f0f64..af96bc34d1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -28,13 +28,13 @@ #include "ttime.h" #include "executorimpl.h" +#include "index.h" #include "query.h" #include "tcompare.h" #include "tcompression.h" #include "thash.h" #include "ttypes.h" #include "vnode.h" -#include "index.h" #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) @@ -125,7 +125,7 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput) void doSetOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; - pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000)/1000.0; + pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000) / 1000.0; if (pOperator->pTaskInfo != NULL) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); } @@ -2718,7 +2718,7 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int64_t el = taosGetTimestampUs() - startTs; + int64_t el = taosGetTimestampUs() - startTs; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; pLoadInfo->totalElapsed += el; @@ -3024,13 +3024,13 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p tsem_init(&pInfo->ready, 0, 0); - pOperator->name = "ExchangeOperator"; + pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pBlock->info.numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pBlock->info.numOfCols; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL, NULL, NULL); @@ -3466,7 +3466,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); OPTR_SET_OPENED(pOperator); - pOperator->cost.openCost = (taosGetTimestampUs() - st)/1000.0; + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; return TSDB_CODE_SUCCESS; } @@ -3491,10 +3491,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - size_t rows = blockDataGetNumOfRows(pInfo->pRes);//pInfo->pRes : NULL; + size_t rows = blockDataGetNumOfRows(pInfo->pRes); // pInfo->pRes : NULL; pOperator->resultInfo.totalRows += rows; - return (rows == 0)? NULL:pInfo->pRes; + return (rows == 0) ? NULL : pInfo->pRes; } void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, @@ -3779,10 +3779,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pOperator->resultInfo.totalRows += rows; if (pOperator->cost.openCost == 0) { - pOperator->cost.openCost = (taosGetTimestampUs() - st)/ 1000.0; + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - return (rows > 0)? pInfo->pRes:NULL; + return (rows > 0) ? pInfo->pRes : NULL; } static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup, @@ -4438,7 +4438,8 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT } static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode); + STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, + SNode* pTagNode); static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond); @@ -4471,14 +4472,16 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo } SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, - uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SNode* pTagCond) { + uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, + SNode* pTagCond) { int32_t type = nodeType(pPhyNode); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); + tsdbReaderT pDataReader = + doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); if (pDataReader == NULL && terrno != 0) { return NULL; } @@ -4504,7 +4507,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pHandle->vnode) { pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond); } else { - doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId, pTagCond); + doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId, + pTagCond); } if (pDataReader == NULL && terrno != 0) { @@ -4669,8 +4673,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; - pOptr = - createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); + pOptr = createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, + pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode; @@ -4898,9 +4902,9 @@ int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUi if (tableType == TSDB_SUPER_TABLE) { SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); - if(pTagCond){ - code = doFilterTag(pTagCond, res); - }else{ + if (pTagCond) { + code = doFilterTag(pTagCond, metaHandle, res); + } else { code = tsdbGetAllTableList(metaHandle, tableUid, res); } if (code != TSDB_CODE_SUCCESS) { @@ -4938,8 +4942,8 @@ SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) { tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode) { uint64_t uid = pTableScanNode->scan.uid; - int32_t code = - doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId, pTagNode); + int32_t code = doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, + taskId, pTagNode); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4974,8 +4978,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } - (*pTaskInfo)->pRoot = - createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo, pPlan->pTagCond); + (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, + &(*pTaskInfo)->tableqinfoGroupInfo, pPlan->pTagCond); if (NULL == (*pTaskInfo)->pRoot) { code = terrno; goto _complete; @@ -5176,8 +5180,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } -int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, - const char* pDir) { +int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, const char* pKey, const char* pDir) { pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY); pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 0273867ccf..15323cc807 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -37,6 +37,8 @@ typedef struct SIFParam { int64_t suid; // add later char dbName[TSDB_DB_NAME_LEN]; char colName[TSDB_COL_NAME_LEN]; + + void *metaHandle; } SIFParam; typedef struct SIFCtx { @@ -561,7 +563,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { SIF_RET(code); } -int32_t doFilterTag(const SNode *pFilterNode, SArray *result) { +int32_t doFilterTag(const SNode *pFilterNode, void *metaHandle, SArray *result) { if (pFilterNode == NULL) { return TSDB_CODE_SUCCESS; } @@ -570,7 +572,7 @@ int32_t doFilterTag(const SNode *pFilterNode, SArray *result) { // todo move to the initialization function // SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0)); - SIFParam param = {0}; + SIFParam param = {.metHandle = metaHandle}; SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, ¶m)); taosArrayAddAll(result, param.result); From 6acbe7e777517a6b886eb9ad1cedef92b5c7a9d2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 28 May 2022 16:15:24 +0800 Subject: [PATCH 2/4] enh: support tag filter --- include/libs/index/index.h | 7 +- source/common/src/tdatablock.c | 4 +- source/common/src/tglobal.c | 2 +- source/dnode/vnode/inc/vnode.h | 20 +-- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/meta/metaTable.c | 46 ++++--- source/dnode/vnode/src/tsdb/tsdbRead.c | 175 +++++++++++++----------- source/libs/executor/src/executorimpl.c | 6 +- source/libs/index/inc/indexUtil.h | 8 +- source/libs/index/src/index.c | 20 +-- source/libs/index/src/indexCache.c | 7 +- source/libs/index/src/indexFilter.c | 37 +++-- source/libs/index/src/indexTfile.c | 2 +- source/libs/index/src/indexUtil.c | 15 +- source/libs/index/test/CMakeLists.txt | 46 +++---- source/libs/index/test/indexTests.cc | 12 +- source/libs/index/test/utilUT.cc | 30 +++- 17 files changed, 257 insertions(+), 181 deletions(-) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 890908b0ed..c3d31ffe38 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -192,11 +192,16 @@ void indexTermDestroy(SIndexTerm* p); void indexInit(); /* index filter */ +typedef struct SIndexMetaArg { + void* metaHandle; + uint64_t suid; +} SIndexMetaArg; + typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltStatus; SIdxFltStatus idxGetFltStatus(SNode* pFilterNode); -int32_t doFilterTag(const SNode* pFilterNode, void* metaHandle, SArray* result); +int32_t doFilterTag(const SNode* pFilterNode, SIndexMetaArg* metaArg, SArray* result); /* * destory index env * diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 2d77905d86..a59817075d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1153,7 +1153,9 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumn->info.type)) { pColumn->varmeta.length = 0; } else { - memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); + if (pColumn->nullbitmap != NULL) { + memset(pColumn->nullbitmap, 0, BitmapLen(numOfRows)); + } } } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d0a2ddd9bb..141ec4f03b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -293,7 +293,7 @@ int32_t taosAddClientLogCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, 1) != 0) return -1; if (cfgAddInt32(pCfg, "simDebugFlag", 143, 0, 255, 1) != 0) return -1; if (cfgAddInt32(pCfg, "debugFlag", 0, 0, 255, 1) != 0) return -1; - if (cfgAddInt32(pCfg, "idxDebugFlag", 0, 0, 255, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, 1) != 0) return -1; return 0; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2b713ff980..420c8b7d3d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -105,10 +105,12 @@ tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STab void *pMemRef); int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo); bool isTsdbCacheLastRow(tsdbReaderT *pReader); -int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list); +int32_t tsdbGetAllTableList(SMeta *pMeta, uint64_t uid, SArray *list); +void * tsdbGetIdx(SMeta *pMeta); int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); -bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); -void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); + +bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); +void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond); @@ -174,7 +176,7 @@ struct SMetaEntry { int64_t version; int8_t type; tb_uid_t uid; - char *name; + char * name; union { struct { SSchemaWrapper schemaRow; @@ -202,17 +204,17 @@ struct SMetaEntry { struct SMetaReader { int32_t flags; - SMeta *pMeta; + SMeta * pMeta; SDecoder coder; SMetaEntry me; - void *pBuf; + void * pBuf; int32_t szBuf; }; struct SMTbCursor { - TBC *pDbc; - void *pKey; - void *pVal; + TBC * pDbc; + void * pKey; + void * pVal; int32_t kLen; int32_t vLen; SMetaReader mr; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index ba25c5e286..0e67d9e426 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -103,6 +103,7 @@ SArray* metaGetSmaTbUids(SMeta* pMeta); int32_t metaSnapshotReaderOpen(SMeta* pMeta, SMetaSnapshotReader** ppReader, int64_t sver, int64_t ever); int32_t metaSnapshotReaderClose(SMetaSnapshotReader* pReader); int32_t metaSnapshotRead(SMetaSnapshotReader* pReader, void** ppData, uint32_t* nData); +void* metaGetIdx(SMeta* pMeta); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 7182f496c4..f610f18126 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -31,9 +31,9 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { int vLen = 0; const void *pKey = NULL; const void *pVal = NULL; - void *pBuf = NULL; + void * pBuf = NULL; int32_t szBuf = 0; - void *p = NULL; + void * p = NULL; SMetaReader mr = {0}; // validate req @@ -87,7 +87,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { } // drop all child tables - TBC *pCtbIdxc = NULL; + TBC * pCtbIdxc = NULL; SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t)); tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn); @@ -142,8 +142,8 @@ _exit: int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { SMetaEntry oStbEntry = {0}; SMetaEntry nStbEntry = {0}; - TBC *pUidIdxc = NULL; - TBC *pTbDbc = NULL; + TBC * pUidIdxc = NULL; + TBC * pTbDbc = NULL; const void *pData; int nData; int64_t oversion; @@ -262,7 +262,7 @@ _err: } int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) { - void *pData = NULL; + void * pData = NULL; int nData = 0; int rc = 0; tb_uid_t uid; @@ -288,7 +288,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi } static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { - void *pData = NULL; + void * pData = NULL; int nData = 0; int rc = 0; int64_t version; @@ -324,14 +324,14 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { } static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { - void *pVal = NULL; + void * pVal = NULL; int nVal = 0; - const void *pData = NULL; + const void * pData = NULL; int nData = 0; int ret = 0; tb_uid_t uid; int64_t oversion; - SSchema *pColumn = NULL; + SSchema * pColumn = NULL; SMetaEntry entry = {0}; SSchemaWrapper *pSchema; int c; @@ -479,7 +479,7 @@ _err: static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { SMetaEntry ctbEntry = {0}; SMetaEntry stbEntry = {0}; - void *pVal = NULL; + void * pVal = NULL; int nVal = 0; int ret; int c; @@ -510,7 +510,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA oversion = *(int64_t *)pData; // search table.db - TBC *pTbDbc = NULL; + TBC * pTbDbc = NULL; SDecoder dc1 = {0}; SDecoder dc2 = {0}; @@ -534,7 +534,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA metaDecodeEntry(&dc2, &stbEntry); SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag; - SSchema *pColumn = NULL; + SSchema * pColumn = NULL; int32_t iCol = 0; for (;;) { pColumn = NULL; @@ -639,8 +639,8 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) { static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { STbDbKey tbDbKey; - void *pKey = NULL; - void *pVal = NULL; + void * pKey = NULL; + void * pVal = NULL; int kLen = 0; int vLen = 0; SEncoder coder = {0}; @@ -755,14 +755,14 @@ static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) { } static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { - void *pData = NULL; + void * pData = NULL; int nData = 0; STbDbKey tbDbKey = {0}; SMetaEntry stbEntry = {0}; - STagIdxKey *pTagIdxKey = NULL; + STagIdxKey * pTagIdxKey = NULL; int32_t nTagIdxKey; const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0]; - const void *pTagData = NULL; // + const void * pTagData = NULL; // SDecoder dc = {0}; // get super table @@ -804,7 +804,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { SEncoder coder = {0}; - void *pVal = NULL; + void * pVal = NULL; int vLen = 0; int rcode = 0; SSkmDbKey skmDbKey = {0}; @@ -880,3 +880,11 @@ _err: metaULock(pMeta); return -1; } +// refactor later +void *metaGetIdx(SMeta *pMeta) { +#ifdef USE_INVERTED_INDEX + return pMeta->pTagIvtIdx; +#else + return pMeta->pTagIdx; +#endif +} diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c0b97f7536..b7155b4221 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -259,8 +259,8 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S } taosArrayPush(pTableCheckInfo, &info); - tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, - info.lastKey, pTsdbReadHandle->idStr); + tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, info.lastKey, + pTsdbReadHandle->idStr); } // TODO group table according to the tag value. @@ -363,13 +363,16 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, } if (level == TSDB_RETENTION_L0) { - tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L0); + tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, + TSDB_RETENTION_L0); return VND_RSMA0(pVnode); } else if (level == TSDB_RETENTION_L1) { - tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L1); + tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, + TSDB_RETENTION_L1); return VND_RSMA1(pVnode); } else { - tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L2); + tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, + TSDB_RETENTION_L2); return VND_RSMA2(pVnode); } } @@ -412,7 +415,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* if (pCond->numOfCols > 0) { int32_t rowLen = 0; - for(int32_t i = 0; i < pCond->numOfCols; ++i) { + for (int32_t i = 0; i < pCond->numOfCols; ++i) { rowLen += pCond->colList[i].bytes; } @@ -660,7 +663,7 @@ SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) { } // leave only one table for each group -//static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) { +// static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGroupList) { // assert(pGroupList); // size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList); // @@ -692,7 +695,7 @@ SArray* tsdbGetQueriedTableList(tsdbReaderT* pHandle) { // return pNew; //} -//tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, +// tsdbReaderT tsdbQueryRowsInExternalWindow(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, // uint64_t qId, uint64_t taskId) { // STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList); // @@ -1299,7 +1302,6 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* if ((ascScan && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || (!ascScan && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) { - bool cacheDataInFileBlockHole = (ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey)); if (cacheDataInFileBlockHole) { @@ -1342,7 +1344,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pTsdbReadHandle->realNumOfRows = binfo.rows; cur->rows = binfo.rows; - cur->win = binfo.window; + cur->win = binfo.window; cur->mixBlock = false; cur->blockCompleted = true; @@ -1353,9 +1355,9 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* cur->lastKey = binfo.window.skey - 1; cur->pos = -1; } - } else { // partially copy to dest buffer + } else { // partially copy to dest buffer // make sure to only load once - bool firstTimeExtract = ((cur->pos == 0 && ascScan) || (cur->pos == binfo.rows -1 && (!ascScan))); + bool firstTimeExtract = ((cur->pos == 0 && ascScan) || (cur->pos == binfo.rows - 1 && (!ascScan))); if (pTsdbReadHandle->outputCapacity < binfo.rows && firstTimeExtract) { code = doLoadFileDataBlock(pTsdbReadHandle, pBlock, pCheckInfo, cur->slot); if (code != TSDB_CODE_SUCCESS) { @@ -1864,7 +1866,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); - int32_t step = ascScan? 1 : -1; + int32_t step = ascScan ? 1 : -1; int32_t start = cur->pos; int32_t end = endPos; @@ -1879,8 +1881,8 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa // the time window should always be ascending order: skey <= ekey cur->win = (STimeWindow){.skey = tsArray[start], .ekey = tsArray[end]}; cur->mixBlock = (numOfRows != pBlockInfo->rows); - cur->lastKey = tsArray[endPos] + step; - cur->blockCompleted = (ascScan? (endPos == pBlockInfo->rows - 1):(endPos == 0)); + cur->lastKey = tsArray[endPos] + step; + cur->blockCompleted = (ascScan ? (endPos == pBlockInfo->rows - 1) : (endPos == 0)); // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases. int32_t pos = endPos + step; @@ -1896,7 +1898,7 @@ int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* p // NOTE: reverse the order to find the end position in data block int32_t endPos = -1; bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); - int32_t order = ascScan? TSDB_ORDER_DESC : TSDB_ORDER_ASC; + int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; SQueryFilePos* cur = &pTsdbReadHandle->cur; SDataCols* pCols = pTsdbReadHandle->rhelper.pDCols[0]; @@ -1956,7 +1958,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows - 1] == pBlock->keyLast); - bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); + bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order); int32_t step = ascScan ? 1 : -1; // for search the endPos, so the order needs to reverse @@ -1967,8 +1969,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf STimeWindow* pWin = &blockInfo.window; tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64 - " rows:%d, start:%d, end:%d, %s", pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, - cur->pos, endPos, pTsdbReadHandle->idStr); + " rows:%d, start:%d, end:%d, %s", + pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows, cur->pos, endPos, + pTsdbReadHandle->idStr); // compared with the data from in-memory buffer, to generate the correct timestamp array list int32_t numOfRows = 0; @@ -2087,8 +2090,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf } // still assign data into current row - numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, - pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); + numOfRows += + mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, + pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -2153,8 +2157,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf * if cache is empty, load remain file block data. In contrast, if there are remain data in cache, do NOT * copy them all to result buffer, since it may be overlapped with file data block. */ - if (node == NULL || - ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) || + if (node == NULL || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) { // no data in cache or data in cache is greater than the ekey of time window, load data from file block if (cur->win.skey == TSKEY_INITIAL_VAL) { @@ -2175,7 +2178,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf } cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) || - ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan)); + ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan)); if (!ascScan) { TSWAP(cur->win.skey, cur->win.ekey); @@ -2794,6 +2797,12 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int return numOfRows; } +void* tsdbGetIdx(SMeta* pMeta) { + if (pMeta == NULL) { + return NULL; + } + return metaGetIdx(pMeta); +} int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) { SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid); @@ -3382,65 +3391,65 @@ int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle) { STimeWindow updateLastrowForEachGroup(STableListInfo* pList) { STimeWindow window = {INT64_MAX, INT64_MIN}; -// int32_t totalNumOfTable = 0; -// SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t)); -// -// // NOTE: starts from the buffer in case of descending timestamp order check data blocks -// size_t numOfGroups = taosArrayGetSize(groupList->pGroupList); -// for (int32_t j = 0; j < numOfGroups; ++j) { -// SArray* pGroup = taosArrayGetP(groupList->pGroupList, j); -// TSKEY key = TSKEY_INITIAL_VAL; -// -// STableKeyInfo keyInfo = {0}; -// -// size_t numOfTables = taosArrayGetSize(pGroup); -// for (int32_t i = 0; i < numOfTables; ++i) { -// STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i); -// -// // if the lastKey equals to INT64_MIN, there is no data in this table -// TSKEY lastKey = 0; //((STable*)(pInfo->pTable))->lastKey; -// if (key < lastKey) { -// key = lastKey; -// -// // keyInfo.pTable = pInfo->pTable; -// keyInfo.lastKey = key; -// pInfo->lastKey = key; -// -// if (key < window.skey) { -// window.skey = key; -// } -// -// if (key > window.ekey) { -// window.ekey = key; -// } -// } -// } -// -// // more than one table in each group, only one table left for each group -// // if (keyInfo.pTable != NULL) { -// // totalNumOfTable++; -// // if (taosArrayGetSize(pGroup) == 1) { -// // // do nothing -// // } else { -// // taosArrayClear(pGroup); -// // taosArrayPush(pGroup, &keyInfo); -// // } -// // } else { // mark all the empty groups, and remove it later -// // taosArrayDestroy(pGroup); -// // taosArrayPush(emptyGroup, &j); -// // } -// } -// -// // window does not being updated, so set the original -// if (window.skey == INT64_MAX && window.ekey == INT64_MIN) { -// window = TSWINDOW_INITIALIZER; -// assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups); -// } -// -// taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup)); -// taosArrayDestroy(emptyGroup); -// -// groupList->numOfTables = totalNumOfTable; + // int32_t totalNumOfTable = 0; + // SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t)); + // + // // NOTE: starts from the buffer in case of descending timestamp order check data blocks + // size_t numOfGroups = taosArrayGetSize(groupList->pGroupList); + // for (int32_t j = 0; j < numOfGroups; ++j) { + // SArray* pGroup = taosArrayGetP(groupList->pGroupList, j); + // TSKEY key = TSKEY_INITIAL_VAL; + // + // STableKeyInfo keyInfo = {0}; + // + // size_t numOfTables = taosArrayGetSize(pGroup); + // for (int32_t i = 0; i < numOfTables; ++i) { + // STableKeyInfo* pInfo = (STableKeyInfo*)taosArrayGet(pGroup, i); + // + // // if the lastKey equals to INT64_MIN, there is no data in this table + // TSKEY lastKey = 0; //((STable*)(pInfo->pTable))->lastKey; + // if (key < lastKey) { + // key = lastKey; + // + // // keyInfo.pTable = pInfo->pTable; + // keyInfo.lastKey = key; + // pInfo->lastKey = key; + // + // if (key < window.skey) { + // window.skey = key; + // } + // + // if (key > window.ekey) { + // window.ekey = key; + // } + // } + // } + // + // // more than one table in each group, only one table left for each group + // // if (keyInfo.pTable != NULL) { + // // totalNumOfTable++; + // // if (taosArrayGetSize(pGroup) == 1) { + // // // do nothing + // // } else { + // // taosArrayClear(pGroup); + // // taosArrayPush(pGroup, &keyInfo); + // // } + // // } else { // mark all the empty groups, and remove it later + // // taosArrayDestroy(pGroup); + // // taosArrayPush(emptyGroup, &j); + // // } + // } + // + // // window does not being updated, so set the original + // if (window.skey == INT64_MAX && window.ekey == INT64_MIN) { + // window = TSWINDOW_INITIALIZER; + // assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == numOfGroups); + // } + // + // taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), (int32_t)taosArrayGetSize(emptyGroup)); + // taosArrayDestroy(emptyGroup); + // + // groupList->numOfTables = totalNumOfTable; return window; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 68050d8d2b..bb3dbaa707 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4895,13 +4895,17 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa if (tableType == TSDB_SUPER_TABLE) { if (pTagCond) { + SIndexMetaArg metaArg = {.metaHandle = tsdbGetIdx(metaHandle), .suid = tableUid}; + SArray* res = taosArrayInit(8, sizeof(uint64_t)); - code = doFilterTag(pTagCond, NULL, res); + code = doFilterTag(pTagCond, &metaArg, res); if (code != TSDB_CODE_SUCCESS) { qError("doFilterTag error:%d", code); taosArrayDestroy(res); terrno = code; return code; + } else { + qDebug("doFilterTag error:%d, suid: %" PRIu64 "", code, tableUid); } for (int i = 0; i < taosArrayGetSize(res); i++) { STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)}; diff --git a/source/libs/index/inc/indexUtil.h b/source/libs/index/inc/indexUtil.h index f1676ed411..1bd46e976d 100644 --- a/source/libs/index/inc/indexUtil.h +++ b/source/libs/index/inc/indexUtil.h @@ -98,13 +98,13 @@ typedef struct { SArray *deled; } SIdxTempResult; -SIdxTempResult *sIdxTempResultCreate(); +SIdxTempResult *idxTempResultCreate(); -void sIdxTempResultClear(SIdxTempResult *tr); +void idxTempResultClear(SIdxTempResult *tr); -void sIdxTempResultDestroy(SIdxTempResult *tr); +void idxTempResultDestroy(SIdxTempResult *tr); -void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr); +void idxTempResultMergeTo(SIdxTempResult *tr, SArray *result); #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 500f570649..d929c12320 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -201,6 +201,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { char buf[128] = {0}; ICacheKey key = {.suid = p->suid, .colName = p->colName, .nColName = strlen(p->colName), .colType = p->colType}; int32_t sz = indexSerialCacheKey(&key, buf); + indexDebug("suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType); IndexCache** cache = taosHashGet(index->colObj, buf, sz); assert(*cache != NULL); @@ -328,6 +329,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result char buf[128] = {0}; ICacheKey key = { .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType}; + indexDebug("suid: %" PRIu64 ", colName: %s, colType: %d", key.suid, key.colName, key.colType); int32_t sz = indexSerialCacheKey(&key, buf); taosThreadMutexLock(&sIdx->mtx); @@ -341,7 +343,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result int64_t st = taosGetTimestampUs(); - SIdxTempResult* tr = sIdxTempResultCreate(); + SIdxTempResult* tr = idxTempResultCreate(); if (0 == indexCacheSearch(cache, query, tr, &s)) { if (s == kTypeDeletion) { indexInfo("col: %s already drop by", term->colName); @@ -363,12 +365,12 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result int64_t cost = taosGetTimestampUs() - st; indexInfo("search cost: %" PRIu64 "us", cost); - sIdxTempResultMergeTo(*result, tr); + idxTempResultMergeTo(tr, *result); - sIdxTempResultDestroy(tr); + idxTempResultDestroy(tr); return 0; END: - sIdxTempResultDestroy(tr); + idxTempResultDestroy(tr); return -1; } static void indexInterResultsDestroy(SArray* results) { @@ -409,13 +411,13 @@ static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdx if (sz > 0) { TFileValue* lv = taosArrayGetP(result, sz - 1); if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) { - sIdxTempResultMergeTo(lv->tableId, tr); - sIdxTempResultClear(tr); + idxTempResultMergeTo(tr, lv->tableId); + idxTempResultClear(tr); taosArrayPush(result, &tfv); } else if (tfv == NULL) { // handle last iterator - sIdxTempResultMergeTo(lv->tableId, tr); + idxTempResultMergeTo(tr, lv->tableId); } else { // temp result saved in help tfileValueDestroy(tfv); @@ -489,7 +491,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false; - SIdxTempResult* tr = sIdxTempResultCreate(); + SIdxTempResult* tr = idxTempResultCreate(); while (cn == true || tn == true) { IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL; @@ -515,7 +517,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { } } indexMayMergeTempToFinalResult(result, NULL, tr); - sIdxTempResultDestroy(tr); + idxTempResultDestroy(tr); int ret = indexGenTFile(sIdx, pCache, result); indexDestroyFinalResult(result); diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 6e52c4b1ba..bbe8f1f91a 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -133,6 +133,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); pCt->colVal = term->colVal; + pCt->colType = term->colType; pCt->version = atomic_load_64(&pCache->version); char* key = indexCacheTermGet(pCt); @@ -597,10 +598,10 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result indexMemRef(imm); taosThreadMutexUnlock(&pCache->mtx); - int ret = indexQueryMem(mem, query, result, s); + int ret = (mem && mem->mem) ? indexQueryMem(mem, query, result, s) : 0; if (ret == 0 && *s != kTypeDeletion) { // continue search in imm - ret = indexQueryMem(imm, query, result, s); + ret = (imm && imm->mem) ? indexQueryMem(imm, query, result, s) : 0; } indexMemUnRef(mem); @@ -709,7 +710,7 @@ static int32_t indexCacheJsonTermCompare(const void* l, const void* r) { return cmp; } static MemTable* indexInternalCacheCreate(int8_t type) { - int ttype = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type; + int ttype = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : TSDB_DATA_TYPE_BINARY; int32_t (*cmpFn)(const void* l, const void* r) = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? indexCacheJsonTermCompare : indexCacheTermCompare; diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 5544fafc5a..ab96b2929c 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -38,13 +38,14 @@ typedef struct SIFParam { char dbName[TSDB_DB_NAME_LEN]; char colName[TSDB_COL_NAME_LEN]; - void *metaHandle; + SIndexMetaArg arg; } SIFParam; typedef struct SIFCtx { - int32_t code; - SHashObj *pRes; /* element is SIFParam */ - bool noExec; // true: just iterate condition tree, and add hint to executor plan + int32_t code; + SHashObj * pRes; /* element is SIFParam */ + bool noExec; // true: just iterate condition tree, and add hint to executor plan + SIndexMetaArg arg; // SIdxFltStatus st; } SIFCtx; @@ -259,7 +260,8 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu return TSDB_CODE_QRY_INVALID_INPUT; } static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { - SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName), + SIndexMetaArg *arg = &output->arg; + SIndexTerm * tm = indexTermCreate(arg->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName), right->condValue, strlen(right->condValue)); if (tm == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; @@ -270,7 +272,8 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST); indexMultiTermQueryAdd(mtm, tm, qtype); - int ret = indexSearch(NULL, mtm, output->result); + int ret = indexSearch(arg->metaHandle, mtm, output->result); + indexDebug("index filter data size: %d", (int)taosArrayGetSize(output->result)); indexMultiTermQueryDestroy(mtm); return ret; } @@ -374,6 +377,8 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { SIFParam *params = NULL; SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx)); + // ugly code, refactor later + output->arg = ctx->arg; sif_func_t operFn = sifGetOperFn(node->opType); if (ctx->noExec && operFn == NULL) { output->status = SFLT_NOT_INDEX; @@ -425,7 +430,7 @@ _return: static EDealRes sifWalkFunction(SNode *pNode, void *context) { SFunctionNode *node = (SFunctionNode *)pNode; - SIFParam output = {0}; + SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))}; SIFCtx *ctx = context; ctx->code = sifExecFunction(node, ctx, &output); @@ -441,7 +446,8 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) { } static EDealRes sifWalkLogic(SNode *pNode, void *context) { SLogicConditionNode *node = (SLogicConditionNode *)pNode; - SIFParam output = {0}; + + SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))}; SIFCtx *ctx = context; ctx->code = sifExecLogic(node, ctx, &output); @@ -457,7 +463,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) { } static EDealRes sifWalkOper(SNode *pNode, void *context) { SOperatorNode *node = (SOperatorNode *)pNode; - SIFParam output = {0}; + SIFParam output = {.result = taosArrayInit(8, sizeof(uint64_t))}; SIFCtx *ctx = context; ctx->code = sifExecOper(node, ctx, &output); @@ -509,8 +515,9 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { return TSDB_CODE_QRY_INVALID_INPUT; } int32_t code = 0; - SIFCtx ctx = {.code = 0, .noExec = false}; + SIFCtx ctx = {.code = 0, .noExec = false, .arg = pDst->arg}; ctx.pRes = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (NULL == ctx.pRes) { indexError("index-filter failed to taosHashInit"); return TSDB_CODE_QRY_OUT_OF_MEMORY; @@ -525,7 +532,9 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { indexError("no valid res in hash, node:(%p), type(%d)", (void *)&pNode, nodeType(pNode)); SIF_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - taosArrayAddAll(pDst->result, res->result); + if (res->result != NULL) { + taosArrayAddAll(pDst->result, res->result); + } sifFreeParam(res); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); @@ -563,7 +572,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { SIF_RET(code); } -int32_t doFilterTag(const SNode *pFilterNode, void *metaHandle, SArray *result) { +int32_t doFilterTag(const SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result) { if (pFilterNode == NULL) { return TSDB_CODE_SUCCESS; } @@ -572,10 +581,12 @@ int32_t doFilterTag(const SNode *pFilterNode, void *metaHandle, SArray *result) // todo move to the initialization function // SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0)); - SIFParam param = {.metaHandle = metaHandle}; + SArray * output = taosArrayInit(8, sizeof(uint64_t)); + SIFParam param = {.arg = *metaArg, .result = output}; SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, ¶m)); taosArrayAddAll(result, param.result); + // taosArrayAddAll(result, param.result); sifFreeParam(¶m); SIF_RET(TSDB_CODE_SUCCESS); } diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 3de556e8b5..4047ee6e38 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -211,12 +211,12 @@ void tfileReaderDestroy(TFileReader* reader) { } // T_REF_INC(reader); fstDestroy(reader->fst); - writerCtxDestroy(reader->ctx, reader->remove); if (reader->remove) { indexInfo("%s is removed", reader->ctx->file.buf); } else { indexInfo("%s is not removed", reader->ctx->file.buf); } + writerCtxDestroy(reader->ctx, reader->remove); taosMemoryFree(reader); } diff --git a/source/libs/index/src/indexUtil.c b/source/libs/index/src/indexUtil.c index a618787fd4..049bc213bc 100644 --- a/source/libs/index/src/indexUtil.c +++ b/source/libs/index/src/indexUtil.c @@ -86,7 +86,7 @@ void iUnion(SArray *inters, SArray *final) { mi[i].idx = 0; } while (1) { - uint64_t mVal = UINT_MAX; + uint64_t mVal = UINT64_MAX; int mIdx = -1; for (int j = 0; j < sz; j++) { @@ -158,7 +158,7 @@ int verdataCompare(const void *a, const void *b) { return cmp; } -SIdxTempResult *sIdxTempResultCreate() { +SIdxTempResult *idxTempResultCreate() { SIdxTempResult *tr = taosMemoryCalloc(1, sizeof(SIdxTempResult)); tr->total = taosArrayInit(4, sizeof(uint64_t)); @@ -166,7 +166,7 @@ SIdxTempResult *sIdxTempResultCreate() { tr->deled = taosArrayInit(4, sizeof(uint64_t)); return tr; } -void sIdxTempResultClear(SIdxTempResult *tr) { +void idxTempResultClear(SIdxTempResult *tr) { if (tr == NULL) { return; } @@ -174,7 +174,7 @@ void sIdxTempResultClear(SIdxTempResult *tr) { taosArrayClear(tr->added); taosArrayClear(tr->deled); } -void sIdxTempResultDestroy(SIdxTempResult *tr) { +void idxTempResultDestroy(SIdxTempResult *tr) { if (tr == NULL) { return; } @@ -182,7 +182,7 @@ void sIdxTempResultDestroy(SIdxTempResult *tr) { taosArrayDestroy(tr->added); taosArrayDestroy(tr->deled); } -void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr) { +void idxTempResultMergeTo(SIdxTempResult *tr, SArray *result) { taosArraySort(tr->total, uidCompare); taosArraySort(tr->added, uidCompare); taosArraySort(tr->deled, uidCompare); @@ -194,5 +194,10 @@ void sIdxTempResultMergeTo(SArray *result, SIdxTempResult *tr) { iUnion(arrs, result); taosArrayDestroy(arrs); + indexError("tmp result: total: %d, added: %d, del: %d", (int)taosArrayGetSize(tr->total), + (int)taosArrayGetSize(tr->added), (int)taosArrayGetSize(tr->deled)); + if (taosArrayGetSize(tr->added) != 0 && taosArrayGetSize(result) == 0) { + indexError("except result: %d", (int)(taosArrayGetSize(result))); + } iExcept(result, tr->deled); } diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index c0b47e74c6..0b97fefa3e 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -1,74 +1,74 @@ -add_executable(indexTest "") -add_executable(fstTest "") -add_executable(fstUT "") -add_executable(UtilUT "") -add_executable(jsonUT "") +add_executable(idxTest "") +add_executable(idxFstTest "") +add_executable(idxFstUT "") +add_executable(idxUtilUT "") +add_executable(idxJsonUT "") -target_sources(indexTest +target_sources(idxTest PRIVATE "indexTests.cc" ) -target_sources(fstTest +target_sources(idxFstTest PRIVATE "fstTest.cc" ) -target_sources(fstUT +target_sources(idxFstUT PRIVATE "fstUT.cc" ) -target_sources(UtilUT +target_sources(idxUtilUT PRIVATE "utilUT.cc" ) -target_sources(jsonUT +target_sources(idxJsonUT PRIVATE "jsonUT.cc" ) -target_include_directories ( indexTest +target_include_directories (idxTest PUBLIC "${TD_SOURCE_DIR}/include/libs/index" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_include_directories ( fstTest +target_include_directories (idxFstTest PUBLIC "${TD_SOURCE_DIR}/include/libs/index" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_include_directories ( fstUT +target_include_directories (idxFstUT PUBLIC "${TD_SOURCE_DIR}/include/libs/index" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_include_directories ( UtilUT +target_include_directories (idxUtilUT PUBLIC "${TD_SOURCE_DIR}/include/libs/index" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_include_directories (jsonUT +target_include_directories (idxJsonUT PUBLIC "${TD_SOURCE_DIR}/include/libs/index" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_link_libraries (indexTest +target_link_libraries (idxTest os util common gtest_main index ) -target_link_libraries (fstTest +target_link_libraries (idxFstTest os util common gtest_main index ) -target_link_libraries (fstUT +target_link_libraries (idxFstUT os util common @@ -76,7 +76,7 @@ target_link_libraries (fstUT index ) -target_link_libraries (UtilUT +target_link_libraries (idxUtilUT os util common @@ -84,7 +84,7 @@ target_link_libraries (UtilUT index ) -target_link_libraries (jsonUT +target_link_libraries (idxJsonUT os util common @@ -98,13 +98,13 @@ add_test( ) add_test( NAME idxJsonUT - COMMAND jsonUT + COMMAND idxJsonUT ) add_test( NAME idxUtilUT - COMMAND UtilUT + COMMAND idxUtilUT ) add_test( NAME idxFstUT - COMMAND fstUT + COMMAND idxFstUT ) diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 2d06002af8..9bc732f52e 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -411,12 +411,12 @@ class TFileObj { // // } - SIdxTempResult* tr = sIdxTempResultCreate(); + SIdxTempResult* tr = idxTempResultCreate(); int ret = tfileReaderSearch(reader_, query, tr); - sIdxTempResultMergeTo(result, tr); - sIdxTempResultDestroy(tr); + idxTempResultMergeTo(tr, result); + idxTempResultDestroy(tr); return ret; } ~TFileObj() { @@ -531,11 +531,11 @@ class CacheObj { indexCacheDebug(cache); } int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { - SIdxTempResult* tr = sIdxTempResultCreate(); + SIdxTempResult* tr = idxTempResultCreate(); int ret = indexCacheSearch(cache, query, tr, s); - sIdxTempResultMergeTo(result, tr); - sIdxTempResultDestroy(tr); + idxTempResultMergeTo(tr, result); + idxTempResultDestroy(tr); if (ret != 0) { std::cout << "failed to get from cache:" << ret << std::endl; diff --git a/source/libs/index/test/utilUT.cc b/source/libs/index/test/utilUT.cc index 18a2b457c4..5bf7c51f1f 100644 --- a/source/libs/index/test/utilUT.cc +++ b/source/libs/index/test/utilUT.cc @@ -226,6 +226,22 @@ TEST_F(UtilEnv, 04union) { iUnion(src, rslt); assert(taosArrayGetSize(rslt) == 12); } +TEST_F(UtilEnv, 05unionExcept) { + clearSourceArray(src); + clearFinalArray(rslt); + + uint64_t arr2[] = {7}; + SArray * f = (SArray *)taosArrayGetP(src, 1); + for (int i = 0; i < sizeof(arr2) / sizeof(arr2[0]); i++) { + taosArrayPush(f, &arr2[i]); + } + + iUnion(src, rslt); + + SArray *ept = taosArrayInit(0, sizeof(uint64_t)); + iExcept(rslt, ept); + EXPECT_EQ(taosArrayGetSize(rslt), 1); +} TEST_F(UtilEnv, 01Except) { SArray *total = taosArrayInit(4, sizeof(uint64_t)); { @@ -308,16 +324,26 @@ TEST_F(UtilEnv, 01Except) { ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 100); } TEST_F(UtilEnv, testFill) { - for (int i = 0; i < 10000000; i++) { + for (int i = 0; i < 1000000; i++) { int64_t val = i; char buf[65] = {0}; indexInt2str(val, buf, 1); EXPECT_EQ(val, taosStr2int64(buf)); } - for (int i = 0; i < 10000000; i++) { + for (int i = 0; i < 1000000; i++) { int64_t val = 0 - i; char buf[65] = {0}; indexInt2str(val, buf, -1); EXPECT_EQ(val, taosStr2int64(buf)); } } +TEST_F(UtilEnv, TempResult) { + SIdxTempResult *relt = idxTempResultCreate(); + + SArray *f = taosArrayInit(0, sizeof(uint64_t)); + + uint64_t val = UINT64_MAX - 1; + taosArrayPush(relt->added, &val); + idxTempResultMergeTo(relt, f); + EXPECT_EQ(taosArrayGetSize(f), 1); +} From 0b0253f2d19e12a4ba3757fd26756240b8a6f7c5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 28 May 2022 17:30:44 +0800 Subject: [PATCH 3/4] enh: support tag filter --- source/libs/index/inc/indexCache.h | 2 +- source/libs/index/inc/indexTfile.h | 4 +- source/libs/index/inc/indexUtil.h | 23 +++--- source/libs/index/src/index.c | 30 ++++---- source/libs/index/src/indexCache.c | 104 +++++++++++++-------------- source/libs/index/src/indexTfile.c | 86 +++++++++++----------- source/libs/index/src/indexUtil.c | 78 ++++++++++---------- source/libs/index/test/indexTests.cc | 12 ++-- source/libs/index/test/utilUT.cc | 16 ++++- 9 files changed, 183 insertions(+), 172 deletions(-) diff --git a/source/libs/index/inc/indexCache.h b/source/libs/index/inc/indexCache.h index 6cbe2532cc..1046a04db3 100644 --- a/source/libs/index/inc/indexCache.h +++ b/source/libs/index/inc/indexCache.h @@ -74,7 +74,7 @@ void indexCacheIteratorDestroy(Iterate* iiter); int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid); // int indexCacheGet(void *cache, uint64_t *rst); -int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s); +int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* tr, STermValueType* s); void indexCacheRef(IndexCache* cache); void indexCacheUnRef(IndexCache* cache); diff --git a/source/libs/index/inc/indexTfile.h b/source/libs/index/inc/indexTfile.h index af32caa821..ca55aa93da 100644 --- a/source/libs/index/inc/indexTfile.h +++ b/source/libs/index/inc/indexTfile.h @@ -105,7 +105,7 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName); TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName); TFileReader* tfileReaderCreate(WriterCtx* ctx); void tfileReaderDestroy(TFileReader* reader); -int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr); +int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr); void tfileReaderRef(TFileReader* reader); void tfileReaderUnRef(TFileReader* reader); @@ -120,7 +120,7 @@ int tfileWriterFinish(TFileWriter* tw); IndexTFile* indexTFileCreate(const char* path); void indexTFileDestroy(IndexTFile* tfile); int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid); -int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* tr); +int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* tr); Iterate* tfileIteratorCreate(TFileReader* reader); void tfileIteratorDestroy(Iterate* iterator); diff --git a/source/libs/index/inc/indexUtil.h b/source/libs/index/inc/indexUtil.h index 1bd46e976d..dbaecaa963 100644 --- a/source/libs/index/inc/indexUtil.h +++ b/source/libs/index/inc/indexUtil.h @@ -66,7 +66,7 @@ extern "C" { * [1, 4, 5] * output:[4, 5] */ -void iIntersection(SArray *interResults, SArray *finalResult); +void iIntersection(SArray *in, SArray *out); /* multi sorted result union * input: [1, 2, 4, 5] @@ -74,7 +74,7 @@ void iIntersection(SArray *interResults, SArray *finalResult); * [1, 4, 5] * output:[1, 2, 3, 4, 5] */ -void iUnion(SArray *interResults, SArray *finalResult); +void iUnion(SArray *in, SArray *out); /* see example * total: [1, 2, 4, 5, 7, 8] @@ -92,19 +92,24 @@ typedef struct { uint64_t data; } SIdxVerdata; +/* + * index temp result + * + */ typedef struct { SArray *total; - SArray *added; - SArray *deled; -} SIdxTempResult; + SArray *add; + SArray *del; +} SIdxTRslt; -SIdxTempResult *idxTempResultCreate(); +SIdxTRslt *idxTRsltCreate(); -void idxTempResultClear(SIdxTempResult *tr); +void idxTRsltClear(SIdxTRslt *tr); -void idxTempResultDestroy(SIdxTempResult *tr); +void idxTRsltDestroy(SIdxTRslt *tr); + +void idxTRsltMergeTo(SIdxTRslt *tr, SArray *out); -void idxTempResultMergeTo(SIdxTempResult *tr, SArray *result); #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index d929c12320..8584d95bf2 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -29,7 +29,7 @@ #include "lucene++/Lucene_c.h" #endif -#define INDEX_NUM_OF_THREADS 1 +#define INDEX_NUM_OF_THREADS 5 #define INDEX_QUEUE_SIZE 200 #define INDEX_DATA_BOOL_NULL 0x02 @@ -85,7 +85,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); // merge cache and tfile by opera type -static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTempResult* helper); +static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper); // static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf); @@ -343,7 +343,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result int64_t st = taosGetTimestampUs(); - SIdxTempResult* tr = idxTempResultCreate(); + SIdxTRslt* tr = idxTRsltCreate(); if (0 == indexCacheSearch(cache, query, tr, &s)) { if (s == kTypeDeletion) { indexInfo("col: %s already drop by", term->colName); @@ -365,12 +365,12 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result int64_t cost = taosGetTimestampUs() - st; indexInfo("search cost: %" PRIu64 "us", cost); - idxTempResultMergeTo(tr, *result); + idxTRsltMergeTo(tr, *result); - idxTempResultDestroy(tr); + idxTRsltDestroy(tr); return 0; END: - idxTempResultDestroy(tr); + idxTRsltDestroy(tr); return -1; } static void indexInterResultsDestroy(SArray* results) { @@ -406,18 +406,18 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType return 0; } -static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) { +static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTRslt* tr) { int32_t sz = taosArrayGetSize(result); if (sz > 0) { TFileValue* lv = taosArrayGetP(result, sz - 1); if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) { - idxTempResultMergeTo(tr, lv->tableId); - idxTempResultClear(tr); + idxTRsltMergeTo(tr, lv->tableId); + idxTRsltClear(tr); taosArrayPush(result, &tfv); } else if (tfv == NULL) { // handle last iterator - idxTempResultMergeTo(tr, lv->tableId); + idxTRsltMergeTo(tr, lv->tableId); } else { // temp result saved in help tfileValueDestroy(tfv); @@ -426,7 +426,7 @@ static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdx taosArrayPush(result, &tfv); } } -static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTempResult* tr) { +static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) { char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; TFileValue* tfv = tfileValueCreate(colVal); @@ -436,9 +436,9 @@ static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateVal uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0); uint32_t ver = cv->ver; if (cv->type == ADD_VALUE) { - INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id) + INDEX_MERGE_ADD_DEL(tr->del, tr->add, id) } else if (cv->type == DEL_VALUE) { - INDEX_MERGE_ADD_DEL(tr->added, tr->deled, id) + INDEX_MERGE_ADD_DEL(tr->add, tr->del, id) } } if (tv != NULL) { @@ -491,7 +491,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false; - SIdxTempResult* tr = idxTempResultCreate(); + SIdxTRslt* tr = idxTRsltCreate(); while (cn == true || tn == true) { IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL; @@ -517,7 +517,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { } } indexMayMergeTempToFinalResult(result, NULL, tr); - idxTempResultDestroy(tr); + idxTRsltDestroy(tr); int ret = indexGenTFile(sIdx, pCache, result); indexDestroyFinalResult(result); diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index bbe8f1f91a..3b33006452 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -36,32 +36,31 @@ static char* indexCacheTermGet(const void* pData); static MemTable* indexInternalCacheCreate(int8_t type); -static int32_t cacheSearchTerm(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchPrefix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchSuffix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchRegex(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchLessThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchTerm(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchPrefix(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchSuffix(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchRegex(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchLessThan(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); /*comm func of compare, used in (LE/LT/GE/GT compare)*/ -static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s, - RangeType type); -static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s, RangeType type); +static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); +static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s); -static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, +static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s, RangeType type); -static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = { +static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTRslt* tr, STermValueType* s) = { {cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual, cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange}, {cacheSearchTerm_JSON, cacheSearchPrefix_JSON, cacheSearchSuffix_JSON, cacheSearchRegex_JSON, @@ -71,7 +70,7 @@ static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTemp static void doMergeWork(SSchedMsg* msg); static bool indexCacheIteratorNext(Iterate* itera); -static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { if (cache == NULL) { return 0; } @@ -93,11 +92,11 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); if (0 == strcmp(c->colVal, pCt->colVal)) { if (c->operaType == ADD_VALUE) { - INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) + INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid) // taosArrayPush(result, &c->uid); *s = kTypeValue; } else if (c->operaType == DEL_VALUE) { - INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) + INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) } } else { break; @@ -108,20 +107,19 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr tSkipListDestroyIter(iter); return 0; } -static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { // impl later return 0; } -static int32_t cacheSearchSuffix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchSuffix(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { // impl later return 0; } -static int32_t cacheSearchRegex(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchRegex(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { // impl later return 0; } -static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, - RangeType type) { +static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s, RangeType type) { if (cache == NULL) { return 0; } @@ -148,11 +146,11 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes TExeCond cond = cmpFn(c->colVal, pCt->colVal, pCt->colType); if (cond == MATCH) { if (c->operaType == ADD_VALUE) { - INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) + INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid) // taosArrayPush(result, &c->uid); *s = kTypeValue; } else if (c->operaType == DEL_VALUE) { - INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) + INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) } } else if (cond == CONTINUE) { continue; @@ -164,20 +162,20 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes tSkipListDestroyIter(iter); return TSDB_CODE_SUCCESS; } -static int32_t cacheSearchLessThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchLessThan(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return cacheSearchCompareFunc(cache, term, tr, s, LT); } -static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return cacheSearchCompareFunc(cache, term, tr, s, LE); } -static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return cacheSearchCompareFunc(cache, term, tr, s, GT); } -static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return cacheSearchCompareFunc(cache, term, tr, s, GE); } -static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { if (cache == NULL) { return 0; } @@ -205,11 +203,11 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResul if (0 == strcmp(c->colVal, pCt->colVal)) { if (c->operaType == ADD_VALUE) { - INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) + INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid) // taosArrayPush(result, &c->uid); *s = kTypeValue; } else if (c->operaType == DEL_VALUE) { - INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) + INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) } } else { break; @@ -223,32 +221,32 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResul return TSDB_CODE_SUCCESS; } -static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return TSDB_CODE_SUCCESS; } -static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return TSDB_CODE_SUCCESS; } -static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return TSDB_CODE_SUCCESS; } -static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return cacheSearchCompareFunc_JSON(cache, term, tr, s, LT); } -static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return cacheSearchCompareFunc_JSON(cache, term, tr, s, LE); } -static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return cacheSearchCompareFunc_JSON(cache, term, tr, s, GT); } -static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return cacheSearchCompareFunc_JSON(cache, term, tr, s, GE); } -static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return TSDB_CODE_SUCCESS; } -static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, +static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s, RangeType type) { if (cache == NULL) { return 0; @@ -290,11 +288,11 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe TExeCond cond = cmpFn(p + skip, term->colVal, dType); if (cond == MATCH) { if (c->operaType == ADD_VALUE) { - INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) + INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid) // taosArrayPush(result, &c->uid); *s = kTypeValue; } else if (c->operaType == DEL_VALUE) { - INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) + INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) } } else if (cond == CONTINUE) { continue; @@ -310,7 +308,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe return TSDB_CODE_SUCCESS; } -static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { // impl later return 0; } @@ -569,7 +567,7 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u return 0; } -static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s) { +static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* tr, STermValueType* s) { if (mem == NULL) { return 0; } @@ -583,7 +581,7 @@ static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTempResu return cacheSearch[0][qtype](mem, term, tr, s); } } -int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) { +int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTRslt* result, STermValueType* s) { int64_t st = taosGetTimestampUs(); if (cache == NULL) { return 0; diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 4047ee6e38..53dd2923ac 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -60,31 +60,31 @@ static void tfileGenFileFullName(char* fullname, const char* path, uint64_t s /* * search from tfile */ -static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTRslt* tr); -static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype); +static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype); -static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); -static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); +static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr); -static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype); +static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype); -static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = { +static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTRslt* tr) = { {tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange}, {tfSearchTerm_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON, @@ -220,7 +220,7 @@ void tfileReaderDestroy(TFileReader* reader) { taosMemoryFree(reader); } -static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { int ret = 0; char* p = tem->colVal; uint64_t sz = tem->nColVal; @@ -243,7 +243,7 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { return 0; } -static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); char* p = tem->colVal; uint64_t sz = tem->nColVal; @@ -279,7 +279,7 @@ static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) } return 0; } -static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); int ret = 0; @@ -298,7 +298,7 @@ static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr) fstSliceDestroy(&key); return 0; } -static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); int ret = 0; @@ -319,7 +319,7 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr) return 0; } -static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) { +static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType type) { int ret = 0; char* p = tem->colVal; int skip = 0; @@ -358,19 +358,19 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult fstStreamBuilderDestroy(sb); return TSDB_CODE_SUCCESS; } -static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { return tfSearchCompareFunc(reader, tem, tr, LT); } -static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { return tfSearchCompareFunc(reader, tem, tr, LE); } -static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { return tfSearchCompareFunc(reader, tem, tr, GT); } -static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { return tfSearchCompareFunc(reader, tem, tr, GE); } -static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); int ret = 0; char* p = tem->colVal; @@ -399,7 +399,7 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) fstSliceDestroy(&key); return 0; } -static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { int ret = 0; char* p = indexPackJsonData(tem); int sz = strlen(p); @@ -424,36 +424,36 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* // deprecate api return TSDB_CODE_SUCCESS; } -static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { // impl later return TSDB_CODE_SUCCESS; } -static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { // impl later return TSDB_CODE_SUCCESS; } -static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { // impl later return TSDB_CODE_SUCCESS; } -static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { return tfSearchCompareFunc_JSON(reader, tem, tr, LT); } -static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { return tfSearchCompareFunc_JSON(reader, tem, tr, LE); } -static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { return tfSearchCompareFunc_JSON(reader, tem, tr, GT); } -static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { return tfSearchCompareFunc_JSON(reader, tem, tr, GE); } -static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { +static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { // impl later return TSDB_CODE_SUCCESS; } -static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype) { +static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr, RangeType ctype) { int ret = 0; int skip = 0; @@ -501,7 +501,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR fstStreamBuilderDestroy(sb); return TSDB_CODE_SUCCESS; } -int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) { +int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr) { SIndexTerm* term = query->term; EIndexQueryType qtype = query->qType; int ret = 0; @@ -673,7 +673,7 @@ void indexTFileDestroy(IndexTFile* tfile) { taosMemoryFree(tfile); } -int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result) { +int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) { int ret = -1; if (tfile == NULL) { return ret; diff --git a/source/libs/index/src/indexUtil.c b/source/libs/index/src/indexUtil.c index 049bc213bc..1d20278895 100644 --- a/source/libs/index/src/indexUtil.c +++ b/source/libs/index/src/indexUtil.c @@ -36,24 +36,24 @@ static int iBinarySearch(SArray *arr, int s, int e, uint64_t k) { return s; } -void iIntersection(SArray *inters, SArray *final) { - int32_t sz = (int32_t)taosArrayGetSize(inters); +void iIntersection(SArray *in, SArray *out) { + int32_t sz = (int32_t)taosArrayGetSize(in); if (sz <= 0) { return; } MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex)); for (int i = 0; i < sz; i++) { - SArray *t = taosArrayGetP(inters, i); + SArray *t = taosArrayGetP(in, i); mi[i].len = (int32_t)taosArrayGetSize(t); mi[i].idx = 0; } - SArray *base = taosArrayGetP(inters, 0); + SArray *base = taosArrayGetP(in, 0); for (int i = 0; i < taosArrayGetSize(base); i++) { uint64_t tgt = *(uint64_t *)taosArrayGet(base, i); bool has = true; - for (int j = 1; j < taosArrayGetSize(inters); j++) { - SArray *oth = taosArrayGetP(inters, j); + for (int j = 1; j < taosArrayGetSize(in); j++) { + SArray *oth = taosArrayGetP(in, j); int mid = iBinarySearch(oth, mi[j].idx, mi[j].len - 1, tgt); if (mid >= 0 && mid < mi[j].len) { uint64_t val = *(uint64_t *)taosArrayGet(oth, mid); @@ -64,24 +64,24 @@ void iIntersection(SArray *inters, SArray *final) { } } if (has == true) { - taosArrayPush(final, &tgt); + taosArrayPush(out, &tgt); } } taosMemoryFreeClear(mi); } -void iUnion(SArray *inters, SArray *final) { - int32_t sz = (int32_t)taosArrayGetSize(inters); +void iUnion(SArray *in, SArray *out) { + int32_t sz = (int32_t)taosArrayGetSize(in); if (sz <= 0) { return; } if (sz == 1) { - taosArrayAddAll(final, taosArrayGetP(inters, 0)); + taosArrayAddAll(out, taosArrayGetP(in, 0)); return; } MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex)); for (int i = 0; i < sz; i++) { - SArray *t = taosArrayGetP(inters, i); + SArray *t = taosArrayGetP(in, i); mi[i].len = (int32_t)taosArrayGetSize(t); mi[i].idx = 0; } @@ -90,7 +90,7 @@ void iUnion(SArray *inters, SArray *final) { int mIdx = -1; for (int j = 0; j < sz; j++) { - SArray *t = taosArrayGetP(inters, j); + SArray *t = taosArrayGetP(in, j); if (mi[j].idx >= mi[j].len) { continue; } @@ -102,13 +102,13 @@ void iUnion(SArray *inters, SArray *final) { } if (mIdx != -1) { mi[mIdx].idx++; - if (taosArrayGetSize(final) > 0) { - uint64_t lVal = *(uint64_t *)taosArrayGetLast(final); + if (taosArrayGetSize(out) > 0) { + uint64_t lVal = *(uint64_t *)taosArrayGetLast(out); if (lVal == mVal) { continue; } } - taosArrayPush(final, &mVal); + taosArrayPush(out, &mVal); } else { break; } @@ -158,46 +158,44 @@ int verdataCompare(const void *a, const void *b) { return cmp; } -SIdxTempResult *idxTempResultCreate() { - SIdxTempResult *tr = taosMemoryCalloc(1, sizeof(SIdxTempResult)); +SIdxTRslt *idxTRsltCreate() { + SIdxTRslt *tr = taosMemoryCalloc(1, sizeof(SIdxTRslt)); tr->total = taosArrayInit(4, sizeof(uint64_t)); - tr->added = taosArrayInit(4, sizeof(uint64_t)); - tr->deled = taosArrayInit(4, sizeof(uint64_t)); + tr->add = taosArrayInit(4, sizeof(uint64_t)); + tr->del = taosArrayInit(4, sizeof(uint64_t)); return tr; } -void idxTempResultClear(SIdxTempResult *tr) { +void idxTRsltClear(SIdxTRslt *tr) { if (tr == NULL) { return; } taosArrayClear(tr->total); - taosArrayClear(tr->added); - taosArrayClear(tr->deled); + taosArrayClear(tr->add); + taosArrayClear(tr->del); } -void idxTempResultDestroy(SIdxTempResult *tr) { +void idxTRsltDestroy(SIdxTRslt *tr) { if (tr == NULL) { return; } taosArrayDestroy(tr->total); - taosArrayDestroy(tr->added); - taosArrayDestroy(tr->deled); + taosArrayDestroy(tr->add); + taosArrayDestroy(tr->del); } -void idxTempResultMergeTo(SIdxTempResult *tr, SArray *result) { +void idxTRsltMergeTo(SIdxTRslt *tr, SArray *result) { taosArraySort(tr->total, uidCompare); - taosArraySort(tr->added, uidCompare); - taosArraySort(tr->deled, uidCompare); + taosArraySort(tr->add, uidCompare); + taosArraySort(tr->del, uidCompare); - SArray *arrs = taosArrayInit(2, sizeof(void *)); - taosArrayPush(arrs, &tr->total); - taosArrayPush(arrs, &tr->added); - - iUnion(arrs, result); - taosArrayDestroy(arrs); - - indexError("tmp result: total: %d, added: %d, del: %d", (int)taosArrayGetSize(tr->total), - (int)taosArrayGetSize(tr->added), (int)taosArrayGetSize(tr->deled)); - if (taosArrayGetSize(tr->added) != 0 && taosArrayGetSize(result) == 0) { - indexError("except result: %d", (int)(taosArrayGetSize(result))); + if (taosArrayGetSize(tr->total) == 0 || taosArrayGetSize(tr->add) == 0) { + SArray *t = taosArrayGetSize(tr->total) == 0 ? tr->add : tr->total; + taosArrayAddAll(result, t); + } else { + SArray *arrs = taosArrayInit(2, sizeof(void *)); + taosArrayPush(arrs, &tr->total); + taosArrayPush(arrs, &tr->add); + iUnion(arrs, result); + taosArrayDestroy(arrs); } - iExcept(result, tr->deled); + iExcept(result, tr->del); } diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 9bc732f52e..77635af6fe 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -411,12 +411,12 @@ class TFileObj { // // } - SIdxTempResult* tr = idxTempResultCreate(); + SIdxTRslt* tr = idxTRsltCreate(); int ret = tfileReaderSearch(reader_, query, tr); - idxTempResultMergeTo(tr, result); - idxTempResultDestroy(tr); + idxTRsltMergeTo(tr, result); + idxTRsltDestroy(tr); return ret; } ~TFileObj() { @@ -531,11 +531,11 @@ class CacheObj { indexCacheDebug(cache); } int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { - SIdxTempResult* tr = idxTempResultCreate(); + SIdxTRslt* tr = idxTRsltCreate(); int ret = indexCacheSearch(cache, query, tr, s); - idxTempResultMergeTo(tr, result); - idxTempResultDestroy(tr); + idxTRsltMergeTo(tr, result); + idxTRsltDestroy(tr); if (ret != 0) { std::cout << "failed to get from cache:" << ret << std::endl; diff --git a/source/libs/index/test/utilUT.cc b/source/libs/index/test/utilUT.cc index 5bf7c51f1f..4a30160244 100644 --- a/source/libs/index/test/utilUT.cc +++ b/source/libs/index/test/utilUT.cc @@ -338,12 +338,22 @@ TEST_F(UtilEnv, testFill) { } } TEST_F(UtilEnv, TempResult) { - SIdxTempResult *relt = idxTempResultCreate(); + SIdxTRslt *relt = idxTRsltCreate(); SArray *f = taosArrayInit(0, sizeof(uint64_t)); uint64_t val = UINT64_MAX - 1; - taosArrayPush(relt->added, &val); - idxTempResultMergeTo(relt, f); + taosArrayPush(relt->add, &val); + idxTRsltMergeTo(relt, f); + EXPECT_EQ(taosArrayGetSize(f), 1); +} +TEST_F(UtilEnv, TempResultExcept) { + SIdxTRslt *relt = idxTRsltCreate(); + + SArray *f = taosArrayInit(0, sizeof(uint64_t)); + + uint64_t val = UINT64_MAX; + taosArrayPush(relt->add, &val); + idxTRsltMergeTo(relt, f); EXPECT_EQ(taosArrayGetSize(f), 1); } From ddab67520726ebcd32dbdb0b4236be0296dbff5b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 28 May 2022 19:41:27 +0800 Subject: [PATCH 4/4] set default idx --- source/libs/index/src/indexFilter.c | 4 ++++ source/libs/index/test/CMakeLists.txt | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index ab96b2929c..b882caa168 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -260,6 +260,7 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu return TSDB_CODE_QRY_INVALID_INPUT; } static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { +#ifdef USE_INVERTED_INDEX SIndexMetaArg *arg = &output->arg; SIndexTerm * tm = indexTermCreate(arg->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName), right->condValue, strlen(right->condValue)); @@ -276,6 +277,9 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP indexDebug("index filter data size: %d", (int)taosArrayGetSize(output->result)); indexMultiTermQueryDestroy(mtm); return ret; +#else + return 0; +#endif } static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) { diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index 0b97fefa3e..040460ae5c 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -94,7 +94,7 @@ target_link_libraries (idxJsonUT add_test( NAME idxtest - COMMAND indexTest + COMMAND idxTest ) add_test( NAME idxJsonUT