From c5291c2d1f816bfa3092cfea83fc8b567a4946c8 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 22 Aug 2024 13:49:38 +0800 Subject: [PATCH] fix: return code issue --- source/libs/executor/inc/tsort.h | 2 +- source/libs/executor/src/mergeoperator.c | 6 +++--- source/libs/executor/src/scanoperator.c | 19 ++++++++++++------- source/libs/executor/src/sortoperator.c | 18 ++++++++++-------- source/libs/executor/src/tsort.c | 19 ++++++++++++------- 5 files changed, 38 insertions(+), 26 deletions(-) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 474f3eedbf..6f6384af5b 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -66,7 +66,7 @@ typedef struct SMsortComparParam { typedef struct SSortHandle SSortHandle; typedef struct STupleHandle STupleHandle; -typedef SSDataBlock* (*_sort_fetch_block_fn_t)(void* param); +typedef int32_t (*_sort_fetch_block_fn_t)(void* param, SSDataBlock** ppBlock); typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* param); /** diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 8bc7c2db50..3f85324f57 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -65,10 +65,10 @@ static SSDataBlock* doNonSortMerge1(SOperatorInfo* pOperator); static SSDataBlock* doColsMerge1(SOperatorInfo* pOperator); static int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock); -SSDataBlock* sortMergeloadNextDataBlock(void* param) { +int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; - SSDataBlock* pBlock = pOperator->fpSet.getNextFn(pOperator); - return pBlock; + *ppBlock = pOperator->fpSet.getNextFn(pOperator); + return TSDB_CODE_SUCCESS; } int32_t openSortMergeOperator(SOperatorInfo* pOperator) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1a6bb8a5cc..b7b5e4fff9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -5414,7 +5414,7 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe return; } -static SSDataBlock* getBlockForTableMergeScan(void* param) { +static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; @@ -5422,6 +5422,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSDataBlock* pBlock = NULL; int64_t st = taosGetTimestampUs(); + int32_t code = TSDB_CODE_SUCCESS; while (true) { if (pInfo->rtnNextDurationBlocks) { @@ -5456,10 +5457,11 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) { if (!bSkipped) { - int32_t code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]); + code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]); if (code) { terrno = code; - return NULL; + *ppBlock = NULL; + return code; } ++pInfo->numNextDurationBlocks; @@ -5473,7 +5475,8 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { if (pInfo->bNewFilesetEvent) { pInfo->rtnNextDurationBlocks = true; - return NULL; + *ppBlock = NULL; + return code; } if (pInfo->bNextDurationBlockEvent) { @@ -5488,11 +5491,13 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - - return pBlock; + *ppBlock = pBlock; + + return code; } - return NULL; + *ppBlock = NULL; + return code; } int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppSortArray) { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index cb15c3c836..e23988d357 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -337,10 +337,10 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, return code; } -SSDataBlock* loadNextDataBlock(void* param) { +int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) { SOperatorInfo* pOperator = (SOperatorInfo*)param; - SSDataBlock* pBlock = pOperator->fpSet.getNextFn(pOperator); - return pBlock; + *ppBlock = pOperator->fpSet.getNextFn(pOperator); + return TSDB_CODE_SUCCESS; } // todo refactor: merged with fetch fp @@ -609,30 +609,32 @@ typedef struct SGroupSortSourceParam { SGroupSortOperatorInfo* grpSortOpInfo; } SGroupSortSourceParam; -SSDataBlock* fetchNextGroupSortDataBlock(void* param) { +int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) { + *ppBlock = NULL; + SGroupSortSourceParam* source = param; SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo; if (grpSortOpInfo->prefetchedSortInput) { SSDataBlock* block = grpSortOpInfo->prefetchedSortInput; grpSortOpInfo->prefetchedSortInput = NULL; - return block; + *ppBlock = block; } else { SOperatorInfo* childOp = source->childOpInfo; SSDataBlock* block = childOp->fpSet.getNextFn(childOp); if (block != NULL) { if (block->info.id.groupId == grpSortOpInfo->currGroupId) { grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; - return block; + *ppBlock = block; } else { grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP; grpSortOpInfo->prefetchedSortInput = block; - return NULL; } } else { grpSortOpInfo->childOpStatus = CHILD_OP_FINISHED; - return NULL; } } + + return TSDB_CODE_SUCCESS; } int32_t beginSortGroup(SOperatorInfo* pOperator) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 5bd11489c9..61cbbab72e 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -619,7 +619,7 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32 for (int32_t i = 0; i < pParam->numOfSources; ++i) { SSortSource* pSource = pParam->pSources[i]; - pSource->src.pBlock = pHandle->fetchfp(pSource->param); + TAOS_CHECK_RETURN(pHandle->fetchfp(pSource->param, &pSource->src.pBlock)); // set current source is done if (pSource->src.pBlock == NULL) { @@ -711,7 +711,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT } } else { int64_t st = taosGetTimestampUs(); - pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param); + TAOS_CHECK_RETURN(pHandle->fetchfp(((SSortSource*)pSource)->param, &pSource->src.pBlock)); pSource->fetchUs += taosGetTimestampUs() - st; pSource->fetchNum++; if (pSource->src.pBlock == NULL) { @@ -2236,8 +2236,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { while (1) { bool bExtractedBlock = false; bool bSkipBlock = false; - - SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + SSDataBlock* pBlk = NULL; + + TAOS_CHECK_RETURN(pHandle->fetchfp(pSrc->param, &pBlk)); if (pBlk != NULL && pHandle->mergeLimit > 0) { SSDataBlock* p = NULL; code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p); @@ -2390,7 +2391,8 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); while (1) { - SSDataBlock* pBlock = pHandle->fetchfp(source->param); + SSDataBlock* pBlock = NULL; + TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock)); if (pBlock == NULL) { break; } @@ -2701,7 +2703,8 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { while (1) { // fetch data - SSDataBlock* pBlock = pHandle->fetchfp(source->param); + SSDataBlock* pBlock = NULL; + TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock)); if (NULL == pBlock) { break; } @@ -2828,7 +2831,9 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle } SSortSource* source = *pSource; - SSDataBlock* pBlock = pHandle->fetchfp(source->param); + SSDataBlock* pBlock = NULL; + TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock)); + if (!pBlock || pBlock->info.rows == 0) { setCurrentSourceDone(source, pHandle); pHandle->tupleHandle.pBlock = NULL;