From 0c284c4398283629c80d964f697aaa9cdc50e29b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 12 Dec 2023 16:04:58 +0800 Subject: [PATCH] reset stream trigger --- source/libs/executor/src/executor.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1fa911e646..6cee79bff2 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -984,6 +984,21 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + pSup->calTriggerSaved = pSup->calTrigger; + pSup->deleteMarkSaved = pSup->deleteMark; + pSup->calTrigger = STREAM_TRIGGER_AT_ONCE; + pSup->deleteMark = INT64_MAX; + pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; + pInfo->ignoreExpiredData = false; + } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT) { + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + STimeWindowAggSupp* pSup = &pInfo->twAggSup; + + ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); + + qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + pSup->calTriggerSaved = pSup->calTrigger; pSup->deleteMarkSaved = pSup->deleteMark; pSup->calTrigger = STREAM_TRIGGER_AT_ONCE;