From 9b9ddd9159f72e22892c6f012e78896655852381 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 6 Feb 2024 17:19:12 +0800 Subject: [PATCH] batch count window --- .../libs/executor/src/countwindowoperator.c | 160 ++++-- .../executor/src/streamcountwindowoperator.c | 3 +- tests/script/tsim/query/query_count0.sim | 179 ++++++ .../tsim/query/query_count_sliding0.sim | 544 ++++++++++++++++++ 4 files changed, 830 insertions(+), 56 deletions(-) create mode 100644 tests/script/tsim/query/query_count0.sim create mode 100644 tests/script/tsim/query/query_count_sliding0.sim diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 8a8f43a5e7..3980e5ae4d 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -24,15 +24,27 @@ #include "tdatablock.h" #include "ttime.h" +typedef struct SCountWindowResult { + int32_t winRows; + SResultRow row; +} SCountWindowResult; + +typedef struct SCountWindowSupp { + SArray* pWinStates; + int32_t stateIndex; +} SCountWindowSupp; + typedef struct SCountWindowOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; SExprSupp scalarSup; - SWindowRowsSup winSup; int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; uint64_t groupId; // current group id, used to identify the data block from different groups SResultRow* pRow; + int32_t windowCount; + int32_t windowSliding; + SCountWindowSupp countSup; } SCountWindowOperatorInfo; void destroyCountWindowOperatorInfo(void* param) { @@ -40,59 +52,103 @@ void destroyCountWindowOperatorInfo(void* param) { if (pInfo == NULL) { return; } - - if (pInfo->pRow != NULL) { - taosMemoryFree(pInfo->pRow); - } - cleanupBasicInfo(&pInfo->binfo); colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSup); + taosArrayDestroy(pInfo->countSup.pWinStates); taosMemoryFreeClear(param); } +static void clearWinStateBuff(SCountWindowResult* pBuff) { + pBuff->winRows = 0; +} + +static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) { + SCountWindowResult* pBuffInfo = taosArrayGet(pCountSup->pWinStates, pCountSup->stateIndex); + pCountSup->stateIndex = (pCountSup->stateIndex + 1) % taosArrayGetSize(pCountSup->pWinStates); + return pBuffInfo; +} + +static SCountWindowResult* setCountWindowOutputBuff(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SResultRow** pResult) { + SCountWindowResult* pBuff = getCountWinStateInfo(pCountSup); + (*pResult) = &pBuff->row; + setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); + return pBuff; +} + +static int32_t updateCountWindowInfo(int32_t start, int32_t blockRows, int32_t countWinRows, int32_t* pCurrentRows) { + int32_t rows = TMIN(countWinRows - (*pCurrentRows), blockRows - start); + (*pCurrentRows) += rows; + return rows; +} + int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExprSupp* pSup = &pOperator->exprSupp; + SExprSupp* pExprSup = &pOperator->exprSupp; SCountWindowOperatorInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->binfo.pRes; - int64_t groupId = pBlock->info.id.groupId; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); TSKEY* tsCols = (TSKEY*)pColInfoData->pData; - SWindowRowsSup* pRowSup = &pInfo->winSup; - int32_t rowIndex = 0; int32_t code = TSDB_CODE_SUCCESS; - int32_t step = 0; for (int32_t i = 0; i < pBlock->info.rows;) { - // todo(liuyao) 1.如果group id发生变化,获取新group id上一次的window的缓存,并把旧group id的信息存入缓存。 - // 没有sliding - // 只需要一个缓存即可 - // 1.如果group id发生变化,说明本group窗口全部结束,输出上次的缓存(这里需要判断缓存中是否有数据) - // 设置缓存 - // 2.计算 当前需要合并的行数 - // 3.做聚集计算。 - // 4.达到行数,将结果存入pInfo->res中。 + int32_t step = pInfo->windowSliding; + SCountWindowResult* pBuffInfo = setCountWindowOutputBuff(pExprSup, &pInfo->countSup, &pInfo->pRow); + int32_t prevRows = pBuffInfo->winRows; + int32_t num = updateCountWindowInfo(i, pBlock->info.rows, pInfo->windowCount, &pBuffInfo->winRows); + if (prevRows == 0) { + pInfo->pRow->win.skey = tsCols[i]; + } + pInfo->pRow->win.ekey = tsCols[num + i - 1]; - // 有sliding - // 缓存是一个队列 - // 1.如果group id发生变化,说明本group窗口全部结束,输出上次的缓存(这里需要判断缓存中是否有数据,可能输出多行) - // pInfo记录队列的起始位置 - // 2.计算 当前需要合并的行数 - // 3.做聚集计算。 - // 4.达到行数(pInfo记录队列的起始位置后移),将结果存入pInfo->res中。 + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->pRow->win, 0); + applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, i, num, + pBlock->info.rows, pExprSup->numOfExprs); + if (pBuffInfo->winRows == pInfo->windowCount) { + doUpdateNumOfRows(pExprSup->pCtx, pInfo->pRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); + copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes, + pExprSup->rowEntryInfoOffset, pTaskInfo); + pRes->info.rows += pInfo->pRow->numOfRows; + clearWinStateBuff(pBuffInfo); + clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); + } + if (pInfo->windowCount != pInfo->windowSliding) { + if (prevRows <= pInfo->windowSliding) { + if (pBuffInfo->winRows > pInfo->windowSliding) { + step = pInfo->windowSliding - prevRows; + } + } else { + step = 0; + } + } i += step; } return code; } +static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock) { + SResultRow* pResultRow = NULL; + for (int32_t i = 0; i < taosArrayGetSize(pCountSup->pWinStates); i++) { + SCountWindowResult* pBuff = setCountWindowOutputBuff(pExprSup, pCountSup, &pResultRow); + if (pBuff->winRows == 0) { + continue;; + } + doUpdateNumOfRows(pExprSup->pCtx, pResultRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); + copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResultRow, pExprSup->pCtx, pBlock, + pExprSup->rowEntryInfoOffset, pTaskInfo); + pBlock->info.rows += pResultRow->numOfRows; + clearWinStateBuff(pBuff); + clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); + } +} + static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) { SCountWindowOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExprSupp* pSup = &pOperator->exprSupp; + SExprSupp* pExprSup = &pOperator->exprSupp; int32_t order = pInfo->binfo.inputTsOrder; SSDataBlock* pRes = pInfo->binfo.pRes; SOperatorInfo* downstream = pOperator->pDownstream[0]; @@ -106,7 +162,7 @@ static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) { } pRes->info.scanFlag = pBlock->info.scanFlag; - setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); + setInputDataBlock(pExprSup, pBlock, order, MAIN_SCAN, true); blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); // there is an scalar expression that needs to be calculated right before apply the group aggregation. @@ -118,12 +174,20 @@ static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) { } } + if (pInfo->groupId == 0) { + pInfo->groupId = pBlock->info.id.groupId; + } else if (pInfo->groupId != pBlock->info.id.groupId) { + buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pRes); + pInfo->groupId = pBlock->info.id.groupId; + } + doCountWindowAggImpl(pOperator, pBlock); if (pRes->info.rows >= pOperator->resultInfo.threshold) { return pRes; } } + buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pRes); return pRes->info.rows == 0 ? NULL : pRes; } @@ -149,13 +213,7 @@ SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo } } - code = filterInitFromNode((SNode*)pCountWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - + size_t keyBufSize = 0; int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &num); initResultSizeInfo(&pOperator->resultInfo, 4096); @@ -173,9 +231,20 @@ SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; + pInfo->windowCount = pCountWindowNode->windowCount; + pInfo->windowSliding = pCountWindowNode->windowSliding; + //sizeof(SCountWindowResult) + int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize; + int32_t numOfItem = 1; + if (pInfo->windowCount != pInfo->windowSliding) { + numOfItem = pInfo->windowCount / pInfo->windowSliding + 1; + } + pInfo->countSup.pWinStates = taosArrayInit_s(itemSize, numOfItem); + if (!pInfo->countSup.pWinStates) { + goto _error; + } - pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pCountWindowNode->window.watermark, - .calTrigger = pCountWindowNode->window.triggerType}; + pInfo->countSup.stateIndex = 0; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); @@ -200,20 +269,3 @@ _error: pTaskInfo->code = code; return NULL; } - - - -static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult, - SExprSupp* pExprSup, SAggSupporter* pAggSup) { - if (*pResult == NULL) { - SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize); - pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset}; - *pResult = p; - } - - (*pResult)->win = *win; - - clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); - setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); - return TSDB_CODE_SUCCESS; -} diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 440cfe67d8..706b4c5a01 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -115,7 +115,7 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, } } -int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows, +static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStDeleted, bool* pRebuild) { SSessionKey sWinKey = pWinInfo->winInfo.sessionWin; int32_t num = 0; @@ -290,7 +290,6 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl buffInfo.winBuffOp = MOVE_NEXT_WINDOW; winRows = 0; } - slidingRows = (slidingRows + winRows) % pAggSup->windowSliding; } i += winRows; } diff --git a/tests/script/tsim/query/query_count0.sim b/tests/script/tsim/query/query_count0.sim new file mode 100644 index 0000000000..c3a75d635b --- /dev/null +++ b/tests/script/tsim/query/query_count0.sim @@ -0,0 +1,179 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql insert into t1 values(1648791213000,0,1,1,1.0); +sql insert into t1 values(1648791213001,9,2,2,1.1); +sql insert into t1 values(1648791213009,0,3,3,1.0); + + +sql insert into t1 values(1648791223000,0,1,1,1.0); +sql insert into t1 values(1648791223001,9,2,2,1.1); +sql insert into t1 values(1648791223009,0,3,3,1.0); + +$loop_count = 0 +loop2: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(3); +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(3); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 3 then + print ======data01=$data01 + goto loop2 +endi + +if $data02 != 6 then + print ======data02=$data02 + goto loop2 +endi + +if $data03 != 3 then + print ======data03=$data03 + goto loop2 +endi + +# row 1 +if $data11 != 3 then + print ======data11=$data11 + goto loop2 +endi + +if $data12 != 6 then + print ======data12=$data12 + goto loop2 +endi + +if $data13 != 3 then + print ======data13=$data13 + goto loop2 +endi + + + +print step2 +print =============== create database +sql create database test2 vgroups 4; +sql use test2; + +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql insert into t1 values(1648791213000,0,1,1,1.0); +sql insert into t1 values(1648791213001,9,2,2,1.1); +sql insert into t1 values(1648791213009,0,3,3,1.0); + +sql insert into t2 values(1648791213000,0,1,1,1.0); +sql insert into t2 values(1648791213001,9,2,2,1.1); +sql insert into t2 values(1648791213009,0,3,3,1.0); + +sql insert into t1 values(1648791223000,0,1,1,1.0); +sql insert into t1 values(1648791223001,9,2,2,1.1); +sql insert into t1 values(1648791223009,0,3,3,1.0); + +sql insert into t2 values(1648791223000,0,1,1,1.0); +sql insert into t2 values(1648791223001,9,2,2,1.1); +sql insert into t2 values(1648791223009,0,3,3,1.0); + +$loop_count = 0 +loop3: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(3); +sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(3); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 3 then + print ======data01=$data01 + goto loop3 +endi + +if $data02 != 6 then + print ======data02=$data02 + goto loop3 +endi + +if $data03 != 3 then + print ======data03=$data03 + goto loop3 +endi + +# row 1 +if $data11 != 3 then + print ======data11=$data11 + goto loop3 +endi + +if $data12 != 6 then + print ======data12=$data12 + goto loop3 +endi + +if $data13 != 3 then + print ======data13=$data13 + goto loop3 +endi + +# row 2 +if $data21 != 3 then + print ======data21=$data21 + goto loop3 +endi + +if $data22 != 6 then + print ======data22=$data22 + goto loop3 +endi + +if $data23 != 3 then + print ======data23=$data23 + goto loop3 +endi + +# row 3 +if $data31 != 3 then + print ======data31=$data31 + goto loop3 +endi + +if $data32 != 6 then + print ======data32=$data32 + goto loop3 +endi + +if $data33 != 3 then + print ======data33=$data33 + goto loop3 +endi + +print query_count0 end +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/query/query_count_sliding0.sim b/tests/script/tsim/query/query_count_sliding0.sim new file mode 100644 index 0000000000..2363259194 --- /dev/null +++ b/tests/script/tsim/query/query_count_sliding0.sim @@ -0,0 +1,544 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql insert into t1 values(1648791213000,0,1,1,1.0); + +$loop_count = 0 +loop00: + +sleep 300 +print 00 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop00 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop00 +endi + +sql insert into t1 values(1648791213001,9,2,2,1.1); + +$loop_count = 0 +loop01: + +sleep 300 +print 01 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop01 +endi + +# row 0 +if $data01 != 2 then + print ======data01=$data01 + goto loop01 +endi + + +sql insert into t1 values(1648791213002,0,3,3,1.0); + +$loop_count = 0 +loop02: + +sleep 300 +print 02 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 2 then + print ======rows=$rows + goto loop02 +endi + +# row 0 +if $data01 != 3 then + print ======data01=$data01 + goto loop02 +endi + +# row 1 +if $data11 != 1 then + print ======data01=$data01 + goto loop02 +endi + +sql insert into t1 values(1648791213009,0,3,3,1.0); + +$loop_count = 0 +loop0: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 2 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 4 then + print ======data01=$data01 + goto loop0 +endi + +# row 1 +if $data11 != 2 then + print ======data11=$data11 + goto loop0 +endi + +sql insert into t1 values(1648791223000,0,1,1,1.0); +sql insert into t1 values(1648791223001,9,2,2,1.1); +sql insert into t1 values(1648791223002,9,2,2,1.1); +sql insert into t1 values(1648791223009,0,3,3,1.0); + +$loop_count = 0 +loop2: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 4 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 4 then + print ======data01=$data01 + goto loop2 +endi + +# row 1 +if $data11 != 4 then + print ======data11=$data11 + goto loop2 +endi + +# row 2 +if $data21 != 4 then + print ======data21=$data21 + goto loop2 +endi + +# row 3 +if $data31 != 2 then + print ======data31=$data31 + goto loop2 +endi + +sql insert into t1 values(1648791233000,0,1,1,1.0) (1648791233001,9,2,2,1.1) (1648791233002,9,2,2,1.1) (1648791233009,0,3,3,1.0); + +$loop_count = 0 +loop3: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 +print $data60 $data61 $data62 $data63 +print $data70 $data71 $data72 $data73 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 6 then + print ======rows=$rows + goto loop3 +endi + + +sql insert into t1 values(1648791243000,0,1,1,1.0) (1648791243001,9,2,2,1.1); + +$loop_count = 0 +loop4: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 +print $data60 $data61 $data62 $data63 +print $data70 $data71 $data72 $data73 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 7 then + print ======rows=$rows + goto loop4 +endi + +sql insert into t1 values(1648791253000,0,1,1,1.0) (1648791253001,9,2,2,1.1) (1648791253002,9,2,2,1.1); + +$loop_count = 0 +loop5: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 +print $data60 $data61 $data62 $data63 +print $data70 $data71 $data72 $data73 +print $data80 $data81 $data82 $data83 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 9 then + print ======rows=$rows + goto loop5 +endi + +sql insert into t1 values(1648791263000,0,1,1,1.0); + +$loop_count = 0 +loop6: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 +print $data60 $data61 $data62 $data63 +print $data70 $data71 $data72 $data73 +print $data80 $data81 $data82 $data83 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 9 then + print ======rows=$rows + goto loop6 +endi + + + +print step2 +print =============== create database +sql create database test2 vgroups 4; +sql use test2; + +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql insert into t1 values(1648791213000,0,1,1,1.0); +sql insert into t1 values(1648791213001,9,2,2,1.1); +sql insert into t1 values(1648791213002,0,3,3,1.0); +sql insert into t1 values(1648791213009,0,3,3,1.0); + +$loop_count = 0 +loop7: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 2 then + print ======rows=$rows + goto loop7 +endi + +# row 0 +if $data01 != 4 then + print ======data01=$data01 + goto loop7 +endi + +# row 1 +if $data11 != 2 then + print ======data11=$data11 + goto loop7 +endi + +sql insert into t1 values(1648791223000,0,1,1,1.0); +sql insert into t1 values(1648791223001,9,2,2,1.1); +sql insert into t1 values(1648791223002,9,2,2,1.1); +sql insert into t1 values(1648791223009,0,3,3,1.0); + +$loop_count = 0 +loop8: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 4 then + print ======rows=$rows + goto loop8 +endi + +# row 0 +if $data01 != 4 then + print ======data01=$data01 + goto loop8 +endi + +# row 1 +if $data11 != 4 then + print ======data11=$data11 + goto loop8 +endi + +# row 2 +if $data21 != 4 then + print ======data21=$data21 + goto loop8 +endi + +# row 3 +if $data31 != 2 then + print ======data31=$data31 + goto loop8 +endi + +sql insert into t1 values(1648791233000,0,1,1,1.0) (1648791233001,9,2,2,1.1) (1648791233002,9,2,2,1.1) (1648791233009,0,3,3,1.0); + +$loop_count = 0 +loop9: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 +print $data60 $data61 $data62 $data63 +print $data70 $data71 $data72 $data73 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 6 then + print ======rows=$rows + goto loop9 +endi + + +sql insert into t1 values(1648791243000,0,1,1,1.0) (1648791243001,9,2,2,1.1); + +$loop_count = 0 +loop10: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 +print $data60 $data61 $data62 $data63 +print $data70 $data71 $data72 $data73 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 7 then + print ======rows=$rows + goto loop10 +endi + +sql insert into t1 values(1648791253000,0,1,1,1.0) (1648791253001,9,2,2,1.1) (1648791253002,9,2,2,1.1); + +$loop_count = 0 +loop11: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 +print $data60 $data61 $data62 $data63 +print $data70 $data71 $data72 $data73 +print $data80 $data81 $data82 $data83 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 9 then + print ======rows=$rows + goto loop11 +endi + +sql insert into t1 values(1648791263000,0,1,1,1.0); + +$loop_count = 0 +loop12: + +sleep 300 +print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 +print $data60 $data61 $data62 $data63 +print $data70 $data71 $data72 $data73 +print $data80 $data81 $data82 $data83 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 9 then + print ======rows=$rows + goto loop12 +endi +print count sliding 0 end +system sh/exec.sh -n dnode1 -s stop -x SIGINT