adj error code

This commit is contained in:
54liuyao 2024-08-28 14:07:55 +08:00
parent ff1f75bd33
commit 7361d95516
1 changed files with 21 additions and 12 deletions

View File

@ -1244,8 +1244,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
if (right) {
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel,
pInfo->pDeletedMap);
code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel,
pInfo->pDeletedMap);
QUERY_CHECK_CODE(code, lino, _end);
}
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
@ -1259,8 +1260,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
if (left) {
transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &nextPoint.key, pInfo->pUpdatedMap, needDel,
pInfo->pDeletedMap);
code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &nextPoint.key, pInfo->pUpdatedMap,
needDel, pInfo->pDeletedMap);
QUERY_CHECK_CODE(code, lino, _end);
}
releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore);
@ -1280,8 +1282,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
if (right) {
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel,
pInfo->pDeletedMap);
code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel,
pInfo->pDeletedMap);
QUERY_CHECK_CODE(code, lino, _end);
}
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
}
@ -1563,7 +1566,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
case STREAM_DELETE_DATA: {
code = doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap, pInfo->pDelWins);
QUERY_CHECK_CODE(code, lino, _end);
copyDataBlock(pInfo->pDelRes, pBlock);
code = copyDataBlock(pInfo->pDelRes, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
(*ppRes) = pInfo->pDelRes;
printDataBlock((*ppRes), getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
@ -1573,7 +1577,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
case STREAM_INVALID: {
SExprSupp* pExprSup = &pInfo->scalarSup;
if (pExprSup->pExprInfo != NULL) {
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
QUERY_CHECK_CODE(code, lino, _end);
}
} break;
case STREAM_CHECKPOINT: {
@ -1581,7 +1586,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
doStreamTimeSliceSaveCheckpoint(pOperator);
pInfo->recvCkBlock = true;
copyDataBlock(pInfo->pCheckpointRes, pBlock);
code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
continue;
} break;
case STREAM_CREATE_CHILD_TABLE: {
@ -1589,7 +1595,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
goto _end;
} break;
case STREAM_GET_RESULT: {
setAllResultKey(pAggSup, pBlock->info.window.skey, pInfo->pUpdatedMap);
code = setAllResultKey(pAggSup, pBlock->info.window.skey, pInfo->pUpdatedMap);
QUERY_CHECK_CODE(code, lino, _end);
continue;
}
default:
@ -1601,7 +1608,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
}
if (pInfo->destHasPrimaryKey) {
copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
code = copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
QUERY_CHECK_CODE(code, lino, _end);
}
void* pIte = NULL;
@ -1801,7 +1809,8 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
&pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SORT, ratio);
QUERY_CHECK_CODE(code, lino, _error);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
pInfo->delIndex = 0;