From 7570fa758bb9e14b6b00bab6eceb4f7370f726e2 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 8 Dec 2023 16:28:18 +0800 Subject: [PATCH] init delete mark --- source/libs/executor/inc/executorInt.h | 3 +++ .../executor/src/streameventwindowoperator.c | 8 ++++++++ .../libs/executor/src/streamtimewindowoperator.c | 16 +++++++++------- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index cba26c46b5..9b0052976c 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -636,6 +636,7 @@ typedef struct SStreamEventAggOperatorInfo { bool isHistoryOp; SArray* historyWins; bool reCkBlock; + bool recvGetAll; SSDataBlock* pCheckpointRes; SFilterInfo* pStartCondInfo; SFilterInfo* pEndCondInfo; @@ -837,6 +838,8 @@ void compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeW int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); void resetWinRange(STimeWindow* winRange); bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts); +int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); +void resetUnCloseSessionWinInfo(SSHashObj* winMap); int32_t encodeSSessionKey(void** buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key); diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 914b95ba83..3d38dfffa4 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -486,6 +486,11 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { return pInfo->pCheckpointRes; } + if (pInfo->recvGetAll) { + pInfo->recvGetAll = false; + resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows); + } + setOperatorCompleted(pOperator); return NULL; } @@ -510,6 +515,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { deleteSessionWinState(&pInfo->streamAggSup, pBlock, pInfo->pSeUpdated, pInfo->pSeDeleted); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { + pInfo->recvGetAll = true; getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { @@ -672,6 +678,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys .calTrigger = pEventNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, + .deleteMark = getDeleteMark(&pEventNode->window, 0), }; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); @@ -720,6 +727,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->reCkBlock = false; + pInfo->recvGetAll = false; // for stream void* buff = NULL; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 3dfc92d953..d230442c6d 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1345,12 +1345,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return buildIntervalResult(pOperator); } -static int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) { - if (pIntervalPhyNode->window.deleteMark <= 0) { +int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) { + if (pWinPhyNode->deleteMark <= 0) { return DEAULT_DELETE_MARK; } - int64_t deleteMark = TMAX(pIntervalPhyNode->window.deleteMark, pIntervalPhyNode->window.watermark); - deleteMark = TMAX(deleteMark, pIntervalPhyNode->interval); + int64_t deleteMark = TMAX(pWinPhyNode->deleteMark, pWinPhyNode->watermark); + deleteMark = TMAX(deleteMark, interval); return deleteMark; } @@ -1442,7 +1442,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, - .deleteMark = getDeleteMark(pIntervalPhyNode), + .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval), .deleteMarkSaved = 0, .calTriggerSaved = 0, }; @@ -2565,7 +2565,7 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { taosMemoryFree(buf); } -static void resetUnCloseSessionWinInfo(SSHashObj* winMap) { +void resetUnCloseSessionWinInfo(SSHashObj* winMap) { void* pIte = NULL; int32_t iter = 0; while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) { @@ -2864,6 +2864,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh .calTrigger = pSessionNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, + .deleteMark = getDeleteMark(&pSessionNode->window, 0), }; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); @@ -3732,6 +3733,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys .calTrigger = pStateNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, + .deleteMark = getDeleteMark(&pStateNode->window, 0), }; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); @@ -3963,7 +3965,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, - .deleteMark = getDeleteMark(pIntervalPhyNode)}; + .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)}; ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");