diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 04c413e941..273477c8a0 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -620,6 +620,7 @@ typedef struct SStreamEventAggOperatorInfo { bool ignoreExpiredDataSaved; SArray* pUpdated; SSHashObj* pSeUpdated; + SSHashObj* pAllUpdated; int64_t dataVersion; bool isHistoryOp; SArray* historyWins; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 73d347e153..56bf4f818d 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -58,6 +58,7 @@ void destroyStreamEventOperatorInfo(void* param) { colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); tSimpleHashCleanup(pInfo->pSeUpdated); + tSimpleHashCleanup(pInfo->pAllUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); cleanupGroupResInfo(&pInfo->groupResInfo); @@ -326,6 +327,10 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl compactEventWindow(pOperator, &curWin, pInfo->pSeUpdated, pInfo->pSeDeleted, false); saveSessionOutputBuf(pAggSup, &curWin.winInfo); + if (pInfo->isHistoryOp) { + saveResult(curWin.winInfo, pInfo->pAllUpdated); + } + if (isWindowIncomplete(&curWin)) { continue; } @@ -527,7 +532,10 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) { removeSessionDeleteResults(pInfo->pSeDeleted, pInfo->pUpdated); if (pInfo->isHistoryOp) { - getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); + SArray* pHisWins = taosArrayInit(16, sizeof(SEventWindowInfo)); + copyUpdateResult(&pInfo->pAllUpdated, pHisWins, sessionKeyCompareAsc); + getMaxTsWins(pHisWins, pInfo->historyWins); + taosArrayDestroy(pHisWins); } initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); @@ -594,16 +602,13 @@ void streamEventReloadState(SOperatorInfo* pOperator) { pSeKeyBuf[i].groupId, i); getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo); setEventWindowFlag(pAggSup, &curInfo); + if (!curInfo.pWinFlag->startFlag || curInfo.pWinFlag->endFlag) { + continue; + } + compactEventWindow(pOperator, &curInfo, pInfo->pSeUpdated, pInfo->pSeDeleted, false); qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey, curInfo.winInfo.sessionWin.groupId); - if (isWindowIncomplete(&curInfo)) { - if (curInfo.winInfo.isOutput) { - ASSERT(0); - saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin); - } - continue; - } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResult(curInfo.winInfo, pInfo->pSeUpdated); @@ -694,6 +699,13 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->isHistoryOp = pHandle->fillHistory; } + if (pInfo->isHistoryOp) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pAllUpdated = tSimpleHashInit(64, hashFn); + } else { + pInfo->pAllUpdated = NULL; + } + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->reCkBlock = false; diff --git a/tests/script/tsim/stream/event2.sim b/tests/script/tsim/stream/event2.sim new file mode 100644 index 0000000000..eb9fca46e6 --- /dev/null +++ b/tests/script/tsim/stream/event2.sim @@ -0,0 +1,84 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database test +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); +sql create table t4 using st tags(3,3,3); + +sql insert into t1 values(1648791223000,0,1,1,1.0); +sql insert into t1 values(1648791233000,0,2,2,2.0); +sql insert into t1 values(1648791243000,1,3,3,3.0); + +sql insert into t2 values(1648791223000,0,1,4,3.0); +sql insert into t2 values(1648791233000,0,2,5,1.0); +sql insert into t2 values(1648791243000,1,3,6,2.0); + +sql insert into t3 values(1648791223000,1,1,7,3.0); +sql insert into t3 values(1648791233000,1,2,8,1.0); +sql insert into t3 values(1648791243000,1,3,9,2.0); + +sql insert into t4 values(1648791223000,1,1,10,3.0); +sql insert into t4 values(1648791233000,0,2,11,1.0); +sql insert into t4 values(1648791243000,1,9,12,2.0); + +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 fill_history 1 into streamt0 as select _wstart as s, count(*) c1, sum(b), max(c), _wend as e from st partition by tbname event_window start with a = 0 end with b = 9; + +sleep 1000 + +sql insert into t1 values(1648791253000,1,9,13,2.0); +sql insert into t2 values(1648791253000,1,9,14,2.0); +sql insert into t3 values(1648791253000,1,9,15,2.0); +sql insert into t4 values(1648791253000,1,9,16,2.0); + +$loop_count = 0 +loop0: + +sleep 300 +print 1 sql select * from streamt0 order by 1, 2, 3, 4; +sql select * from streamt0 order by 1, 2, 3, 4; + +print +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 3 then + print ======rows=$rows + goto loop0 +endi + +if $data01 != 4 then + print ======data01=$data01 + goto loop0 +endi + +if $data11 != 4 then + print ======data11=$data11 + goto loop0 +endi + +if $data21 != 2 then + print ======data21=$data21 + goto loop0 +endi + + +print event1 end +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/eventtmp.sim b/tests/script/tsim/stream/eventtmp.sim new file mode 100644 index 0000000000..52ca4fd9ef --- /dev/null +++ b/tests/script/tsim/stream/eventtmp.sim @@ -0,0 +1,61 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database test +sql create database test vgroups 1; +sql use test; + +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); +sql create table t4 using st tags(3,3,3); + + +sql insert into t4 values(1648791223000,1,1,10,3.0); +sql insert into t4 values(1648791233000,0,2,11,1.0); +sql insert into t4 values(1648791243000,1,9,12,2.0); + +sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 fill_history 1 into streamt0 as select _wstart as s, count(*) c1, sum(b), max(c), _wend as e from st partition by tbname event_window start with a = 0 end with b = 9; + +sleep 1000 + +sql insert into t4 values(1648791253000,1,9,16,2.0); + +$loop_count = 0 +loop0: + +sleep 300 +print 1 sql select * from streamt0 order by 1, 2, 3, 4; +sql select * from streamt0 order by 1, 2, 3, 4; + +print +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print ======rows=$rows + goto loop0 +endi + +if $data01 != 2 then + print ======data01=$data01 + goto loop0 +endi + + + +print event1 end +system sh/exec.sh -n dnode1 -s stop -x SIGINT