From a0a1414af56ac0701299e403cf667faa6aee1bef Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 3 Sep 2024 09:23:42 +0800 Subject: [PATCH] fix issue --- source/libs/executor/src/streamtimesliceoperator.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index e4db77b43f..eed49032a6 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -1152,22 +1152,23 @@ static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKE pRowVal->key = ts; } -static int32_t saveTimeSliceWinResultInfo(SStreamAggSupporter* pAggSup, int8_t trigger, SWinKey* pKey, +static int32_t saveTimeSliceWinResultInfo(SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup, SWinKey* pKey, SSHashObj* pUpdatedMap, bool needDel, SSHashObj* pDeletedMap) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - if (trigger == STREAM_TRIGGER_AT_ONCE) { + if (pTwAggSup->calTrigger == STREAM_TRIGGER_AT_ONCE) { code = saveTimeSliceWinResult(pKey, pUpdatedMap); QUERY_CHECK_CODE(code, lino, _end); if (needDel) { code = saveTimeSliceWinResult(pKey, pDeletedMap); QUERY_CHECK_CODE(code, lino, _end); } - } else if (trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + } else if (pTwAggSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, pKey->groupId, NULL, 0); QUERY_CHECK_CODE(code, lino, _end); } + pTwAggSup->maxTs = TMAX(pTwAggSup->maxTs, pKey->ts); _end: if (code != TSDB_CODE_SUCCESS) { @@ -1244,7 +1245,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) if (right) { transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; - code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel, + code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); QUERY_CHECK_CODE(code, lino, _end); } @@ -1260,7 +1261,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) if (left) { transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; - code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &nextPoint.key, pInfo->pUpdatedMap, + code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); QUERY_CHECK_CODE(code, lino, _end); } @@ -1282,7 +1283,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) if (right) { transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS; - code = saveTimeSliceWinResultInfo(pAggSup, pInfo->twAggSup.calTrigger, &curPoint.key, pInfo->pUpdatedMap, needDel, + code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel, pInfo->pDeletedMap); QUERY_CHECK_CODE(code, lino, _end); }