diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 43e80b99d4..4aabd39800 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -153,7 +153,7 @@ bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) { int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { SMeta *pMeta = pReader->pMeta; - int64_t version; + int64_t version1; // query uid.idx if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) { @@ -161,8 +161,8 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { return -1; } - version = ((SUidIdxVal *)pReader->pBuf)[0].version; - return metaGetTableEntryByVersion(pReader, version, uid); + version1 = ((SUidIdxVal *)pReader->pBuf)[0].version; + return metaGetTableEntryByVersion(pReader, version1, uid); } int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index dedf1630af..5051bedf8d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -298,6 +298,12 @@ typedef struct { SExprSupp* pExprSup; // expr supporter of aggregate operator } SAggOptrPushDownInfo; +typedef struct STableMetaCacheInfo { + SLRUCache* pTableMetaEntryCache; // 100 by default + uint64_t metaFetch; + uint64_t cacheHit; +} STableMetaCacheInfo; + typedef struct STableScanInfo { STsdbReader* dataReader; SReadHandle readHandle; @@ -317,6 +323,7 @@ typedef struct STableScanInfo { int8_t scanMode; SAggOptrPushDownInfo pdInfo; int8_t assignBlockUid; + STableMetaCacheInfo metaCache; } STableScanInfo; typedef struct STableMergeScanInfo { @@ -325,7 +332,6 @@ typedef struct STableMergeScanInfo { int32_t tableEndIndex; bool hasGroupId; uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* SArray* queryConds; // array of queryTableDataCond STsdbReader* pReader; SReadHandle readHandle; @@ -877,8 +883,8 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); -int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, - SSDataBlock* pBlock, int32_t rows, const char* idStr); +int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, + SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo * pCache); void cleanupAggSup(SAggSupporter* pAggSup); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 95d3c5cf23..76866eedb7 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -172,7 +172,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SExprSupp* pSup = &pInfo->pseudoExprSup; int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, - pRes->info.rows, GET_TASKID(pTaskInfo)); + pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -221,7 +221,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, - GET_TASKID(pTaskInfo)); + GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 447bd801bd..ee935837d0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "executorimpl.h" #include "filter.h" #include "function.h" @@ -335,7 +336,7 @@ static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlo SExprSupp* pSup = &pTableScanInfo->pseudoSup; int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows, - GET_TASKID(pTaskInfo)); + GET_TASKID(pTaskInfo), &pTableScanInfo->metaCache); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -491,51 +492,128 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction SET_REVERSE_SCAN_FLAG(pTableScanInfo); switchCtxOrder(pCtx, numOfOutput); - // setupQueryRangeForReverseScan(pTableScanInfo); - pTableScanInfo->cond.order = TSDB_ORDER_DESC; STimeWindow* pTWindow = &pTableScanInfo->cond.twindows; TSWAP(pTWindow->skey, pTWindow->ekey); } -int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, - SSDataBlock* pBlock, int32_t rows, const char* idStr) { +typedef struct STableCachedVal { + const char* pName; + STag* pTags; +} STableCachedVal; + +static void freeTableCachedVal(void* param) { + if (param == NULL) { + return; + } + + STableCachedVal* pVal = param; + taosMemoryFree((void*)pVal->pName); + taosMemoryFree(pVal->pTags); + taosMemoryFree(pVal); +} + +//const void *key, size_t keyLen, void *value +static void freeCachedMetaItem(const void *key, size_t keyLen, void *value) { + freeTableCachedVal(value); +} + +int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, + SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo* pCache) { // currently only the tbname pseudo column - if (numOfPseudoExpr <= 0) { + if (numOfExpr <= 0) { return TSDB_CODE_SUCCESS; } + int32_t code = 0; + // backup the rows int32_t backupRows = pBlock->info.rows; pBlock->info.rows = rows; - SMetaReader mr = {0}; - metaReaderInit(&mr, pHandle->meta, 0); - int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid); - metaReaderReleaseLock(&mr); + bool freeReader = false; + STableCachedVal val = {0}; - if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); - metaReaderClear(&mr); - return terrno; + SMetaReader mr = {0}; + LRUHandle* h = NULL; + + // 1. check if it is existed in meta cache + if (pCache == NULL) { + metaReaderInit(&mr, pHandle->meta, 0); + code = metaGetTableEntryByUid(&mr, pBlock->info.uid); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); + metaReaderClear(&mr); + return terrno; + } + + metaReaderReleaseLock(&mr); + + val.pName = mr.me.name; + val.pTags = (STag*)mr.me.ctbEntry.pTags; + + freeReader = true; + } else { + pCache->metaFetch += 1; + + h = taosLRUCacheLookup(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(pBlock->info.uid)); + if (h == NULL) { + metaReaderInit(&mr, pHandle->meta, 0); + code = metaGetTableEntryByUid(&mr, pBlock->info.uid); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); + metaReaderClear(&mr); + return terrno; + } + + metaReaderReleaseLock(&mr); + + STableCachedVal* pVal = taosMemoryMalloc(sizeof(STableCachedVal)); + pVal->pName = strdup(mr.me.name); + pVal->pTags = NULL; + + // only child table has tag value + if (mr.me.type == TSDB_CHILD_TABLE) { + STag* pTag = (STag*)mr.me.ctbEntry.pTags; + pVal->pTags = taosMemoryMalloc(pTag->len); + memcpy(pVal->pTags, mr.me.ctbEntry.pTags, pTag->len); + } + + val = *pVal; + freeReader = true; + + int32_t ret = taosLRUCacheInsert(pCache->pTableMetaEntryCache, &pBlock->info.uid, sizeof(uint64_t), pVal, sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW); + if (ret != TAOS_LRU_STATUS_OK) { + qError("failed to put meta into lru cache, code:%d, %s", ret, idStr); + freeTableCachedVal(pVal); + } + } else { + pCache->cacheHit += 1; + STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h); + val = *pVal; + taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false); + } + + qDebug("retrieve table meta from cache:%"PRIu64", hit:%"PRIu64 " miss:%"PRIu64", %s", pCache->metaFetch, pCache->cacheHit, + (pCache->metaFetch - pCache->cacheHit), idStr); } - for (int32_t j = 0; j < numOfPseudoExpr; ++j) { - SExprInfo* pExpr = &pPseudoExpr[j]; - int32_t dstSlotId = pExpr->base.resSchema.slotId; + for (int32_t j = 0; j < numOfExpr; ++j) { + const SExprInfo* pExpr1 = &pExpr[j]; + int32_t dstSlotId = pExpr1->base.resSchema.slotId; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); colInfoDataCleanup(pColInfoData, pBlock->info.rows); - int32_t functionId = pExpr->pExpr->_function.functionId; + int32_t functionId = pExpr1->pExpr->_function.functionId; // this is to handle the tbname if (fmIsScanPseudoColumnFunc(functionId)) { - setTbNameColData(pBlock, pColInfoData, functionId, mr.me.name); + setTbNameColData(pBlock, pColInfoData, functionId, val.pName); } else { // these are tags STagVal tagVal = {0}; - tagVal.cid = pExpr->base.pParam[0].pCol->colId; - const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pColInfoData->info.type, &tagVal); + tagVal.cid = pExpr1->base.pParam[0].pCol->colId; + const char* p = metaGetTableTagVal(val.pTags, pColInfoData->info.type, &tagVal); char* data = NULL; if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) { @@ -560,10 +638,12 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int } } - metaReaderClear(&mr); - // restore the rows pBlock->info.rows = backupRows; + if (freeReader) { + metaReaderClear(&mr); + } + return TSDB_CODE_SUCCESS; } @@ -811,6 +891,7 @@ static void destroyTableScanOperatorInfo(void* param) { taosArrayDestroy(pTableScanInfo->matchInfo.pList); } + taosLRUCacheCleanup(pTableScanInfo->metaCache.pTableMetaEntryCache); cleanupExprSupp(&pTableScanInfo->pseudoSup); taosMemoryFreeClear(param); } @@ -874,6 +955,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; + pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5); + taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, getTableScannerExecInfo); @@ -1624,7 +1708,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - pInfo->pRes->info.rows, GET_TASKID(pTaskInfo)); + pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { blockDataFreeRes((SSDataBlock*)pBlock); T_LONG_JMP(pTaskInfo->env, code); @@ -4362,7 +4446,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc SExprSupp* pSup = &pTableScanInfo->pseudoSup; int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, - pBlock->info.rows, GET_TASKID(pTaskInfo)); + pBlock->info.rows, GET_TASKID(pTaskInfo), NULL); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 2f41ee1495..af87c4b2eb 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -482,24 +482,31 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort SExecTaskInfo* pTaskInfo) { SGroupSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) { + if (pInfo == NULL || pOperator == NULL) { goto _error; } + SExprSupp* pSup = &pOperator->exprSupp; SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - SSDataBlock* pResBlock = createResDataBlock(pDescNode); SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); + pSup->pExprInfo = pExprInfo; + pSup->numOfExprs = numOfCols; + + initResultSizeInfo(&pOperator->resultInfo, 1024); + pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); + + pInfo->binfo.pRes = createResDataBlock(pDescNode); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + int32_t numOfOutputCols = 0; int32_t code = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); - - pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); - pInfo->binfo.pRes = pResBlock; - - initResultSizeInfo(&pOperator->resultInfo, 1024); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); @@ -508,8 +515,6 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo, @@ -523,8 +528,10 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort return pOperator; _error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); + pTaskInfo->code = code; + if (pInfo != NULL) { + destroyGroupSortOperatorInfo(pInfo); + } taosMemoryFree(pOperator); return NULL; }