fix ci check

This commit is contained in:
54liuyao 2024-10-16 13:39:32 +08:00
parent 17939a184b
commit b24c9c5d20
3 changed files with 27 additions and 13 deletions

View File

@ -1177,7 +1177,8 @@ void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter*
if (winCode == TSDB_CODE_SUCCESS) {
pFillSup->cur.key = pKey->ts;
pFillSup->cur.pRowVal = val;
buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
QUERY_CHECK_CODE(code, lino, _end);
resetFillWindow(&pFillSup->cur);
} else {
SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, pKey);
@ -1254,14 +1255,15 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
SStreamFillInfo* pFillInfo = pInfo->pFillInfo;
SSDataBlock* pBlock = pInfo->pSrcBlock;
uint64_t groupId = pBlock->info.id.groupId;
SSDataBlock* pRes = pInfo->pRes;
SStreamAggSupporter* pAggSup = pInfo->pStreamAggSup;
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
if (pFillInfo->type == TSDB_FILL_PREV) {
for (int32_t i = 0; i < pBlock->info.rows; i++){
code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize);
QUERY_CHECK_CODE(code, lino, _end);
}
}
code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0);
QUERY_CHECK_CODE(code, lino, _end);
@ -1322,7 +1324,9 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
goto _end;
}
if (pInfo->pFillInfo->type == TSDB_FILL_PREV) {
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
}
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;
goto _end;
@ -1364,7 +1368,8 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
QUERY_CHECK_CODE(code, lino, _end);
}
doStreamForceFillImpl(pOperator);
code = doStreamForceFillImpl(pOperator);
QUERY_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) {
@ -1388,7 +1393,9 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
QUERY_CHECK_CODE(code, lino, _end);
if ((*ppRes) == NULL) {
if (pInfo->pFillInfo->type == TSDB_FILL_PREV) {
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
}
setStreamOperatorCompleted(pOperator);
}

View File

@ -248,7 +248,9 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
QUERY_CHECK_CODE(code, lino, _end);
if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && prevPoint.pLastRow->key != prevPoint.winKey.win.ekey) {
setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
code = setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
QUERY_CHECK_CODE(code, lino, _end);
resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1);
@ -256,10 +258,13 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
0, pBlock->info.rows, numOfOutput);
QUERY_CHECK_CODE(code, lino, _end);
SWinKey prevKey = {.ts = prevPoint.winKey.win.skey, .groupId = prevPoint.winKey.groupId};
saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap);
code = saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap);
QUERY_CHECK_CODE(code, lino, _end);
}
setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
code = setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
QUERY_CHECK_CODE(code, lino, _end);
resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) {
doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START);
@ -278,7 +283,8 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
QUERY_CHECK_CODE(code, lino, _end);
}
saveWinResult(&curKey, curPoint.pResPos, pInfo->pUpdatedMap);
code = saveWinResult(&curKey, curPoint.pResPos, pInfo->pUpdatedMap);
QUERY_CHECK_CODE(code, lino, _end);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curPoint.winKey.win, 1);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,

View File

@ -1222,7 +1222,8 @@ void clearExpiredState(SStreamFileState* pFileState) {
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
qTrace("%s at line %d res:%d", __func__, __LINE__, code_file);
streamStateFillDel_rocksdb(pFileState->pFileStore, pKey);
code_file = streamStateFillDel_rocksdb(pFileState->pFileStore, pKey);
qTrace("%s at line %d res %d", __func__, __LINE__, code_file);
}
taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL);
}