batch count window

This commit is contained in:
54liuyao 2024-02-06 17:19:12 +08:00
parent ba59d4bcb1
commit 9b9ddd9159
4 changed files with 830 additions and 56 deletions

View File

@ -24,15 +24,27 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "ttime.h" #include "ttime.h"
typedef struct SCountWindowResult {
int32_t winRows;
SResultRow row;
} SCountWindowResult;
typedef struct SCountWindowSupp {
SArray* pWinStates;
int32_t stateIndex;
} SCountWindowSupp;
typedef struct SCountWindowOperatorInfo { typedef struct SCountWindowOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SExprSupp scalarSup; SExprSupp scalarSup;
SWindowRowsSup winSup;
int32_t tsSlotId; // primary timestamp column slot id int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
uint64_t groupId; // current group id, used to identify the data block from different groups uint64_t groupId; // current group id, used to identify the data block from different groups
SResultRow* pRow; SResultRow* pRow;
int32_t windowCount;
int32_t windowSliding;
SCountWindowSupp countSup;
} SCountWindowOperatorInfo; } SCountWindowOperatorInfo;
void destroyCountWindowOperatorInfo(void* param) { void destroyCountWindowOperatorInfo(void* param) {
@ -40,59 +52,103 @@ void destroyCountWindowOperatorInfo(void* param) {
if (pInfo == NULL) { if (pInfo == NULL) {
return; return;
} }
if (pInfo->pRow != NULL) {
taosMemoryFree(pInfo->pRow);
}
cleanupBasicInfo(&pInfo->binfo); cleanupBasicInfo(&pInfo->binfo);
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupAggSup(&pInfo->aggSup); cleanupAggSup(&pInfo->aggSup);
cleanupExprSupp(&pInfo->scalarSup); cleanupExprSupp(&pInfo->scalarSup);
taosArrayDestroy(pInfo->countSup.pWinStates);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
static void clearWinStateBuff(SCountWindowResult* pBuff) {
pBuff->winRows = 0;
}
static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) {
SCountWindowResult* pBuffInfo = taosArrayGet(pCountSup->pWinStates, pCountSup->stateIndex);
pCountSup->stateIndex = (pCountSup->stateIndex + 1) % taosArrayGetSize(pCountSup->pWinStates);
return pBuffInfo;
}
static SCountWindowResult* setCountWindowOutputBuff(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SResultRow** pResult) {
SCountWindowResult* pBuff = getCountWinStateInfo(pCountSup);
(*pResult) = &pBuff->row;
setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
return pBuff;
}
static int32_t updateCountWindowInfo(int32_t start, int32_t blockRows, int32_t countWinRows, int32_t* pCurrentRows) {
int32_t rows = TMIN(countWinRows - (*pCurrentRows), blockRows - start);
(*pCurrentRows) += rows;
return rows;
}
int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pExprSup = &pOperator->exprSupp;
SCountWindowOperatorInfo* pInfo = pOperator->info; SCountWindowOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->binfo.pRes; SSDataBlock* pRes = pInfo->binfo.pRes;
int64_t groupId = pBlock->info.id.groupId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
TSKEY* tsCols = (TSKEY*)pColInfoData->pData; TSKEY* tsCols = (TSKEY*)pColInfoData->pData;
SWindowRowsSup* pRowSup = &pInfo->winSup;
int32_t rowIndex = 0;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows;) { for (int32_t i = 0; i < pBlock->info.rows;) {
// todo(liuyao) 1.如果group id发生变化获取新group id上一次的window的缓存并把旧group id的信息存入缓存。 int32_t step = pInfo->windowSliding;
// 没有sliding SCountWindowResult* pBuffInfo = setCountWindowOutputBuff(pExprSup, &pInfo->countSup, &pInfo->pRow);
// 只需要一个缓存即可 int32_t prevRows = pBuffInfo->winRows;
// 1.如果group id发生变化说明本group窗口全部结束输出上次的缓存这里需要判断缓存中是否有数据) int32_t num = updateCountWindowInfo(i, pBlock->info.rows, pInfo->windowCount, &pBuffInfo->winRows);
// 设置缓存 if (prevRows == 0) {
// 2.计算 当前需要合并的行数 pInfo->pRow->win.skey = tsCols[i];
// 3.做聚集计算。 }
// 4.达到行数将结果存入pInfo->res中。 pInfo->pRow->win.ekey = tsCols[num + i - 1];
// 有sliding updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->pRow->win, 0);
// 缓存是一个队列 applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, i, num,
// 1.如果group id发生变化说明本group窗口全部结束输出上次的缓存这里需要判断缓存中是否有数据可能输出多行) pBlock->info.rows, pExprSup->numOfExprs);
// pInfo记录队列的起始位置 if (pBuffInfo->winRows == pInfo->windowCount) {
// 2.计算 当前需要合并的行数 doUpdateNumOfRows(pExprSup->pCtx, pInfo->pRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
// 3.做聚集计算。 copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes,
// 4.达到行数pInfo记录队列的起始位置后移将结果存入pInfo->res中。 pExprSup->rowEntryInfoOffset, pTaskInfo);
pRes->info.rows += pInfo->pRow->numOfRows;
clearWinStateBuff(pBuffInfo);
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
}
if (pInfo->windowCount != pInfo->windowSliding) {
if (prevRows <= pInfo->windowSliding) {
if (pBuffInfo->winRows > pInfo->windowSliding) {
step = pInfo->windowSliding - prevRows;
}
} else {
step = 0;
}
}
i += step; i += step;
} }
return code; return code;
} }
static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock) {
SResultRow* pResultRow = NULL;
for (int32_t i = 0; i < taosArrayGetSize(pCountSup->pWinStates); i++) {
SCountWindowResult* pBuff = setCountWindowOutputBuff(pExprSup, pCountSup, &pResultRow);
if (pBuff->winRows == 0) {
continue;;
}
doUpdateNumOfRows(pExprSup->pCtx, pResultRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pResultRow, pExprSup->pCtx, pBlock,
pExprSup->rowEntryInfoOffset, pTaskInfo);
pBlock->info.rows += pResultRow->numOfRows;
clearWinStateBuff(pBuff);
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
}
}
static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) { static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
SCountWindowOperatorInfo* pInfo = pOperator->info; SCountWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pExprSup = &pOperator->exprSupp;
int32_t order = pInfo->binfo.inputTsOrder; int32_t order = pInfo->binfo.inputTsOrder;
SSDataBlock* pRes = pInfo->binfo.pRes; SSDataBlock* pRes = pInfo->binfo.pRes;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
@ -106,7 +162,7 @@ static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
} }
pRes->info.scanFlag = pBlock->info.scanFlag; pRes->info.scanFlag = pBlock->info.scanFlag;
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); setInputDataBlock(pExprSup, pBlock, order, MAIN_SCAN, true);
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
// there is an scalar expression that needs to be calculated right before apply the group aggregation. // there is an scalar expression that needs to be calculated right before apply the group aggregation.
@ -118,12 +174,20 @@ static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
} }
} }
if (pInfo->groupId == 0) {
pInfo->groupId = pBlock->info.id.groupId;
} else if (pInfo->groupId != pBlock->info.id.groupId) {
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pRes);
pInfo->groupId = pBlock->info.id.groupId;
}
doCountWindowAggImpl(pOperator, pBlock); doCountWindowAggImpl(pOperator, pBlock);
if (pRes->info.rows >= pOperator->resultInfo.threshold) { if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return pRes; return pRes;
} }
} }
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pRes);
return pRes->info.rows == 0 ? NULL : pRes; return pRes->info.rows == 0 ? NULL : pRes;
} }
@ -149,13 +213,7 @@ SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
} }
} }
code = filterInitFromNode((SNode*)pCountWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); size_t keyBufSize = 0;
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pCountWindowNode->window.pFuncs, NULL, &num);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
@ -173,9 +231,20 @@ SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->binfo.inputTsOrder = physiNode->inputTsOrder; pInfo->binfo.inputTsOrder = physiNode->inputTsOrder;
pInfo->binfo.outputTsOrder = physiNode->outputTsOrder; pInfo->binfo.outputTsOrder = physiNode->outputTsOrder;
pInfo->windowCount = pCountWindowNode->windowCount;
pInfo->windowSliding = pCountWindowNode->windowSliding;
//sizeof(SCountWindowResult)
int32_t itemSize = sizeof(int32_t) + pInfo->aggSup.resultRowSize;
int32_t numOfItem = 1;
if (pInfo->windowCount != pInfo->windowSliding) {
numOfItem = pInfo->windowCount / pInfo->windowSliding + 1;
}
pInfo->countSup.pWinStates = taosArrayInit_s(itemSize, numOfItem);
if (!pInfo->countSup.pWinStates) {
goto _error;
}
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pCountWindowNode->window.watermark, pInfo->countSup.stateIndex = 0;
.calTrigger = pCountWindowNode->window.triggerType};
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
@ -200,20 +269,3 @@ _error:
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
SExprSupp* pExprSup, SAggSupporter* pAggSup) {
if (*pResult == NULL) {
SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize);
pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset};
*pResult = p;
}
(*pResult)->win = *win;
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
}

View File

@ -115,7 +115,7 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
} }
} }
int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows, static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows,
SSHashObj* pStDeleted, bool* pRebuild) { SSHashObj* pStDeleted, bool* pRebuild) {
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin; SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
int32_t num = 0; int32_t num = 0;
@ -290,7 +290,6 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
buffInfo.winBuffOp = MOVE_NEXT_WINDOW; buffInfo.winBuffOp = MOVE_NEXT_WINDOW;
winRows = 0; winRows = 0;
} }
slidingRows = (slidingRows + winRows) % pAggSup->windowSliding;
} }
i += winRows; i += winRows;
} }

View File

@ -0,0 +1,179 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql insert into t1 values(1648791213000,0,1,1,1.0);
sql insert into t1 values(1648791213001,9,2,2,1.1);
sql insert into t1 values(1648791213009,0,3,3,1.0);
sql insert into t1 values(1648791223000,0,1,1,1.0);
sql insert into t1 values(1648791223001,9,2,2,1.1);
sql insert into t1 values(1648791223009,0,3,3,1.0);
$loop_count = 0
loop2:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(3);
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(3);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 3 then
print ======data01=$data01
goto loop2
endi
if $data02 != 6 then
print ======data02=$data02
goto loop2
endi
if $data03 != 3 then
print ======data03=$data03
goto loop2
endi
# row 1
if $data11 != 3 then
print ======data11=$data11
goto loop2
endi
if $data12 != 6 then
print ======data12=$data12
goto loop2
endi
if $data13 != 3 then
print ======data13=$data13
goto loop2
endi
print step2
print =============== create database
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql insert into t1 values(1648791213000,0,1,1,1.0);
sql insert into t1 values(1648791213001,9,2,2,1.1);
sql insert into t1 values(1648791213009,0,3,3,1.0);
sql insert into t2 values(1648791213000,0,1,1,1.0);
sql insert into t2 values(1648791213001,9,2,2,1.1);
sql insert into t2 values(1648791213009,0,3,3,1.0);
sql insert into t1 values(1648791223000,0,1,1,1.0);
sql insert into t1 values(1648791223001,9,2,2,1.1);
sql insert into t1 values(1648791223009,0,3,3,1.0);
sql insert into t2 values(1648791223000,0,1,1,1.0);
sql insert into t2 values(1648791223001,9,2,2,1.1);
sql insert into t2 values(1648791223009,0,3,3,1.0);
$loop_count = 0
loop3:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(3);
sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(3);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 3 then
print ======data01=$data01
goto loop3
endi
if $data02 != 6 then
print ======data02=$data02
goto loop3
endi
if $data03 != 3 then
print ======data03=$data03
goto loop3
endi
# row 1
if $data11 != 3 then
print ======data11=$data11
goto loop3
endi
if $data12 != 6 then
print ======data12=$data12
goto loop3
endi
if $data13 != 3 then
print ======data13=$data13
goto loop3
endi
# row 2
if $data21 != 3 then
print ======data21=$data21
goto loop3
endi
if $data22 != 6 then
print ======data22=$data22
goto loop3
endi
if $data23 != 3 then
print ======data23=$data23
goto loop3
endi
# row 3
if $data31 != 3 then
print ======data31=$data31
goto loop3
endi
if $data32 != 6 then
print ======data32=$data32
goto loop3
endi
if $data33 != 3 then
print ======data33=$data33
goto loop3
endi
print query_count0 end
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,544 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql insert into t1 values(1648791213000,0,1,1,1.0);
$loop_count = 0
loop00:
sleep 300
print 00 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 1 then
print ======rows=$rows
goto loop00
endi
# row 0
if $data01 != 1 then
print ======data01=$data01
goto loop00
endi
sql insert into t1 values(1648791213001,9,2,2,1.1);
$loop_count = 0
loop01:
sleep 300
print 01 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 1 then
print ======rows=$rows
goto loop01
endi
# row 0
if $data01 != 2 then
print ======data01=$data01
goto loop01
endi
sql insert into t1 values(1648791213002,0,3,3,1.0);
$loop_count = 0
loop02:
sleep 300
print 02 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 2 then
print ======rows=$rows
goto loop02
endi
# row 0
if $data01 != 3 then
print ======data01=$data01
goto loop02
endi
# row 1
if $data11 != 1 then
print ======data01=$data01
goto loop02
endi
sql insert into t1 values(1648791213009,0,3,3,1.0);
$loop_count = 0
loop0:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 2 then
print ======rows=$rows
goto loop0
endi
# row 0
if $data01 != 4 then
print ======data01=$data01
goto loop0
endi
# row 1
if $data11 != 2 then
print ======data11=$data11
goto loop0
endi
sql insert into t1 values(1648791223000,0,1,1,1.0);
sql insert into t1 values(1648791223001,9,2,2,1.1);
sql insert into t1 values(1648791223002,9,2,2,1.1);
sql insert into t1 values(1648791223009,0,3,3,1.0);
$loop_count = 0
loop2:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 4 then
print ======rows=$rows
goto loop2
endi
# row 0
if $data01 != 4 then
print ======data01=$data01
goto loop2
endi
# row 1
if $data11 != 4 then
print ======data11=$data11
goto loop2
endi
# row 2
if $data21 != 4 then
print ======data21=$data21
goto loop2
endi
# row 3
if $data31 != 2 then
print ======data31=$data31
goto loop2
endi
sql insert into t1 values(1648791233000,0,1,1,1.0) (1648791233001,9,2,2,1.1) (1648791233002,9,2,2,1.1) (1648791233009,0,3,3,1.0);
$loop_count = 0
loop3:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
print $data50 $data51 $data52 $data53
print $data60 $data61 $data62 $data63
print $data70 $data71 $data72 $data73
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop3
endi
sql insert into t1 values(1648791243000,0,1,1,1.0) (1648791243001,9,2,2,1.1);
$loop_count = 0
loop4:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
print $data50 $data51 $data52 $data53
print $data60 $data61 $data62 $data63
print $data70 $data71 $data72 $data73
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 7 then
print ======rows=$rows
goto loop4
endi
sql insert into t1 values(1648791253000,0,1,1,1.0) (1648791253001,9,2,2,1.1) (1648791253002,9,2,2,1.1);
$loop_count = 0
loop5:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
print $data50 $data51 $data52 $data53
print $data60 $data61 $data62 $data63
print $data70 $data71 $data72 $data73
print $data80 $data81 $data82 $data83
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 9 then
print ======rows=$rows
goto loop5
endi
sql insert into t1 values(1648791263000,0,1,1,1.0);
$loop_count = 0
loop6:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
print $data50 $data51 $data52 $data53
print $data60 $data61 $data62 $data63
print $data70 $data71 $data72 $data73
print $data80 $data81 $data82 $data83
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 9 then
print ======rows=$rows
goto loop6
endi
print step2
print =============== create database
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql insert into t1 values(1648791213000,0,1,1,1.0);
sql insert into t1 values(1648791213001,9,2,2,1.1);
sql insert into t1 values(1648791213002,0,3,3,1.0);
sql insert into t1 values(1648791213009,0,3,3,1.0);
$loop_count = 0
loop7:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 2 then
print ======rows=$rows
goto loop7
endi
# row 0
if $data01 != 4 then
print ======data01=$data01
goto loop7
endi
# row 1
if $data11 != 2 then
print ======data11=$data11
goto loop7
endi
sql insert into t1 values(1648791223000,0,1,1,1.0);
sql insert into t1 values(1648791223001,9,2,2,1.1);
sql insert into t1 values(1648791223002,9,2,2,1.1);
sql insert into t1 values(1648791223009,0,3,3,1.0);
$loop_count = 0
loop8:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 4 then
print ======rows=$rows
goto loop8
endi
# row 0
if $data01 != 4 then
print ======data01=$data01
goto loop8
endi
# row 1
if $data11 != 4 then
print ======data11=$data11
goto loop8
endi
# row 2
if $data21 != 4 then
print ======data21=$data21
goto loop8
endi
# row 3
if $data31 != 2 then
print ======data31=$data31
goto loop8
endi
sql insert into t1 values(1648791233000,0,1,1,1.0) (1648791233001,9,2,2,1.1) (1648791233002,9,2,2,1.1) (1648791233009,0,3,3,1.0);
$loop_count = 0
loop9:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
print $data50 $data51 $data52 $data53
print $data60 $data61 $data62 $data63
print $data70 $data71 $data72 $data73
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop9
endi
sql insert into t1 values(1648791243000,0,1,1,1.0) (1648791243001,9,2,2,1.1);
$loop_count = 0
loop10:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
print $data50 $data51 $data52 $data53
print $data60 $data61 $data62 $data63
print $data70 $data71 $data72 $data73
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 7 then
print ======rows=$rows
goto loop10
endi
sql insert into t1 values(1648791253000,0,1,1,1.0) (1648791253001,9,2,2,1.1) (1648791253002,9,2,2,1.1);
$loop_count = 0
loop11:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
print $data50 $data51 $data52 $data53
print $data60 $data61 $data62 $data63
print $data70 $data71 $data72 $data73
print $data80 $data81 $data82 $data83
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 9 then
print ======rows=$rows
goto loop11
endi
sql insert into t1 values(1648791263000,0,1,1,1.0);
$loop_count = 0
loop12:
sleep 300
print 1 sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
sql select _wstart as s, count(*) c1, sum(b), max(c) from st partition by tbname count_window(4, 2);
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data30 $data31 $data32 $data33
print $data40 $data41 $data42 $data43
print $data50 $data51 $data52 $data53
print $data60 $data61 $data62 $data63
print $data70 $data71 $data72 $data73
print $data80 $data81 $data82 $data83
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $rows != 9 then
print ======rows=$rows
goto loop12
endi
print count sliding 0 end
system sh/exec.sh -n dnode1 -s stop -x SIGINT