diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 91af218a7e..031d2e8bdc 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -4466,6 +4466,19 @@ _end: return code; } +static bool isWinResult(SSessionKey* pKey, SSHashObj* pSeUpdate, SSHashObj* pResults) { + SSessionKey checkKey = {0}; + getSessionHashKey(pKey, &checkKey); + if (tSimpleHashGet(pSeUpdate, &checkKey, sizeof(SSessionKey)) != NULL) { + return true; + } + + if (tSimpleHashGet(pResults, &checkKey, sizeof(SSessionKey)) != NULL) { + return true; + } + return false; +} + static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated, SSHashObj* pStDeleted) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4518,7 +4531,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl code = setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin); QUERY_CHECK_CODE(code, lino, _end); - releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore); + if (isWinResult(&nextWin.winInfo.sessionWin, pSeUpdated, pAggSup->pResultRows) == false) { + releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore); + } setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); code = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual, diff --git a/tests/script/tsim/stream/basic5.sim b/tests/script/tsim/stream/basic5.sim index d7dd603d3c..5d90740b13 100644 --- a/tests/script/tsim/stream/basic5.sim +++ b/tests/script/tsim/stream/basic5.sim @@ -2,13 +2,28 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 system sh/cfg.sh -n dnode1 -c debugflag -v 135 system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10 +system sh/cfg.sh -n dnode1 -c checkpointinterval -v 60 +system sh/cfg.sh -n dnode1 -c snodeAddress -v 127.0.0.1:873 system sh/exec.sh -n dnode1 -s start sleep 500 sql connect +print step1 ============= -print step1============= +print ================ create snode +sql show snodes +if $rows != 0 then + return -1 +endi + +sql create snode on dnode 1; +sql show snodes; +if $rows != 1 then + return -1 +endi + +print ============== snode created , create db sql create database test3 vgroups 1; sql use test3; @@ -57,7 +72,7 @@ loop8: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -87,7 +102,7 @@ loop9: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -127,7 +142,7 @@ loop10: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -162,7 +177,7 @@ loop11: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -194,7 +209,7 @@ loop11: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -239,7 +254,7 @@ loop12: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -315,7 +330,7 @@ loop13: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -369,7 +384,6 @@ if $data24 != 1 then endi print step4============= - sql create database test6 vgroups 4; sql use test6; sql create stable st(ts timestamp,a int,b int,c int,d int) tags(ta int,tb int,tc int); @@ -396,7 +410,7 @@ loop14: sleep 200 $loop_count = $loop_count + 1 -if $loop_count == 10 then +if $loop_count == 30 then return -1 endi @@ -449,4 +463,17 @@ if $data25 != 2 then goto loop14 endi -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +print sleep for 1min for checkpoint generate +sleep 60000 + +print ================== restart to load checkpoint from snode + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode1 -s start + +sleep 500 +sql connect + +sleep 30000 + +sql select start_ver, checkpoint_ver from information_schema.ins_stream_tasks;