diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 62146b6048..e70cf37c63 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -254,17 +254,18 @@ typedef struct SExchangeInfo { SArray* pSourceDataInfo; tsem_t ready; void* pTransporter; + // SArray, result block list, used to keep the multi-block that // passed by downstream operator - SArray* pResultBlockList; - int32_t rspBlockIndex; // indicate the return block index in pResultBlockList + SArray* pReadyBlocks; + SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy. SSDataBlock* pDummyBlock; // dummy block, not keep data bool seqLoadData; // sequential load data or not, false by default int32_t current; SLoadRemoteDataInfo loadInfo; uint64_t self; SLimitInfo limitInfo; - int64_t openedTs; // start exec time stamp + int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo } SExchangeInfo; typedef struct SScanInfo { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 5707c7cde0..bbd46502ce 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -74,9 +74,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe STableKeyInfo* pList = tableListGetInfo(pTableList, 0); - size_t num = tableListGetSize(pTableList); uint64_t suid = tableListGetSuid(pTableList); - code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, + code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 049de727df..4151018636 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -100,14 +100,23 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn int32_t index = 0; char* pStart = pRetrieveRsp->data; while (index++ < pRetrieveRsp->numOfBlocks) { - SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); + SSDataBlock* pb = NULL; + + void* p = taosArrayPop(pExchangeInfo->pRecycledBlocks); + if (p != NULL) { + pb = *(SSDataBlock**) p; + blockDataCleanup(pb); + } else { + pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); + } + code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); if (code != 0) { taosMemoryFreeClear(pDataInfo->pRsp); goto _error; } - taosArrayPush(pExchangeInfo->pResultBlockList, &pb); + taosArrayPush(pExchangeInfo->pReadyBlocks, &pb); } updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pExchangeInfo->openedTs, pOperator); @@ -170,23 +179,26 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) { return NULL; } - size_t size = taosArrayGetSize(pExchangeInfo->pResultBlockList); - if (size == 0 || pExchangeInfo->rspBlockIndex >= size) { - pExchangeInfo->rspBlockIndex = 0; - taosArrayClearEx(pExchangeInfo->pResultBlockList, freeBlock); + // we have buffered retrieved datablock, return it directly + SSDataBlock** p = taosArrayPop(pExchangeInfo->pReadyBlocks); + if (p != NULL) { + taosArrayPush(pExchangeInfo->pRecycledBlocks, p); + return *p; + } else { if (pExchangeInfo->seqLoadData) { seqLoadRemoteData(pOperator); } else { concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); } - if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) { + if (taosArrayGetSize(pExchangeInfo->pReadyBlocks) == 0) { return NULL; + } else { + p = taosArrayPop(pExchangeInfo->pReadyBlocks); + taosArrayPush(pExchangeInfo->pRecycledBlocks, p); + return *p; } } - - // we have buffered retrieved datablock, return it directly - return taosArrayGetP(pExchangeInfo->pResultBlockList, pExchangeInfo->rspBlockIndex++); } static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { @@ -284,7 +296,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode tsem_init(&pInfo->ready, 0, 0); pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc); - pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES); + pInfo->pReadyBlocks = taosArrayInit(64, POINTER_BYTES); + pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); pInfo->seqLoadData = false; pInfo->pTransporter = pTransporter; @@ -326,11 +339,9 @@ void doDestroyExchangeOperatorInfo(void* param) { taosArrayDestroy(pExInfo->pSources); taosArrayDestroyEx(pExInfo->pSourceDataInfo, freeSourceDataInfo); - if (pExInfo->pResultBlockList != NULL) { - taosArrayDestroyEx(pExInfo->pResultBlockList, freeBlock); - pExInfo->pResultBlockList = NULL; - } - + taosArrayDestroyEx(pExInfo->pReadyBlocks, freeBlock); + taosArrayDestroyEx(pExInfo->pRecycledBlocks, freeBlock); + blockDataDestroy(pExInfo->pDummyBlock); tsem_destroy(&pExInfo->ready);