From 5d21f50340ec7a4317cd6893ca1acd22a95c8f74 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 27 Jul 2023 17:45:12 +0800 Subject: [PATCH] adjust history task trigger mode --- source/libs/executor/src/timewindowoperator.c | 24 +++++++++++++++++-- source/libs/stream/src/streamRecover.c | 19 ++++++++++----- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 375c40b015..db7f12983f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3737,7 +3737,18 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); if (winNum > 0) { saveSessionOutputBuf(pAggSup, &winInfo); - saveResult(winInfo, pInfo->pStUpdated); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { + saveResult(winInfo, pInfo->pStUpdated); + } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + if (!isCloseWindow(&winInfo.sessionWin.win, &pInfo->twAggSup)) { + saveDeleteRes(pInfo->pStDeleted, winInfo.sessionWin); + } + SSessionKey key = {0}; + getSessionHashKey(&winInfo.sessionWin, &key); + tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); + } + } else { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)winInfo.pOutputBuf, &pAggSup->stateStore); } } taosMemoryFree(pBuf); @@ -4388,7 +4399,16 @@ void streamStateReloadState(SOperatorInfo* pOperator) { if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) { compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated); saveSessionOutputBuf(pAggSup, &curInfo.winInfo); - saveResult(curInfo.winInfo, pInfo->pSeUpdated); + if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { + saveResult(curInfo.winInfo, pInfo->pSeUpdated); + } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + if (!isCloseWindow(&curInfo.winInfo.sessionWin.win, &pInfo->twAggSup)) { + saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin); + } + SSessionKey key = {0}; + getSessionHashKey(&curInfo.winInfo.sessionWin, &key); + tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo)); + } } if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index dffa28e769..4b3103cf5f 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -62,7 +62,9 @@ const char* streamGetTaskStatusStr(int32_t status) { static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; - streamSetParamForScanHistory(pTask); + if (pTask->info.fillHistory) { + streamSetParamForScanHistory(pTask); + } streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window); int32_t code = streamStartRecoverTask(pTask, 0); @@ -80,7 +82,9 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { walReaderGetCurrentVer(pTask->exec.pWalReader)); } } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - streamSetParamForScanHistory(pTask); + if (pTask->info.fillHistory) { + streamSetParamForScanHistory(pTask); + } streamTaskScanHistoryPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); @@ -434,7 +438,7 @@ int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) { int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; - if (qRestoreStreamOperatorOption(exec) < 0) { + if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) { return -1; } @@ -625,9 +629,12 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { } // restore param - int32_t code = streamRestoreParam(pTask); - if (code < 0) { - return -1; + int32_t code = 0; + if (pTask->info.fillHistory) { + code = streamRestoreParam(pTask); + if (code < 0) { + return -1; + } } // dispatch recover finish req to all related downstream task