diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 769be6268c..69aaef48f4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2594,8 +2594,6 @@ int32_t doStreamIntervalEncodeOpState(void** buf, int32_t len, SOperatorInfo* pO while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) { void* key = taosHashGetKey(pIte, &keyLen); tlen += encodeSWinKey(buf, key); - SRowBuffPos* pPos = *(void**)pIte; - tlen += encodeSRowBuffPos(buf, pPos); } // 2.twAggSup @@ -2655,10 +2653,10 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera buf = taosDecodeFixedI32(buf, &mapSize); for (int32_t i = 0; i < mapSize; i++) { SWinKey key = {0}; - SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos)); - pPos->pKey = taosMemoryCalloc(1, sizeof(SWinKey)); buf = decodeSWinKey(buf, &key); - buf = decodeSRowBuffPos(buf, pPos); + SRowBuffPos* pPos = NULL; + int32_t resSize = pInfo->aggSup.resultRowSize; + pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize); tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES); } diff --git a/tests/script/tsim/stream/checkpointInterval0.sim b/tests/script/tsim/stream/checkpointInterval0.sim index 5bc8222a54..1c212eb2a7 100644 --- a/tests/script/tsim/stream/checkpointInterval0.sim +++ b/tests/script/tsim/stream/checkpointInterval0.sim @@ -14,6 +14,7 @@ sql use test; sql create table t1(ts timestamp, a int, b int , c int, d double); sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s); +sql create stream streams1 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, sum(a) from t1 interval(10s); sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791213001,2,2,3,1.1); @@ -45,6 +46,23 @@ if $data02 != 3 then goto loop0 endi +$loop_count = 0 + +loop01: +sleep 1000 + +sql select * from streamt1; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 0 then + print =====rows=$rows expect 1 + goto loop01 +endi + print waiting for checkpoint generation 1 ...... sleep 25000 @@ -126,6 +144,36 @@ if $data12 != 4 then goto loop2 endi + +$loop_count = 0 + +loop3: +sleep 1000 + +print select * from streamt1; +sql select * from streamt1; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 2 + goto loop3 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop3 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop3 +endi + print step 2 print restart taosd 02 ...... @@ -136,7 +184,7 @@ system sh/exec.sh -n dnode1 -s start sql insert into t1 values(1648791223004,5,2,3,1.1); -loop20: +loop4: sleep 1000 sql select * from streamt; @@ -148,29 +196,58 @@ endi if $rows != 2 then print =====rows=$rows expect 2 - goto loop20 + goto loop4 endi # row 0 if $data01 != 3 then print =====data01=$data01 - goto loop20 + goto loop4 endi if $data02 != 6 then print =====data02=$data02 - goto loop20 + goto loop4 endi # row 1 if $data11 != 2 then print =====data11=$data11 - goto loop20 + goto loop4 endi if $data12 != 9 then print =====data12=$data12 - goto loop20 + goto loop4 +endi + +$loop_count = 0 + +loop5: +sleep 1000 + +print select * from streamt1; +sql select * from streamt1; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 2 + goto loop5 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop5 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop5 endi print end---------------------------------