diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 320dc22655..c3fabea442 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -298,6 +298,12 @@ typedef struct { SExprSupp* pExprSup; // expr supporter of aggregate operator } SAggOptrPushDownInfo; +typedef struct STableMetaCacheInfo { + SLRUCache* pTableMetaEntryCache; // 100 by default + uint64_t metaFetch; + uint64_t cacheHit; +} STableMetaCacheInfo; + typedef struct STableScanInfo { STsdbReader* dataReader; SReadHandle readHandle; @@ -318,7 +324,7 @@ typedef struct STableScanInfo { int8_t scanMode; SAggOptrPushDownInfo pdInfo; int8_t assignBlockUid; - SLRUCache* pTableMetaEntryCache; // 100 by default + STableMetaCacheInfo metaCache; } STableScanInfo; typedef struct STableMergeScanInfo { @@ -327,7 +333,6 @@ typedef struct STableMergeScanInfo { int32_t tableEndIndex; bool hasGroupId; uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* SArray* queryConds; // array of queryTableDataCond STsdbReader* pReader; SReadHandle readHandle; @@ -898,7 +903,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, - SSDataBlock* pBlock, int32_t rows, const char* idStr, SLRUCache* pCache); + SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo * pCache); void cleanupAggSup(SAggSupporter* pAggSup); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 95d3c5cf23..76866eedb7 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -172,7 +172,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SExprSupp* pSup = &pInfo->pseudoExprSup; int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, - pRes->info.rows, GET_TASKID(pTaskInfo)); + pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -221,7 +221,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, - GET_TASKID(pTaskInfo)); + GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 269f1bff5b..c049aac5b8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "executorimpl.h" #include "filter.h" #include "function.h" @@ -339,7 +340,7 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo SExprSupp* pSup = &pTableScanInfo->pseudoSup; int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows, - GET_TASKID(pTaskInfo)); + GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -505,8 +506,25 @@ typedef struct STableCachedVal { STag* pTags; } STableCachedVal; +static void freeTableCachedVal(void* param) { + if (param == NULL) { + return; + } + + STableCachedVal* pVal = param; + taosMemoryFree((void*)pVal->pName); + taosMemoryFree(pVal->pTags); + taosMemoryFree(pVal); +} + +//const void *key, size_t keyLen, void *value +static void freeCachedMetaItem(const void *key, size_t keyLen, void *value) { + taosMemoryFree((void*)key); + freeTableCachedVal(value); +} + int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, - SSDataBlock* pBlock, int32_t rows, const char* idStr, SLRUCache* pCache) { + SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) { // currently only the tbname pseudo column if (numOfExpr <= 0) { return TSDB_CODE_SUCCESS; @@ -518,13 +536,13 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int int32_t backupRows = pBlock->info.rows; pBlock->info.rows = rows; + bool freeReader = false; STableCachedVal val = {0}; - bool needFreeReader = true; + + SMetaReader mr = {0}; // 1. check if it is existed in meta cache - LRUHandle *h = taosLRUCacheLookup(pCache, &pBlock->info.uid, sizeof(pBlock->info.uid)); - if (h == NULL) { - SMetaReader mr = {0}; + if (pCache == NULL) { metaReaderInit(&mr, pHandle->meta, 0); code = metaGetTableEntryByUid(&mr, pBlock->info.uid); if (code != TSDB_CODE_SUCCESS) { @@ -534,11 +552,46 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int } metaReaderReleaseLock(&mr); + val.pName = mr.me.name; val.pTags = (STag*)mr.me.ctbEntry.pTags; + + freeReader = true; } else { - STableCachedVal* pVal = taosLRUCacheValue(pCache, h); - val = *pVal; + pCache->metaFetch += 1; + + LRUHandle* h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(pBlock->info.uid)); + if (h == NULL) { + metaReaderInit(&mr, pHandle->meta, 0); + code = metaGetTableEntryByUid(&mr, pBlock->info.uid); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); + metaReaderClear(&mr); + return terrno; + } + + metaReaderReleaseLock(&mr); + + STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal)); + pVal->pName = strdup(mr.me.name); + STag* pTag = (STag*)mr.me.ctbEntry.pTags; + + pVal->pTags = taosMemoryMalloc(pTag->len); + memcpy(pVal->pTags, mr.me.ctbEntry.pTags, pTag->len); + + freeReader = true; + int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(uint64_t), pVal, sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW); + if (ret != TAOS_LRU_STATUS_OK) { + qError("failed to put meta into lru cache, code:%d, %s", ret, idStr); + freeTableCachedVal(pVal); + } else { + val = *pVal; + } + } else { + pCache->cacheHit += 1; + STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h); + val = *pVal; + } } for (int32_t j = 0; j < numOfExpr; ++j) { @@ -581,10 +634,12 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int } } - metaReaderClear(&mr); - // restore the rows pBlock->info.rows = backupRows; + if (freeReader) { + metaReaderClear(&mr); + } + return TSDB_CODE_SUCCESS; } @@ -832,6 +887,7 @@ static void destroyTableScanOperatorInfo(void* param) { taosArrayDestroy(pTableScanInfo->matchInfo.pList); } + taosLRUCacheCleanup(pTableScanInfo->metaCache.pTableMetaEntryCache); cleanupExprSupp(&pTableScanInfo->pseudoSup); taosMemoryFreeClear(param); } @@ -898,7 +954,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; - pInfo->pTableMetaEntryCache = taosLRUCacheInit(100, -1, .5); + pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(100, -1, .5); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, getTableScannerExecInfo); @@ -1650,7 +1706,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - pInfo->pRes->info.rows, GET_TASKID(pTaskInfo)); + pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { blockDataFreeRes((SSDataBlock*)pBlock); T_LONG_JMP(pTaskInfo->env, code); @@ -4380,7 +4436,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc SExprSupp* pSup = &pTableScanInfo->pseudoSup; int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, - pBlock->info.rows, GET_TASKID(pTaskInfo)); + pBlock->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); }