fix: return code issue

This commit is contained in:
dapan1121 2024-08-22 13:49:38 +08:00
parent e24517a799
commit c5291c2d1f
5 changed files with 38 additions and 26 deletions

View File

@ -66,7 +66,7 @@ typedef struct SMsortComparParam {
typedef struct SSortHandle SSortHandle;
typedef struct STupleHandle STupleHandle;
typedef SSDataBlock* (*_sort_fetch_block_fn_t)(void* param);
typedef int32_t (*_sort_fetch_block_fn_t)(void* param, SSDataBlock** ppBlock);
typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* param);
/**

View File

@ -65,10 +65,10 @@ static SSDataBlock* doNonSortMerge1(SOperatorInfo* pOperator);
static SSDataBlock* doColsMerge1(SOperatorInfo* pOperator);
static int32_t doColsMerge(SOperatorInfo* pOperator, SSDataBlock** pResBlock);
SSDataBlock* sortMergeloadNextDataBlock(void* param) {
int32_t sortMergeloadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
SSDataBlock* pBlock = pOperator->fpSet.getNextFn(pOperator);
return pBlock;
*ppBlock = pOperator->fpSet.getNextFn(pOperator);
return TSDB_CODE_SUCCESS;
}
int32_t openSortMergeOperator(SOperatorInfo* pOperator) {

View File

@ -5414,7 +5414,7 @@ static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinishe
return;
}
static SSDataBlock* getBlockForTableMergeScan(void* param) {
static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
@ -5422,6 +5422,7 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pBlock = NULL;
int64_t st = taosGetTimestampUs();
int32_t code = TSDB_CODE_SUCCESS;
while (true) {
if (pInfo->rtnNextDurationBlocks) {
@ -5456,10 +5457,11 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
if (pInfo->bNextDurationBlockEvent || pInfo->bNewFilesetEvent) {
if (!bSkipped) {
int32_t code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]);
code = createOneDataBlock(pBlock, true, &pInfo->nextDurationBlocks[pInfo->numNextDurationBlocks]);
if (code) {
terrno = code;
return NULL;
*ppBlock = NULL;
return code;
}
++pInfo->numNextDurationBlocks;
@ -5473,7 +5475,8 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
if (pInfo->bNewFilesetEvent) {
pInfo->rtnNextDurationBlocks = true;
return NULL;
*ppBlock = NULL;
return code;
}
if (pInfo->bNextDurationBlockEvent) {
@ -5488,11 +5491,13 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
*ppBlock = pBlock;
return code;
}
return NULL;
*ppBlock = NULL;
return code;
}
int32_t generateSortByTsPkInfo(SArray* colMatchInfo, int32_t order, SArray** ppSortArray) {

View File

@ -337,10 +337,10 @@ static int32_t getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock,
return code;
}
SSDataBlock* loadNextDataBlock(void* param) {
int32_t loadNextDataBlock(void* param, SSDataBlock** ppBlock) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
SSDataBlock* pBlock = pOperator->fpSet.getNextFn(pOperator);
return pBlock;
*ppBlock = pOperator->fpSet.getNextFn(pOperator);
return TSDB_CODE_SUCCESS;
}
// todo refactor: merged with fetch fp
@ -609,30 +609,32 @@ typedef struct SGroupSortSourceParam {
SGroupSortOperatorInfo* grpSortOpInfo;
} SGroupSortSourceParam;
SSDataBlock* fetchNextGroupSortDataBlock(void* param) {
int32_t fetchNextGroupSortDataBlock(void* param, SSDataBlock** ppBlock) {
*ppBlock = NULL;
SGroupSortSourceParam* source = param;
SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
if (grpSortOpInfo->prefetchedSortInput) {
SSDataBlock* block = grpSortOpInfo->prefetchedSortInput;
grpSortOpInfo->prefetchedSortInput = NULL;
return block;
*ppBlock = block;
} else {
SOperatorInfo* childOp = source->childOpInfo;
SSDataBlock* block = childOp->fpSet.getNextFn(childOp);
if (block != NULL) {
if (block->info.id.groupId == grpSortOpInfo->currGroupId) {
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
return block;
*ppBlock = block;
} else {
grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP;
grpSortOpInfo->prefetchedSortInput = block;
return NULL;
}
} else {
grpSortOpInfo->childOpStatus = CHILD_OP_FINISHED;
return NULL;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t beginSortGroup(SOperatorInfo* pOperator) {

View File

@ -619,7 +619,7 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
for (int32_t i = 0; i < pParam->numOfSources; ++i) {
SSortSource* pSource = pParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
TAOS_CHECK_RETURN(pHandle->fetchfp(pSource->param, &pSource->src.pBlock));
// set current source is done
if (pSource->src.pBlock == NULL) {
@ -711,7 +711,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
}
} else {
int64_t st = taosGetTimestampUs();
pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param);
TAOS_CHECK_RETURN(pHandle->fetchfp(((SSortSource*)pSource)->param, &pSource->src.pBlock));
pSource->fetchUs += taosGetTimestampUs() - st;
pSource->fetchNum++;
if (pSource->src.pBlock == NULL) {
@ -2236,8 +2236,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
while (1) {
bool bExtractedBlock = false;
bool bSkipBlock = false;
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
SSDataBlock* pBlk = NULL;
TAOS_CHECK_RETURN(pHandle->fetchfp(pSrc->param, &pBlk));
if (pBlk != NULL && pHandle->mergeLimit > 0) {
SSDataBlock* p = NULL;
code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p);
@ -2390,7 +2391,8 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
SSDataBlock* pBlock = NULL;
TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock));
if (pBlock == NULL) {
break;
}
@ -2701,7 +2703,8 @@ static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
while (1) {
// fetch data
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
SSDataBlock* pBlock = NULL;
TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock));
if (NULL == pBlock) {
break;
}
@ -2828,7 +2831,9 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle
}
SSortSource* source = *pSource;
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
SSDataBlock* pBlock = NULL;
TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock));
if (!pBlock || pBlock->info.rows == 0) {
setCurrentSourceDone(source, pHandle);
pHandle->tupleHandle.pBlock = NULL;