From 2e543d950ab62caca9827ab7c8cf33cc551e914e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 15 Jan 2024 17:36:23 +0800 Subject: [PATCH] set delete for session op --- .../libs/executor/src/streamtimewindowoperator.c | 16 +++++++++------- source/libs/stream/src/tstreamFileState.c | 3 +-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 22339aa384..02f8b90864 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2893,6 +2893,14 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh goto _error; } + pInfo->twAggSup = (STimeWindowAggSupp){ + .waterMark = pSessionNode->window.watermark, + .calTrigger = pSessionNode->window.triggerType, + .maxTs = INT64_MIN, + .minTs = INT64_MAX, + .deleteMark = getDeleteMark(&pSessionNode->window, 0), + }; + code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI); @@ -2900,13 +2908,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh goto _error; } - pInfo->twAggSup = (STimeWindowAggSupp){ - .waterMark = pSessionNode->window.watermark, - .calTrigger = pSessionNode->window.triggerType, - .maxTs = INT64_MIN, - .minTs = INT64_MAX, - }; - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; @@ -3775,6 +3776,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys .calTrigger = pStateNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, + .deleteMark = getDeleteMark(&pStateNode->window, 0), }; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index aac78cf03b..fb5e02c827 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -527,8 +527,7 @@ bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) { } SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { - int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN - : pFileState->maxTs - pFileState->deleteMark; + int64_t mark = (pFileState->deleteMark == INT64_MAX) ? INT64_MIN : pFileState->maxTs - pFileState->deleteMark; clearExpiredRowBuff(pFileState, mark, false); return pFileState->usedBuffs; }