add twa ci
This commit is contained in:
parent
b24c9c5d20
commit
39379af1a2
|
@ -879,6 +879,7 @@ typedef struct SStreamIntervalSliceOperatorInfo {
|
|||
SGroupResInfo groupResInfo;
|
||||
struct SOperatorInfo* pOperator;
|
||||
bool hasFill;
|
||||
bool hasInterpoFunc;
|
||||
} SStreamIntervalSliceOperatorInfo;
|
||||
|
||||
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
|
||||
|
|
|
@ -2821,6 +2821,8 @@ char* getStreamOpName(uint16_t opType) {
|
|||
return "stream count";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC:
|
||||
return "stream interp";
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL:
|
||||
return "interval continue";
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
|
|
@ -1258,11 +1258,9 @@ static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
|
|||
SStreamAggSupporter* pAggSup = pInfo->pStreamAggSup;
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
|
||||
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
|
||||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++){
|
||||
code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++){
|
||||
code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -1323,10 +1321,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
|
|||
(*ppRes) = resBlock;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
if (pInfo->pFillInfo->type == TSDB_FILL_PREV) {
|
||||
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
|
||||
}
|
||||
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
(*ppRes) = NULL;
|
||||
goto _end;
|
||||
|
@ -1393,9 +1388,7 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if ((*ppRes) == NULL) {
|
||||
if (pInfo->pFillInfo->type == TSDB_FILL_PREV) {
|
||||
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
|
||||
}
|
||||
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
|
||||
setStreamOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
|
@ -1655,24 +1648,6 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t initForceFillDownStream(SOperatorInfo* downstream) {
|
||||
SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (downstream == NULL) {
|
||||
return TSDB_CODE_STREAM_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
code = initForceFillDownStream(downstream->pDownstream[0]);
|
||||
return code;
|
||||
}
|
||||
SStreamScanInfo* pInfo = (SStreamScanInfo*) downstream->info;
|
||||
pInfo->useGetResultRange = true;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
|
||||
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
|
||||
QRY_PARAM_CHECK(pOptrInfo);
|
||||
|
@ -1755,9 +1730,6 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
|
|||
pTaskInfo);
|
||||
|
||||
if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
|
||||
code = initForceFillDownStream(downstream);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
} else {
|
||||
|
|
|
@ -116,7 +116,7 @@ void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, in
|
|||
pPoint->pLastRow = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize);
|
||||
}
|
||||
|
||||
static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, STimeWindow* pTWin, int64_t groupId,
|
||||
static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, bool needPrev, STimeWindow* pTWin, int64_t groupId,
|
||||
SInervalSlicePoint* pCurPoint, SInervalSlicePoint* pPrevPoint, int32_t* pWinCode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -126,26 +126,28 @@ static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterv
|
|||
&curVLen, pWinCode);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
qDebug("===stream=== set stream twa next point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d",
|
||||
qDebug("===stream=== set stream twa next point buf.ts:%" PRId64 ", groupId:%" PRIu64 ", res:%d",
|
||||
curKey.ts, curKey.groupId, *pWinCode);
|
||||
|
||||
initIntervalSlicePoint(pAggSup, pTWin, groupId, pCurPoint);
|
||||
|
||||
SWinKey prevKey = {.groupId = groupId};
|
||||
SET_WIN_KEY_INVALID(prevKey.ts);
|
||||
int32_t prevVLen = 0;
|
||||
int32_t prevWinCode = TSDB_CODE_SUCCESS;
|
||||
code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, &curKey, &prevKey,
|
||||
(void**)&pPrevPoint->pResPos, &prevVLen, &prevWinCode);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (needPrev) {
|
||||
SWinKey prevKey = {.groupId = groupId};
|
||||
SET_WIN_KEY_INVALID(prevKey.ts);
|
||||
int32_t prevVLen = 0;
|
||||
int32_t prevWinCode = TSDB_CODE_SUCCESS;
|
||||
code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, &curKey, &prevKey, (void**)&pPrevPoint->pResPos,
|
||||
&prevVLen, &prevWinCode);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (prevWinCode == TSDB_CODE_SUCCESS) {
|
||||
STimeWindow prevSTW = {.skey = prevKey.ts};
|
||||
prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval);
|
||||
initIntervalSlicePoint(pAggSup, &prevSTW, groupId, pPrevPoint);
|
||||
} else {
|
||||
SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey);
|
||||
SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey);
|
||||
if (prevWinCode == TSDB_CODE_SUCCESS) {
|
||||
STimeWindow prevSTW = {.skey = prevKey.ts};
|
||||
prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval);
|
||||
initIntervalSlicePoint(pAggSup, &prevSTW, groupId, pPrevPoint);
|
||||
} else {
|
||||
SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey);
|
||||
SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey);
|
||||
}
|
||||
}
|
||||
|
||||
_end:
|
||||
|
@ -244,10 +246,10 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
|
|||
}
|
||||
|
||||
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||
code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, &curWin, groupId, &curPoint, &prevPoint, &winCode);
|
||||
code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, pInfo->hasInterpoFunc, &curWin, groupId, &curPoint, &prevPoint, &winCode);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && prevPoint.pLastRow->key != prevPoint.winKey.win.ekey) {
|
||||
if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && prevPoint.pLastRow->key != prevPoint.winKey.win.ekey) {
|
||||
code = setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
|
@ -266,13 +268,13 @@ static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBloc
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
|
||||
if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) {
|
||||
if (pInfo->hasInterpoFunc && IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) {
|
||||
doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START);
|
||||
}
|
||||
forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
|
||||
TSDB_ORDER_ASC);
|
||||
int32_t prevEndPos = (forwardRows - 1) + startPos;
|
||||
if (winCode != TSDB_CODE_SUCCESS) {
|
||||
if (pInfo->hasInterpoFunc && winCode != TSDB_CODE_SUCCESS) {
|
||||
int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false);
|
||||
TSKEY endRowTs = tsCols[endRowId];
|
||||
transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL);
|
||||
|
@ -387,7 +389,7 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock**
|
|||
|
||||
code = setInputDataBlock(&pOperator->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
code = doStreamIntervalSliceAggImpl(pOperator, pBlock, pInfo->pUpdatedMap, pInfo->pDeletedMap);
|
||||
code = doStreamIntervalSliceAggImpl(pOperator, pBlock, pInfo->pUpdatedMap, NULL);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
}
|
||||
|
@ -435,7 +437,7 @@ _end:
|
|||
|
||||
int32_t initIntervalSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
|
||||
int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic,
|
||||
SInterval* pInterval) {
|
||||
SInterval* pInterval, bool hasInterpoFunc) {
|
||||
SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
|
@ -447,10 +449,11 @@ int32_t initIntervalSliceDownStream(SOperatorInfo* downstream, SStreamAggSupport
|
|||
|
||||
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
code =
|
||||
initIntervalSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pInterval);
|
||||
initIntervalSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pInterval, hasInterpoFunc);
|
||||
return code;
|
||||
}
|
||||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->useGetResultRange = hasInterpoFunc;
|
||||
pScanInfo->igCheckUpdate = true;
|
||||
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
|
||||
pScanInfo->pState = pAggSup->pState;
|
||||
|
@ -474,6 +477,18 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool windowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols) {
|
||||
bool needed = false;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SExprInfo* pExpr = pCtx[i].pExpr;
|
||||
if (fmIsIntervalInterpoFunc(pCtx[i].functionId)) {
|
||||
needed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return needed;
|
||||
}
|
||||
|
||||
int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
||||
SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -564,6 +579,7 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey;
|
||||
pInfo->pOperator = pOperator;
|
||||
pInfo->hasFill = false;
|
||||
pInfo->hasInterpoFunc = windowinterpNeeded(pExpSup->pCtx, numOfExprs);
|
||||
|
||||
setOperatorInfo(pOperator, "StreamIntervalSliceOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
|
@ -574,7 +590,7 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
initStreamBasicInfo(&pInfo->basic);
|
||||
if (downstream) {
|
||||
code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex,
|
||||
&pInfo->twAggSup, &pInfo->basic, &pInfo->interval);
|
||||
&pInfo->twAggSup, &pInfo->basic, &pInfo->interval, pInfo->hasInterpoFunc);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print step1
|
||||
print =============== create database
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
|
||||
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(a) from st partition by tbname,ta interval(2s) fill(prev);
|
||||
|
||||
sql_error create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, twa(a) from st partition by tbname,ta interval(2s) fill(prev);
|
||||
sql_error create stream streams3 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, twa(a) from st partition by tbname,ta interval(2s) fill(prev);
|
||||
sql_error create stream streams4 trigger max_delay 5s IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart, twa(a) from st partition by tbname,ta interval(2s) fill(prev);
|
||||
|
||||
sql_error create stream streams5 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt5 as select _wstart, twa(a) from st interval(2s) fill(prev);
|
||||
sql_error create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt6 as select last(ts), twa(a) from st partition by tbname,ta;
|
||||
sql_error create stream streams7 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt7 as select _wstart, twa(a) from st partition by tbname,ta session(ts, 2s);
|
||||
sql_error create stream streams8 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt8 as select _wstart, twa(a) from st partition by tbname,ta state_window(a);
|
||||
|
||||
print end
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,278 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print step1
|
||||
print =============== create database
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
|
||||
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(a), twa(b), elapsed(ts), now ,timezone(), ta from st partition by tbname,ta interval(2s) fill(prev);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(now + 3s,1,1,1) (now + 4s,10,1,1) (now + 7s,20,2,2) (now + 8s,30,3,3);
|
||||
sql insert into t2 values(now + 4s,1,1,1) (now + 5s,10,1,1) (now + 8s,20,2,2) (now + 9s,30,3,3);
|
||||
|
||||
|
||||
print sql select * from t1;
|
||||
sql select * from t1;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
print sql select * from t2;
|
||||
sql select * from t2;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt where ta == 1;
|
||||
sql select * from streamt where ta == 1;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
|
||||
# row 0
|
||||
if $rows < 5 then
|
||||
print ======rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt where ta == 2;
|
||||
sql select * from streamt where ta == 2;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
# row 0
|
||||
if $rows < 5 then
|
||||
print ======rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
|
||||
print step2
|
||||
print =============== create database
|
||||
sql create database test2 vgroups 1;
|
||||
sql use test2;
|
||||
|
||||
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams2 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(a), twa(b), elapsed(ts), now ,timezone(), ta from st partition by tbname interval(2s) fill(NULL);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(now + 3s,1,1,1) (now + 4s,10,1,1) (now + 7s,20,2,2) (now + 8s,30,3,3);
|
||||
sql insert into t2 values(now + 4s,1,1,1) (now + 5s,10,1,1) (now + 8s,20,2,2) (now + 9s,30,3,3);
|
||||
|
||||
|
||||
print sql select * from t1;
|
||||
sql select * from t1;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
print sql select * from t2;
|
||||
sql select * from t2;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
$loop_count = 0
|
||||
loop2:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt where ta == 1;
|
||||
sql select * from streamt where ta == 1;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
|
||||
# row 0
|
||||
if $rows < 5 then
|
||||
print ======rows=$rows
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop3:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt where ta == 2;
|
||||
sql select * from streamt where ta == 2;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
# row 0
|
||||
if $rows < 5 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
print step3
|
||||
print =============== create database
|
||||
sql create database test3 vgroups 1;
|
||||
sql use test3;
|
||||
|
||||
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams3 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(a), twa(b), elapsed(ts), now ,timezone(), ta from st partition by tbname interval(2s) fill(value,100,200,300);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(now + 3s,1,1,1) (now + 4s,10,1,1) (now + 7s,20,2,2) (now + 8s,30,3,3);
|
||||
sql insert into t2 values(now + 4s,1,1,1) (now + 5s,10,1,1) (now + 8s,20,2,2) (now + 9s,30,3,3);
|
||||
|
||||
|
||||
print sql select * from t1;
|
||||
sql select * from t1;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
print sql select * from t2;
|
||||
sql select * from t2;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
$loop_count = 0
|
||||
loop4:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt where ta == 1;
|
||||
sql select * from streamt where ta == 1;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
|
||||
# row 0
|
||||
if $rows < 5 then
|
||||
print ======rows=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop5:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt where ta == 2;
|
||||
sql select * from streamt where ta == 2;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
# row 0
|
||||
if $rows < 5 then
|
||||
print ======rows=$rows
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
print end
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,222 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print step1
|
||||
print =============== create database
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
|
||||
sql create stable st(ts timestamp, a int primary key, b int , c int)tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(b), count(*),ta from st partition by tbname, ta interval(2s) fill(prev);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql select now;
|
||||
|
||||
sql insert into t1 values(now + 3s,1,1,1) (now + 3s,2,10,10) (now + 3s,3,30,30);
|
||||
sql insert into t2 values(now + 4s,1,1,1) (now + 4s,2,10,10) (now + 4s,3,30,30);
|
||||
|
||||
|
||||
print sql select _wstart, twa(b), count(*),ta from t1 partition by tbname, ta interval(2s);
|
||||
sql select _wstart, twa(b), count(*),ta from t1 partition by tbname, ta interval(2s);
|
||||
|
||||
$query1_data = $data01
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
print sql select _wstart, twa(b), count(*),ta from t2 partition by tbname, ta interval(2s);
|
||||
sql select _wstart, twa(b), count(*),ta from t2 partition by tbname, ta interval(2s);
|
||||
|
||||
$query2_data = $data01
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt where ta == 1;
|
||||
sql select * from streamt where ta == 1;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
|
||||
# row 0
|
||||
if $rows < 6 then
|
||||
print ======rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data01 != $query1_data then
|
||||
print ======data01=$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt where ta == 2;
|
||||
sql select * from streamt where ta == 2;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
# row 0
|
||||
if $rows < 6 then
|
||||
print ======rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
|
||||
if $data01 != $query2_data then
|
||||
print ======data01=$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
print step2
|
||||
print =============== create database
|
||||
sql create database test2 vgroups 1;
|
||||
sql use test2;
|
||||
|
||||
sql create stable st(ts timestamp, a int primary key, b int , c int)tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
sql create stream streams2 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, twa(b), ta from st partition by tbname, ta interval(2s) fill(NULL);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(now + 3s,1,1,1) (now + 3s,2,10,10) (now + 3s,3,30,30);
|
||||
sql insert into t2 values(now + 4s,1,1,1) (now + 4s,2,10,10) (now + 4s,3,30,30);
|
||||
|
||||
|
||||
print sql select _wstart, twa(b), count(*),ta from t1 partition by tbname, ta interval(2s);
|
||||
sql select _wstart, twa(b), count(*),ta from t1 partition by tbname, ta interval(2s);
|
||||
|
||||
$query1_data = $data01
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
print sql select _wstart, twa(b), count(*),ta from t2 partition by tbname, ta interval(2s);
|
||||
sql select _wstart, twa(b), count(*),ta from t2 partition by tbname, ta interval(2s);
|
||||
|
||||
$query2_data = $data01
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
$loop_count = 0
|
||||
loop2:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt where ta == 1;
|
||||
sql select * from streamt where ta == 1;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
|
||||
# row 0
|
||||
if $rows < 6 then
|
||||
print ======rows=$rows
|
||||
goto loop2
|
||||
endi
|
||||
|
||||
if $data01 != $query1_data then
|
||||
print ======data01=$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop3:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt where ta == 2;
|
||||
sql select * from streamt where ta == 2;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
# row 0
|
||||
if $rows < 6 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
|
||||
if $data01 != $query2_data then
|
||||
print ======data01=$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
print end
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,109 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print step1
|
||||
print =============== create database
|
||||
sql create database test vgroups 1;
|
||||
sql use test;
|
||||
|
||||
sql create stable st(ts timestamp, a int primary key, b int , c int)tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*), ta from st partition by tbname,ta interval(2s);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(now + 3s,1,1,1) (now + 3s,2,10,10) (now + 3s,3,30,30) (now + 11s,1,1,1) (now + 11s,2,10,10);
|
||||
sql insert into t2 values(now + 4s,1,1,1) (now + 4s,2,10,10) (now + 4s,3,30,30) (now + 12s,1,1,1) (now + 12s,2,10,10);
|
||||
|
||||
|
||||
print sql select _wstart, count(*) from st partition by tbname,ta interval(2s);
|
||||
sql select _wstart, count(*) from st partition by tbname,ta interval(2s);
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt order by
|
||||
sql select * from streamt where ta == 1;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
|
||||
# row 0
|
||||
if $rows < 2 then
|
||||
print ======rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
if $data01 != 3 then
|
||||
print ======data01=$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print ======data11=$data11
|
||||
return -1
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop1:
|
||||
|
||||
sleep 2000
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 20 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print 2 sql select * from streamt where ta == 2;
|
||||
sql select * from streamt where ta == 2;
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04
|
||||
print $data10 $data11 $data12 $data13 $data14
|
||||
print $data20 $data21 $data22 $data23 $data24
|
||||
print $data30 $data31 $data32 $data33 $data34
|
||||
print $data40 $data41 $data42 $data43 $data44
|
||||
print $data50 $data51 $data52 $data53 $data54
|
||||
|
||||
# row 0
|
||||
if $rows < 2 then
|
||||
print ======rows=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
if $data01 != 3 then
|
||||
print ======data01=$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 2 then
|
||||
print ======data11=$data11
|
||||
return -1
|
||||
endi
|
||||
|
||||
print end
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue