Merge pull request #16744 from taosdata/feature/TD-19015
feat(stream): stream delete for session&state
This commit is contained in:
commit
f9d73f69a0
|
@ -932,6 +932,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
|||
case STREAM_DELETE_DATA: {
|
||||
copyDataBlock(pInfo->pDelRes, pBlock);
|
||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||
return pInfo->pDelRes;
|
||||
} break;
|
||||
default:
|
||||
return pBlock;
|
||||
|
|
|
@ -1424,7 +1424,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
|
|||
taosHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
|
||||
}
|
||||
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
|
||||
} while (win.skey < tsEnds[i]);
|
||||
} while (win.skey <= tsEnds[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3595,7 +3595,8 @@ SArray* getWinInfos(SStreamAggSupporter* pAggSup, uint64_t groupId) {
|
|||
// don't add new window
|
||||
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId,
|
||||
int64_t gap, int32_t* pIndex) {
|
||||
SArray* pWinInfos = getWinInfos(pAggSup, groupId);
|
||||
STimeWindow searchWin = {.skey = startTs, .ekey = endTs};
|
||||
SArray* pWinInfos = getWinInfos(pAggSup, groupId);
|
||||
pAggSup->pCurWins = pWinInfos;
|
||||
|
||||
int32_t size = taosArrayGetSize(pWinInfos);
|
||||
|
@ -3607,7 +3608,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start
|
|||
SResultWindowInfo* pWin = NULL;
|
||||
if (index >= 0) {
|
||||
pWin = taosArrayGet(pWinInfos, index);
|
||||
if (isInWindow(pWin, startTs, gap)) {
|
||||
if (isInWindow(pWin, startTs, gap) || isInTimeWindow(&searchWin, pWin->win.skey, gap)) {
|
||||
*pIndex = index;
|
||||
return pWin;
|
||||
}
|
||||
|
@ -3615,7 +3616,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start
|
|||
|
||||
if (index + 1 < size) {
|
||||
pWin = taosArrayGet(pWinInfos, index + 1);
|
||||
if (isInWindow(pWin, startTs, gap)) {
|
||||
if (isInWindow(pWin, startTs, gap) || isInTimeWindow(&searchWin, pWin->win.skey, gap)) {
|
||||
*pIndex = index + 1;
|
||||
return pWin;
|
||||
} else if (endTs != INT64_MIN && isInWindow(pWin, endTs, gap)) {
|
||||
|
@ -3793,7 +3794,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
|
|||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->win, true);
|
||||
compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
||||
taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition));
|
||||
if (pWinInfo->isOutput) {
|
||||
if (pWinInfo->isOutput && pStDeleted) {
|
||||
SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId};
|
||||
taosHashPut(pStDeleted, &res, sizeof(SWinKey), &res, sizeof(SWinKey));
|
||||
pWinInfo->isOutput = false;
|
||||
|
@ -3886,19 +3887,24 @@ static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBloc
|
|||
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
uint64_t* gpDatas = (uint64_t*)pGroupCol->pData;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
int32_t winIndex = 0;
|
||||
while (1) {
|
||||
SResultWindowInfo* pCurWin = getCurSessionWindow(pAggSup, startDatas[i], endDatas[i], gpDatas[i], gap, &winIndex);
|
||||
if (!pCurWin) {
|
||||
break;
|
||||
}
|
||||
int32_t winIndex = 0;
|
||||
SResultWindowInfo* pCurWin = getCurSessionWindow(pAggSup, startDatas[i], endDatas[i], gpDatas[i], gap, &winIndex);
|
||||
if (!pCurWin) {
|
||||
continue;
|
||||
}
|
||||
|
||||
do {
|
||||
SResultWindowInfo delWin = *pCurWin;
|
||||
deleteWindow(pAggSup->pCurWins, winIndex, fp);
|
||||
if (result) {
|
||||
delWin.groupId = gpDatas[i];
|
||||
taosArrayPush(result, &delWin);
|
||||
}
|
||||
}
|
||||
if (winIndex >= taosArrayGetSize(pAggSup->pCurWins)) {
|
||||
break;
|
||||
}
|
||||
pCurWin = taosArrayGet(pAggSup->pCurWins, winIndex);
|
||||
} while (pCurWin->win.skey <= endDatas[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3979,26 +3985,16 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
|
|||
}
|
||||
|
||||
static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWinArray, int32_t numOfOutput,
|
||||
SOperatorInfo* pOperator, SHashObj* pStUpdated, bool needCreate) {
|
||||
SOperatorInfo* pOperator, SHashObj* pStUpdated) {
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
int32_t size = taosArrayGetSize(pWinArray);
|
||||
int32_t size = taosArrayGetSize(pWinArray);
|
||||
ASSERT(pInfo->pChildren);
|
||||
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SResultWindowInfo* pParentWin = taosArrayGet(pWinArray, i);
|
||||
SResultRow* pCurResult = NULL;
|
||||
uint64_t groupId = pParentWin->groupId;
|
||||
int32_t winIndex = 0;
|
||||
if (needCreate) {
|
||||
pParentWin =
|
||||
getSessionTimeWindow(&pInfo->streamAggSup, pParentWin->win.skey, pParentWin->win.ekey, groupId, 0, &winIndex);
|
||||
}
|
||||
setWindowOutputBuf(pParentWin, &pCurResult, pSup->pCtx, groupId, numOfOutput, pSup->rowEntryInfoOffset,
|
||||
&pInfo->streamAggSup, pTaskInfo);
|
||||
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
|
||||
int32_t num = 0;
|
||||
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
|
||||
for (int32_t j = 0; j < numOfChildren; j++) {
|
||||
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j);
|
||||
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
|
||||
|
@ -4011,31 +4007,36 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
|
|||
for (int32_t k = index; k < chWinSize; k++) {
|
||||
SResultWindowInfo* pChWin = taosArrayGet(pChWins, k);
|
||||
if (pParentWin->win.skey <= pChWin->win.skey && pChWin->win.ekey <= pParentWin->win.ekey) {
|
||||
int32_t winIndex = 0;
|
||||
SResultWindowInfo* pNewParWin =
|
||||
getSessionTimeWindow(&pInfo->streamAggSup, pChWin->win.skey, pChWin->win.ekey, groupId, 0, &winIndex);
|
||||
SResultRow* pPareResult = NULL;
|
||||
setWindowOutputBuf(pNewParWin, &pPareResult, pSup->pCtx, groupId, numOfOutput, pSup->rowEntryInfoOffset,
|
||||
&pInfo->streamAggSup, pTaskInfo);
|
||||
SResultRow* pChResult = NULL;
|
||||
setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput,
|
||||
pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo);
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pChWin->win, true);
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pNewParWin->win, true);
|
||||
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
||||
|
||||
int32_t winNum = getNumCompactWindow(pInfo->streamAggSup.pCurWins, winIndex, pInfo->gap);
|
||||
if (winNum > 0) {
|
||||
compactTimeWindow(pInfo, winIndex, winNum, groupId, numOfOutput, pStUpdated, NULL, pOperator);
|
||||
}
|
||||
|
||||
SFilePage* bufPage = getBufPage(pChInfo->streamAggSup.pResultBuf, pChWin->pos.pageId);
|
||||
releaseBufPage(pChInfo->streamAggSup.pResultBuf, bufPage);
|
||||
num++;
|
||||
continue;
|
||||
|
||||
bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pNewParWin->pos.pageId);
|
||||
setBufPageDirty(bufPage, true);
|
||||
releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage);
|
||||
SWinKey value = {.ts = pNewParWin->win.skey, .groupId = groupId};
|
||||
taosHashPut(pStUpdated, &pNewParWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey));
|
||||
} else if (!pChWin->isClosed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (num == 0 && needCreate) {
|
||||
deleteWindow(pInfo->streamAggSup.pCurWins, winIndex, NULL);
|
||||
}
|
||||
if (pStUpdated && num > 0) {
|
||||
SWinKey value = {.ts = pParentWin->win.skey, .groupId = groupId};
|
||||
taosHashPut(pStUpdated, &pParentWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey));
|
||||
}
|
||||
SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId);
|
||||
ASSERT(size > 0);
|
||||
setBufPageDirty(bufPage, true);
|
||||
releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4196,7 +4197,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
|
||||
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX,
|
||||
pChildOp->exprSupp.numOfExprs, 0, NULL);
|
||||
rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, NULL, false);
|
||||
rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, pStUpdated);
|
||||
}
|
||||
taosArrayDestroy(pWins);
|
||||
continue;
|
||||
|
@ -4210,7 +4211,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
|
||||
// gap must be 0
|
||||
doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, 0, NULL, NULL);
|
||||
rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, pStUpdated, true);
|
||||
rebuildTimeWindow(pInfo, pWins, pOperator->exprSupp.numOfExprs, pOperator, pStUpdated);
|
||||
}
|
||||
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
|
||||
removeSessionResults(pStUpdated, pWins);
|
||||
|
@ -4747,7 +4748,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
if (pBlock->info.type == STREAM_CLEAR) {
|
||||
doClearStateWindows(&pInfo->streamAggSup, pBlock, pSeUpdated, pInfo->pSeDeleted);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_DELETE_DATA) {
|
||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins, destroyStateWinInfo);
|
||||
copyDeleteWindowInfo(pWins, pInfo->pSeDeleted);
|
||||
|
@ -5674,7 +5675,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
NULL);
|
||||
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_DELETE_DATA) {
|
||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval, pUpdatedMap);
|
||||
continue;
|
||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||
|
|
|
@ -248,6 +248,12 @@
|
|||
./test.sh -f tsim/stream/windowClose.sim
|
||||
./test.sh -f tsim/stream/ignoreExpiredData.sim
|
||||
./test.sh -f tsim/stream/sliding.sim
|
||||
#./test.sh -f tsim/stream/partitionbyColumnInterval.sim
|
||||
#./test.sh -f tsim/stream/partitionbyColumnSession.sim
|
||||
#./test.sh -f tsim/stream/partitionbyColumnState.sim
|
||||
#./test.sh -f tsim/stream/deleteInterval.sim
|
||||
#./test.sh -f tsim/stream/deleteSession.sim
|
||||
#./test.sh -f tsim/stream/deleteState.sim
|
||||
|
||||
# ---- transaction ----
|
||||
./test.sh -f tsim/trans/lossdata1.sim
|
||||
|
|
|
@ -0,0 +1,532 @@
|
|||
$loop_all = 0
|
||||
looptest:
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 200
|
||||
sql connect
|
||||
|
||||
sql drop stream if exists streams0;
|
||||
sql drop stream if exists streams1;
|
||||
sql drop stream if exists streams2;
|
||||
sql drop stream if exists streams3;
|
||||
sql drop stream if exists streams4;
|
||||
sql drop database if exists test;
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3 from t1 session(ts, 5s);
|
||||
|
||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
sleep 200
|
||||
sql delete from t1 where ts = 1648791213000;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop0:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 0 then
|
||||
print =====rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop1:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791213001,2,2,2,2.0);
|
||||
sql insert into t1 values(1648791213002,3,3,3,3.0);
|
||||
sql insert into t1 values(1648791213003,4,4,4,4.0);
|
||||
|
||||
sleep 200
|
||||
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791213002;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop3:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print =====data01=$data01
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data02 != 4 then
|
||||
print =====data02=$data02
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791223000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223001,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223002,3,2,3,1.0);
|
||||
sql insert into t1 values(1648791223003,3,2,3,1.0);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop4:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from t1 where ts >= 1648791223000 and ts <= 1648791223003;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop5:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print =====data01=$data01
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data02 != 4 then
|
||||
print =====data02=$data02
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791213005,2,2,2,2.0);
|
||||
sql insert into t1 values(1648791213006,3,3,3,3.0);
|
||||
sql insert into t1 values(1648791213007,4,4,4,4.0);
|
||||
|
||||
sql insert into t1 values(1648791223000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791223001,2,2,2,2.0);
|
||||
sql insert into t1 values(1648791223002,3,3,3,3.0);
|
||||
sql insert into t1 values(1648791223003,4,4,4,4.0);
|
||||
|
||||
sql insert into t1 values(1648791233000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791233001,2,2,2,2.0);
|
||||
sql insert into t1 values(1648791233008,3,3,3,3.0);
|
||||
sql insert into t1 values(1648791233009,4,4,4,4.0);
|
||||
|
||||
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791233005;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop6:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data02 != 1 then
|
||||
print =====data02=$data02
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print =====data11=$data11
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data12 != 4 then
|
||||
print =====data12=$data12
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
sql drop stream if exists streams2;
|
||||
sql drop database if exists test2;
|
||||
sql create database test2 vgroups 4;
|
||||
sql use test2;
|
||||
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams2 trigger at_once into test.streamt2 as select _wstart c1, count(*) c2, max(a) c3 from st session(ts,5s);
|
||||
|
||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
sql insert into t2 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop7:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from t1 where ts = 1648791213000;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop8:
|
||||
sleep 200
|
||||
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791223000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223001,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223002,3,2,3,1.0);
|
||||
sql insert into t1 values(1648791223003,3,2,3,1.0);
|
||||
sql insert into t2 values(1648791223000,1,2,3,1.0);
|
||||
sql insert into t2 values(1648791223001,1,2,3,1.0);
|
||||
sql insert into t2 values(1648791223002,3,2,3,1.0);
|
||||
sql insert into t2 values(1648791223003,3,2,3,1.0);
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from t2 where ts >= 1648791223000 and ts <= 1648791223001;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop11:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop11
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop11
|
||||
endi
|
||||
|
||||
if $data11 != 6 then
|
||||
print =====data11=$data11
|
||||
goto loop11
|
||||
endi
|
||||
|
||||
if $data12 != 3 then
|
||||
print =====data12=$data12
|
||||
goto loop11
|
||||
endi
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from st where ts >= 1648791223000 and ts <= 1648791223003;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop12:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop12
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213004,3,2,3,1.0);
|
||||
sql insert into t1 values(1648791213005,3,2,3,1.0);
|
||||
sql insert into t1 values(1648791213006,3,2,3,1.0);
|
||||
sql insert into t1 values(1648791223004,1,2,3,1.0);
|
||||
sql insert into t2 values(1648791213004,3,2,3,1.0);
|
||||
sql insert into t2 values(1648791213005,3,2,3,1.0);
|
||||
sql insert into t2 values(1648791213006,3,2,3,1.0);
|
||||
sql insert into t2 values(1648791223004,1,2,3,1.0);
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from t2 where ts >= 1648791213004 and ts <= 1648791213006;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop13:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data01 != 4 then
|
||||
print =====data01=$data01
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data02 != 3 then
|
||||
print =====data02=$data02
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print =====data11=$data11
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
if $data12 != 1 then
|
||||
print =====data12=$data12
|
||||
goto loop13
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791223005,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791223006,1,2,3,1.0);
|
||||
sql insert into t2 values(1648791223005,1,2,3,1.0);
|
||||
sql insert into t2 values(1648791223006,1,2,3,1.0);
|
||||
|
||||
sql insert into t1 values(1648791233005,4,2,3,1.0);
|
||||
sql insert into t1 values(1648791233006,2,2,3,1.0);
|
||||
sql insert into t2 values(1648791233005,5,2,3,1.0);
|
||||
sql insert into t2 values(1648791233006,3,2,3,1.0);
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from st where ts >= 1648791213001 and ts <= 1648791233005;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop14:
|
||||
sleep 200
|
||||
sql select * from test.streamt2 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print =====data11=$data11
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
if $data12 != 3 then
|
||||
print =====data12=$data12
|
||||
goto loop14
|
||||
endi
|
||||
|
||||
sql drop stream if exists streams1;
|
||||
sql drop stream if exists streams2;
|
||||
sql drop stream if exists streams3;
|
||||
sql drop database if exists test3;
|
||||
sql drop database if exists test;
|
||||
sql create database test3 vgroups 4;
|
||||
sql create database test vgroups 1;
|
||||
sql use test3;
|
||||
sql create stable st(ts timestamp, a int, b int, c int, d double) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams3 trigger at_once into test.streamt3 as select _wstart c1, count(*) c2, max(a) c3 from st session(ts,5s);
|
||||
|
||||
sql insert into t1 values(1648791210000,1,1,1,NULL);
|
||||
sql insert into t1 values(1648791210001,2,2,2,NULL);
|
||||
sql insert into t2 values(1648791213001,3,3,3,NULL);
|
||||
sql insert into t2 values(1648791213003,4,4,4,NULL);
|
||||
sql insert into t1 values(1648791216000,5,5,5,NULL);
|
||||
sql insert into t1 values(1648791216002,6,6,6,NULL);
|
||||
sql insert into t1 values(1648791216004,7,7,7,NULL);
|
||||
sql insert into t2 values(1648791218001,8,8,8,NULL);
|
||||
sql insert into t2 values(1648791218003,9,9,9,NULL);
|
||||
sql insert into t1 values(1648791222000,10,10,10,NULL);
|
||||
sql insert into t1 values(1648791222003,11,11,11,NULL);
|
||||
sql insert into t1 values(1648791222005,12,12,12,NULL);
|
||||
|
||||
sql insert into t1 values(1648791232005,13,13,13,NULL);
|
||||
sql insert into t2 values(1648791242005,14,14,14,NULL);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop19:
|
||||
sleep 200
|
||||
sql select * from test.streamt3 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 3 then
|
||||
print =====rows=$rows
|
||||
goto loop19
|
||||
endi
|
||||
|
||||
sql delete from t2 where ts >= 1648791213001 and ts <= 1648791218003;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop20:
|
||||
sleep 200
|
||||
sql select * from test.streamt3 order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 5 then
|
||||
print =====rows=$rows
|
||||
goto loop20
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print =====data01=$data01
|
||||
goto loop20
|
||||
endi
|
||||
|
||||
if $data02 != 2 then
|
||||
print =====data02=$data02
|
||||
goto loop20
|
||||
endi
|
||||
|
||||
if $data11 != 3 then
|
||||
print =====data11=$data11
|
||||
goto loop20
|
||||
endi
|
||||
|
||||
if $data12 != 7 then
|
||||
print =====data12=$data12
|
||||
goto loop20
|
||||
endi
|
||||
|
||||
if $data21 != 3 then
|
||||
print =====data21=$data21
|
||||
goto loop20
|
||||
endi
|
||||
|
||||
if $data22 != 12 then
|
||||
print =====data22=$data22
|
||||
goto loop20
|
||||
endi
|
||||
|
||||
if $data31 != 1 then
|
||||
print =====data31=$data31
|
||||
goto loop20
|
||||
endi
|
||||
|
||||
if $data32 != 13 then
|
||||
print =====data32=$data32
|
||||
goto loop20
|
||||
endi
|
||||
|
||||
if $data41 != 1 then
|
||||
print =====data41=$data41
|
||||
goto loop20
|
||||
endi
|
||||
|
||||
if $data42 != 14 then
|
||||
print =====data42=$data42
|
||||
goto loop20
|
||||
endi
|
||||
|
||||
$loop_all = $loop_all + 1
|
||||
print ============loop_all=$loop_all
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
|
||||
#goto looptest
|
|
@ -0,0 +1,198 @@
|
|||
$loop_all = 0
|
||||
looptest:
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 200
|
||||
sql connect
|
||||
|
||||
sql drop stream if exists streams0;
|
||||
sql drop stream if exists streams1;
|
||||
sql drop stream if exists streams2;
|
||||
sql drop stream if exists streams3;
|
||||
sql drop stream if exists streams4;
|
||||
sql drop database if exists test;
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(b) c3 from t1 state_window(a);
|
||||
|
||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
sleep 200
|
||||
sql delete from t1 where ts = 1648791213000;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop0:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 0 then
|
||||
print =====rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop1:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791213001,1,2,2,2.0);
|
||||
sql insert into t1 values(1648791213002,1,3,3,3.0);
|
||||
sql insert into t1 values(1648791213003,1,4,4,4.0);
|
||||
|
||||
sleep 200
|
||||
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791213002;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop3:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print =====data01=$data01
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data02 != 4 then
|
||||
print =====data02=$data02
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791223000,2,2,3,1.0);
|
||||
sql insert into t1 values(1648791223001,2,2,3,1.0);
|
||||
sql insert into t1 values(1648791223002,2,2,3,1.0);
|
||||
sql insert into t1 values(1648791223003,2,2,3,1.0);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop4:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
sleep 200
|
||||
|
||||
sql delete from t1 where ts >= 1648791223000 and ts <= 1648791223003;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop5:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print =====data01=$data01
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data02 != 4 then
|
||||
print =====data02=$data02
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1,1,1.0);
|
||||
sql insert into t1 values(1648791213005,1,2,2,2.0);
|
||||
sql insert into t1 values(1648791213006,1,3,3,3.0);
|
||||
sql insert into t1 values(1648791213007,1,4,4,4.0);
|
||||
|
||||
sql insert into t1 values(1648791223000,2,1,1,1.0);
|
||||
sql insert into t1 values(1648791223001,2,2,2,2.0);
|
||||
sql insert into t1 values(1648791223002,2,3,3,3.0);
|
||||
sql insert into t1 values(1648791223003,2,4,4,4.0);
|
||||
|
||||
sql insert into t1 values(1648791233000,3,1,1,1.0);
|
||||
sql insert into t1 values(1648791233001,3,2,2,2.0);
|
||||
sql insert into t1 values(1648791233008,3,3,3,3.0);
|
||||
sql insert into t1 values(1648791233009,3,4,4,4.0);
|
||||
|
||||
sql delete from t1 where ts >= 1648791213001 and ts <= 1648791233005;
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop6:
|
||||
sleep 200
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
print =====data01=$data01
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data02 != 1 then
|
||||
print =====data02=$data02
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print =====data11=$data11
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
if $data12 != 4 then
|
||||
print =====data12=$data12
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
|
||||
$loop_all = $loop_all + 1
|
||||
print ============loop_all=$loop_all
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
|
||||
#goto looptest
|
Loading…
Reference in New Issue