From 7361d95516dbad5c8e84f7420b92586290cd4fac Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 28 Aug 2024 14:07:55 +0800 Subject: [PATCH] adj error code --- .../executor/src/streamtimesliceoperator.c | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 0607250ca4..80cf2d8d00 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -1244,8 +1244,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) if (right) { transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; - saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel, - pInfo->pDeletedMap); + code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel, + pInfo->pDeletedMap); + QUERY_CHECK_CODE(code, lino, _end); } releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); @@ -1259,8 +1260,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) if (left) { transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; - saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &nextPoint.key, pInfo->pUpdatedMap, needDel, - pInfo->pDeletedMap); + code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &nextPoint.key, pInfo->pUpdatedMap, + needDel, pInfo->pDeletedMap); + QUERY_CHECK_CODE(code, lino, _end); } releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore); @@ -1280,8 +1282,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) if (right) { transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; - saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel, - pInfo->pDeletedMap); + code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel, + pInfo->pDeletedMap); + QUERY_CHECK_CODE(code, lino, _end); } releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); } @@ -1563,7 +1566,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR case STREAM_DELETE_DATA: { code = doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap, pInfo->pDelWins); QUERY_CHECK_CODE(code, lino, _end); - copyDataBlock(pInfo->pDelRes, pBlock); + code = copyDataBlock(pInfo->pDelRes, pBlock); + QUERY_CHECK_CODE(code, lino, _end); pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; (*ppRes) = pInfo->pDelRes; printDataBlock((*ppRes), getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); @@ -1573,7 +1577,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR case STREAM_INVALID: { SExprSupp* pExprSup = &pInfo->scalarSup; if (pExprSup->pExprInfo != NULL) { - projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + QUERY_CHECK_CODE(code, lino, _end); } } break; case STREAM_CHECKPOINT: { @@ -1581,7 +1586,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR pAggSup->stateStore.streamStateCommit(pAggSup->pState); doStreamTimeSliceSaveCheckpoint(pOperator); pInfo->recvCkBlock = true; - copyDataBlock(pInfo->pCheckpointRes, pBlock); + code = copyDataBlock(pInfo->pCheckpointRes, pBlock); + QUERY_CHECK_CODE(code, lino, _end); continue; } break; case STREAM_CREATE_CHILD_TABLE: { @@ -1589,7 +1595,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR goto _end; } break; case STREAM_GET_RESULT: { - setAllResultKey(pAggSup, pBlock->info.window.skey, pInfo->pUpdatedMap); + code = setAllResultKey(pAggSup, pBlock->info.window.skey, pInfo->pUpdatedMap); + QUERY_CHECK_CODE(code, lino, _end); continue; } default: @@ -1601,7 +1608,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR } if (pInfo->destHasPrimaryKey) { - copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins); + code = copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins); + QUERY_CHECK_CODE(code, lino, _end); } void* pIte = NULL; @@ -1801,7 +1809,8 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SORT, ratio); QUERY_CHECK_CODE(code, lino, _error); - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + QUERY_CHECK_CODE(code, lino, _error); pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); pInfo->delIndex = 0;