From b24c9c5d2064baac9523a9c308e41dd0545b13d0 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 16 Oct 2024 13:39:32 +0800 Subject: [PATCH] fix ci check --- source/libs/executor/src/streamfilloperator.c | 23 ++++++++++++------- .../src/streamintervalsliceoperator.c | 14 +++++++---- source/libs/stream/src/tstreamFileState.c | 3 ++- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index d3766a5eb5..58050561dd 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1177,7 +1177,8 @@ void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* if (winCode == TSDB_CODE_SUCCESS) { pFillSup->cur.key = pKey->ts; pFillSup->cur.pRowVal = val; - buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res); + code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res); + QUERY_CHECK_CODE(code, lino, _end); resetFillWindow(&pFillSup->cur); } else { SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, pKey); @@ -1254,13 +1255,14 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) { SStreamFillInfo* pFillInfo = pInfo->pFillInfo; SSDataBlock* pBlock = pInfo->pSrcBlock; uint64_t groupId = pBlock->info.id.groupId; - SSDataBlock* pRes = pInfo->pRes; SStreamAggSupporter* pAggSup = pInfo->pStreamAggSup; SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol); TSKEY* tsCol = (TSKEY*)pTsCol->pData; - for (int32_t i = 0; i < pBlock->info.rows; i++){ - code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize); - QUERY_CHECK_CODE(code, lino, _end); + if (pFillInfo->type == TSDB_FILL_PREV) { + for (int32_t i = 0; i < pBlock->info.rows; i++){ + code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize); + QUERY_CHECK_CODE(code, lino, _end); + } } code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0); QUERY_CHECK_CODE(code, lino, _end); @@ -1322,7 +1324,9 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR goto _end; } - pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); + if (pInfo->pFillInfo->type == TSDB_FILL_PREV) { + pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); + } setStreamOperatorCompleted(pOperator); (*ppRes) = NULL; goto _end; @@ -1364,7 +1368,8 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR QUERY_CHECK_CODE(code, lino, _end); } - doStreamForceFillImpl(pOperator); + code = doStreamForceFillImpl(pOperator); + QUERY_CHECK_CODE(code, lino, _end); } for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) { @@ -1388,7 +1393,9 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR QUERY_CHECK_CODE(code, lino, _end); if ((*ppRes) == NULL) { - pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); + if (pInfo->pFillInfo->type == TSDB_FILL_PREV) { + pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState); + } setStreamOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c index 7ec47958d1..e873b31062 100644 --- a/source/libs/executor/src/streamintervalsliceoperator.c +++ b/source/libs/executor/src/streamintervalsliceoperator.c @@ -248,7 +248,9 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc QUERY_CHECK_CODE(code, lino, _end); if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && prevPoint.pLastRow->key != prevPoint.winKey.win.ekey) { - setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); + code = setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); + QUERY_CHECK_CODE(code, lino, _end); + resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1); @@ -256,10 +258,13 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc 0, pBlock->info.rows, numOfOutput); QUERY_CHECK_CODE(code, lino, _end); SWinKey prevKey = {.ts = prevPoint.winKey.win.skey, .groupId = prevPoint.winKey.groupId}; - saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap); + code = saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap); + QUERY_CHECK_CODE(code, lino, _end); } - setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); + code = setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); + QUERY_CHECK_CODE(code, lino, _end); + resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput); if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) { doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START); @@ -278,7 +283,8 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc QUERY_CHECK_CODE(code, lino, _end); } - saveWinResult(&curKey, curPoint.pResPos, pInfo->pUpdatedMap); + code = saveWinResult(&curKey, curPoint.pResPos, pInfo->pUpdatedMap); + QUERY_CHECK_CODE(code, lino, _end); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curPoint.winKey.win, 1); code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index ee6629704f..d31665de90 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -1222,7 +1222,8 @@ void clearExpiredState(SStreamFileState* pFileState) { int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey); qTrace("%s at line %d res:%d", __func__, __LINE__, code_file); - streamStateFillDel_rocksdb(pFileState->pFileStore, pKey); + code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey); + qTrace("%s at line %d res %d", __func__, __LINE__, code_file); } taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL); }