From 34ff1322e4029fed522df63ce6bb5b19538adf16 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 10 Apr 2023 16:56:24 +0800 Subject: [PATCH 1/2] op final window --- source/libs/executor/src/timewindowoperator.c | 55 ++-- .../tsim/stream/distributeInterval0.sim | 238 ++++++++++++++++-- 2 files changed, 227 insertions(+), 66 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d9da8e076c..f4205455b4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2172,7 +2172,7 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { } bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) { - if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { + if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { SWinKey key = {.ts = pWin->skey, .groupId = groupId}; if (streamStateCheck(pState, &key)) { return true; @@ -2279,17 +2279,18 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) { for (int32_t i = 0; i < size; i++) { SWinKey* winKey = taosArrayGet(wins, i); STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval); - if (needDeleteWindowBuf(&nextWin, &pInfo->twAggSup) && !pInfo->ignoreExpiredData) { - void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); - if (!chIds) { - SPullWindowInfo pull = { - .window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; - // add pull data request - if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { - int32_t size1 = taosArrayGetSize(pInfo->pChildren); - addPullWindow(pInfo->pPullDataMap, winKey, size1); - qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1); - } + if (isOverdue(nextWin.ekey, &pInfo->twAggSup) && pInfo->ignoreExpiredData) { + continue; + } + void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); + if (!chIds) { + SPullWindowInfo pull = { + .window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; + // add pull data request + if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { + int32_t size1 = taosArrayGetSize(pInfo->pChildren); + addPullWindow(pInfo->pPullDataMap, winKey, size1); + qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1); } } } @@ -2450,14 +2451,14 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p continue; } - if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) { + if (IS_FINAL_OP(pInfo) && pInfo->pChildren) { bool ignore = true; SWinKey winRes = { .ts = nextWin.skey, .groupId = groupId, }; void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); - if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && !chIds) { + if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup) && isClosed && !chIds) { SPullWindowInfo pull = { .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request @@ -2654,12 +2655,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); if (IS_FINAL_OP(pInfo)) { - int32_t childIndex = getChildIndex(pBlock); - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); - SStreamIntervalOperatorInfo* pChildInfo = pChildOp->info; - SExprSupp* pChildSup = &pChildOp->exprSupp; - doDeleteWindows(pChildOp, &pChildInfo->interval, pBlock, NULL, NULL); - rebuildIntervalWindow(pOperator, delWins, pInfo->pUpdatedMap); addRetriveWindow(delWins, pInfo); taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayDestroy(delWins); @@ -2701,25 +2696,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap); - if (IS_FINAL_OP(pInfo)) { - int32_t chIndex = getChildIndex(pBlock); - int32_t size = taosArrayGetSize(pInfo->pChildren); - // if chIndex + 1 - size > 0, add new child - for (int32_t i = 0; i < chIndex + 1 - size; i++) { - SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0); - if (!pChildOp) { - T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); - } - SStreamIntervalOperatorInfo* pTmpInfo = pChildOp->info; - pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - taosArrayPush(pInfo->pChildren, &pChildOp); - qDebug("===stream===add child, id:%d", chIndex); - } - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); - SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; - setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamIntervalAggImpl(pChildOp, pBlock, pBlock->info.id.groupId, NULL); - } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark); pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey); @@ -2729,7 +2705,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (IS_FINAL_OP(pInfo)) { closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator); - closeChildIntervalWindow(pOperator, pInfo->pChildren, pInfo->twAggSup.maxTs); } pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim index 1d58922928..bf05d59a81 100644 --- a/tests/script/tsim/stream/distributeInterval0.sim +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -6,32 +6,6 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect -sql create dnode $hostname2 port 7200 - -system sh/exec.sh -n dnode2 -s start - -print ===== step1 -$x = 0 -step1: - $x = $x + 1 - sleep 1000 - if $x == 10 then - print ====> dnode not ready! - return -1 - endi -sql select * from information_schema.ins_dnodes -print ===> $data00 $data01 $data02 $data03 $data04 $data05 -print ===> $data10 $data11 $data12 $data13 $data14 $data15 -if $rows != 2 then - return -1 -endi -if $data(1)[4] != ready then - goto step1 -endi -if $data(2)[4] != ready then - goto step1 -endi - print ===== step2 sql drop stream if exists stream_t1; sql drop database if exists test; @@ -58,6 +32,28 @@ sql insert into ts2 values(1648791213002,NULL,NULL,NULL,NULL); sql insert into ts3 values(1648791213002,NULL,NULL,NULL,NULL); sql insert into ts4 values(1648791213002,NULL,NULL,NULL,NULL); +$loop_count = 0 +loop0: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 +print 1 select * from streamtST1; +sql select * from streamtST1; + +if $rows != 1 then + print =====rows=$rows + goto loop0 +endi + +if $data01 != 8 then + print =====data01=$data01 + goto loop0 +endi + sql insert into ts1 values(1648791223002,2,2,3,1.1); sql insert into ts1 values(1648791233003,3,2,3,2.1); sql insert into ts2 values(1648791243004,4,2,43,73.1); @@ -66,10 +62,162 @@ sql insert into ts1 values(1648791243005,4,20,3,3.1); sql insert into ts2 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ; sql insert into ts1 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ; sql insert into ts2 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1); + +$loop_count = 0 +loop01: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 +print 2 select * from streamtST1; +sql select * from streamtST1; + +if $rows != 4 then + print =====rows=$rows + goto loop01 +endi + +if $data01 != 8 then + print =====data01=$data01 + goto loop01 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop01 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop01 +endi + +if $data31 != 11 then + print =====data31=$data31 + goto loop01 +endi + sql insert into ts1 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; + +$loop_count = 0 +loop011: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 +print 3 select * from streamtST1; +sql select * from streamtST1; + +if $rows != 4 then + print =====rows=$rows + goto loop011 +endi + +if $data01 != 8 then + print =====data01=$data01 + goto loop011 +endi + +if $data11 != 2 then + print =====data11=$data11 + goto loop011 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop011 +endi + +if $data31 != 13 then + print =====data31=$data31 + goto loop011 +endi + sql insert into ts2 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ; + +$loop_count = 0 +loop02: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 +print 4 select * from streamtST1; +sql select * from streamtST1; + +if $rows != 4 then + print =====rows=$rows + goto loop02 +endi + +if $data01 != 8 then + print =====data01=$data01 + goto loop02 +endi + +if $data11 != 3 then + print =====data11=$data11 + goto loop02 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop02 +endi + +if $data31 != 15 then + print =====data31=$data31 + goto loop02 +endi + + sql insert into ts1 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; +$loop_count = 0 +loop03: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 +print 5 select * from streamtST1; +sql select * from streamtST1; + +if $rows != 4 then + print =====rows=$rows + goto loop03 +endi + +if $data01 != 8 then + print =====data01=$data01 + goto loop03 +endi + +if $data11 != 3 then + print =====data11=$data11 + goto loop03 +endi + +if $data21 != 2 then + print =====data21=$data21 + goto loop03 +endi + +if $data31 != 15 then + print =====data31=$data31 + goto loop03 +endi + sql insert into ts3 values(1648791223002,2,2,3,1.1); sql insert into ts4 values(1648791233003,3,2,3,2.1); sql insert into ts3 values(1648791243004,4,2,43,73.1); @@ -79,6 +227,44 @@ sql insert into ts4 values(1648791243006,4,2,3,3.1) (1648791243007,4,2,3,3.1) ; sql insert into ts3 values(1648791243008,4,2,30,3.1) (1648791243009,4,2,3,3.1) (1648791243010,4,2,3,3.1) ; sql insert into ts4 values(1648791243011,4,2,3,3.1) (1648791243012,34,32,33,3.1) (1648791243013,4,2,3,3.1) (1648791243014,4,2,13,3.1); sql insert into ts3 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; + +$loop_count = 0 +loop04: + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sleep 300 +print 6 select * from streamtST1; +sql select * from streamtST1; + +if $rows != 4 then + print =====rows=$rows + goto loop04 +endi + +if $data01 != 8 then + print =====data01=$data01 + goto loop04 +endi + +if $data11 != 5 then + print =====data11=$data11 + goto loop04 +endi + +if $data21 != 3 then + print =====data21=$data21 + goto loop04 +endi + +if $data31 != 28 then + print =====data31=$data31 + goto loop04 +endi + sql insert into ts4 values(1648791243005,4,42,3,3.1) (1648791243003,4,2,33,3.1) (1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) (1648791233004,13,12,13,2.1) ; sql insert into ts3 values(1648791243006,4,2,3,3.1) (1648791213001,1,52,13,1.0) (1648791223001,22,22,83,1.1) ; From 4c2d0b5f63e609bb98eb9620db73a9da3391fab3 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 11 Apr 2023 19:09:32 +0800 Subject: [PATCH 2/2] feat:opt final window --- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/scanoperator.c | 1 + source/libs/executor/src/timewindowoperator.c | 8 +- .../tsim/stream/distributeInterval0.sim | 144 ++++++++++++++++++ 4 files changed, 153 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9537a76bd6..d7fce3a0b8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1111,7 +1111,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { return pInfo->pDelRes; } break; default: - ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE, "invalid SSDataBlock type"); + ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_RETRIEVE, "invalid SSDataBlock type"); return pBlock; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b8d293bdf6..ccdf0b6fa1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1897,6 +1897,7 @@ FETCH_NEXT_BLOCK: pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; copyDataBlock(pInfo->pUpdateRes, pBlock); + pInfo->updateResIndex = 0; prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); } break; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f4205455b4..7a23203e72 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1364,8 +1364,13 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa continue; } uint64_t winGpId = pGpDatas[i]; - bool res = doDeleteWindow(pOperator, win.skey, winGpId); SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; + void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); + if (chIds) { + getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); + continue; + } + bool res = doDeleteWindow(pOperator, win.skey, winGpId); if (pUpWins && res) { taosArrayPush(pUpWins, &winRes); } @@ -2615,6 +2620,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { streamStateCommit(pInfo->pState); pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; } + qDebug("===stream===interval final close"); } return NULL; } else { diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim index bf05d59a81..1cec299ccd 100644 --- a/tests/script/tsim/stream/distributeInterval0.sim +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -6,6 +6,32 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +sql create dnode $hostname2 port 7200 + +system sh/exec.sh -n dnode2 -s start + +print ===== step1 +$x = 0 +step1: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql select * from information_schema.ins_dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +if $rows != 2 then + return -1 +endi +if $data(1)[4] != ready then + goto step1 +endi +if $data(2)[4] != ready then + goto step1 +endi + print ===== step2 sql drop stream if exists stream_t1; sql drop database if exists test; @@ -458,4 +484,122 @@ if $data12 != 2 then goto loop3 endi +print ===== step3 + +sql drop database if exists test4; +sql create database test4 vgroups 10; +sql use test4; +sql create stable st(ts timestamp,a int,b int,c varchar(250) ) tags(ta int,tb int,tc int); +sql create table aaa using st tags(1,1,1); +sql create table bbb using st tags(2,2,2); +sql create table ccc using st tags(3,2,2); +sql create table ddd using st tags(4,2,2); + + +sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1, last_row(b) c2 from st partition by c interval(1s) ; + +sql insert into aaa values(1648791221001,2,2,"/a1/aa/aa"); +sql insert into bbb values(1648791221001,2,2,"/a1/aa/aa"); +sql insert into ccc values(1648791221001,2,2,"/a1/aa/aa"); +sql insert into ddd values(1648791221001,2,2,"/a1/aa/aa"); + +sql insert into aaa values(1648791222002,2,2,"/a2/aa/aa"); +sql insert into bbb values(1648791222002,2,2,"/a2/aa/aa"); +sql insert into ccc values(1648791222002,2,2,"/a2/aa/aa"); +sql insert into ddd values(1648791222002,2,2,"/a2/aa/aa"); + +sql insert into aaa values(1648791223003,2,2,"/a3/aa/aa"); +sql insert into bbb values(1648791223003,2,2,"/a3/aa/aa"); +sql insert into ccc values(1648791223003,2,2,"/a3/aa/aa"); +sql insert into ddd values(1648791223003,2,2,"/a3/aa/aa"); + +sql insert into aaa values(1648791224003,2,2,"/a4/aa/aa"); +sql insert into bbb values(1648791224003,2,2,"/a4/aa/aa"); +sql insert into ccc values(1648791224003,2,2,"/a4/aa/aa"); +sql insert into ddd values(1648791224003,2,2,"/a4/aa/aa"); + + +sql insert into aaa values(1648791225003,2,2,"/a5/aa/aa"); +sql insert into bbb values(1648791225003,2,2,"/a5/aa/aa"); +sql insert into ccc values(1648791225003,2,2,"/a5/aa/aa"); +sql insert into ddd values(1648791225003,2,2,"/a5/aa/aa"); + +sql insert into aaa values(1648791226003,2,2,"/a6/aa/aa"); +sql insert into bbb values(1648791226003,2,2,"/a6/aa/aa"); +sql insert into ccc values(1648791226003,2,2,"/a6/aa/aa"); +sql insert into ddd values(1648791226003,2,2,"/a6/aa/aa"); + +$loop_count = 0 + +loop4: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop4 +endi + +sql delete from aaa where ts = 1648791223003 ; + +$loop_count = 0 + +loop5: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop5 +endi + + +sql delete from ccc; + +$loop_count = 0 + +loop6: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop6 +endi + +sql delete from ddd; + +$loop_count = 0 + +loop7: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop7 +endi + +print ===== over + system sh/stop_dnodes.sh