add ci
This commit is contained in:
parent
fd0f117544
commit
52e693511f
|
@ -9975,15 +9975,14 @@ static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
|
|||
(void)tNameGetFullDbName(&name, pDbFName);
|
||||
}
|
||||
|
||||
static void getStreamQueryFirstProjectAliasName(SHashObj* pUserAliasSet, char* aliasName, int32_t len) {
|
||||
if (NULL == taosHashGet(pUserAliasSet, "_wstart", strlen("_wstart"))) {
|
||||
snprintf(aliasName, len, "%s", "_wstart");
|
||||
return;
|
||||
}
|
||||
if (NULL == taosHashGet(pUserAliasSet, "ts", strlen("ts"))) {
|
||||
snprintf(aliasName, len, "%s", "ts");
|
||||
return;
|
||||
static void getStreamQueryFirstProjectAliasName(SHashObj* pUserAliasSet, char* aliasName, int32_t len, char* defaultName[], int32_t defaultNum) {
|
||||
for (int32_t i = 0; i < defaultNum; i++) {
|
||||
if (NULL == taosHashGet(pUserAliasSet, defaultName[i], strlen(defaultName[i]))) {
|
||||
snprintf(aliasName, len, "%s", defaultName[i]);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
do {
|
||||
taosRandStr(aliasName, len - 1);
|
||||
aliasName[len - 1] = '\0';
|
||||
|
@ -10002,6 +10001,44 @@ static int32_t setColumnDefNodePrimaryKey(SColumnDefNode* pNode, bool isPk) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t addTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSelectStmt* pSelect,
|
||||
SHashObj* pUserAliasSet, SNodeList* pCols, SCMCreateStreamReq* pReq) {
|
||||
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
|
||||
if (!pSelect->hasInterpFunc ||
|
||||
(QUERY_NODE_FUNCTION == nodeType(pProj) && 0 == strcmp("_irowts", ((SFunctionNode*)pProj)->functionName))) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SFunctionNode* pFunc = NULL;
|
||||
int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pFunc);
|
||||
if (NULL == pFunc) {
|
||||
return code;
|
||||
}
|
||||
strcpy(pFunc->functionName, "_irowts");
|
||||
char* defaultName[] = {"_irowts"};
|
||||
getStreamQueryFirstProjectAliasName(pUserAliasSet, pFunc->node.aliasName, sizeof(pFunc->node.aliasName), defaultName, sizeof(defaultName));
|
||||
code = getFuncInfo(pCxt, pFunc);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && STREAM_CREATE_STABLE_TRUE == pReq->createStb) {
|
||||
SColumnDefNode* pColDef = NULL;
|
||||
code = nodesMakeNode(QUERY_NODE_COLUMN_DEF, (SNode**)&pColDef);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
strcpy(pColDef->colName, pFunc->node.aliasName);
|
||||
pColDef->dataType = pFunc->node.resType;
|
||||
pColDef->sma = true;
|
||||
code = setColumnDefNodePrimaryKey(pColDef, false);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) code = nodesListPushFront(pCols, (SNode*)pColDef);
|
||||
if (TSDB_CODE_SUCCESS != code) nodesDestroyNode((SNode*)pColDef);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode((SNode*)pFunc);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t addWstartTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSelectStmt* pSelect,
|
||||
SHashObj* pUserAliasSet, SNodeList* pCols, SCMCreateStreamReq* pReq) {
|
||||
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
|
||||
|
@ -10015,7 +10052,8 @@ static int32_t addWstartTsToCreateStreamQueryImpl(STranslateContext* pCxt, SSele
|
|||
return code;
|
||||
}
|
||||
strcpy(pFunc->functionName, "_wstart");
|
||||
getStreamQueryFirstProjectAliasName(pUserAliasSet, pFunc->node.aliasName, sizeof(pFunc->node.aliasName));
|
||||
char* defaultName[] = {"_wstart", "ts"};
|
||||
getStreamQueryFirstProjectAliasName(pUserAliasSet, pFunc->node.aliasName, sizeof(pFunc->node.aliasName), defaultName, sizeof(defaultName));
|
||||
code = getFuncInfo(pCxt, pFunc);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc);
|
||||
|
@ -10047,6 +10085,9 @@ static int32_t addWstartTsToCreateStreamQuery(STranslateContext* pCxt, SNode* pS
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addWstartTsToCreateStreamQueryImpl(pCxt, pSelect, pUserAliasSet, pCols, pReq);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addTsToCreateStreamQueryImpl(pCxt, pSelect, pUserAliasSet, pCols, pReq);
|
||||
}
|
||||
taosHashCleanup(pUserAliasSet);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,628 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 50
|
||||
sql connect
|
||||
|
||||
print step1
|
||||
print =============== create database
|
||||
sql create database test vgroups 4;
|
||||
sql use test;
|
||||
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
sql create stream streams1_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev);
|
||||
sql create stream streams1_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(next);
|
||||
sql create stream streams1_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(linear);
|
||||
sql create stream streams1_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL);
|
||||
sql create stream streams1_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(1648791215000,0,0,0,0.0);
|
||||
|
||||
sql insert into t1 values(1648791212000,10,10,10,10.0);
|
||||
|
||||
$loop_count = 0
|
||||
loop0:
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 300
|
||||
|
||||
print sql desc streamt1_1;
|
||||
sql desc streamt1_1;
|
||||
|
||||
if $rows != 9 then
|
||||
print ======rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql desc streamt1_2;
|
||||
|
||||
if $rows != 9 then
|
||||
print ======rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql desc streamt1_3;
|
||||
|
||||
if $rows != 9 then
|
||||
print ======rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql desc streamt1_4;
|
||||
|
||||
if $rows != 9 then
|
||||
print ======rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
sql desc streamt1_5;
|
||||
|
||||
if $rows != 9 then
|
||||
print ======rows=$rows
|
||||
goto loop0
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop0_1:
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 300
|
||||
|
||||
print sql select * from streamt1_1;
|
||||
sql select * from streamt1_1;
|
||||
|
||||
if $rows != 4 then
|
||||
print ======rows=$rows
|
||||
goto loop0_1
|
||||
endi
|
||||
|
||||
print sql select * from streamt1_2;
|
||||
sql select * from streamt1_2;
|
||||
|
||||
if $rows != 4 then
|
||||
print ======rows=$rows
|
||||
goto loop0_1
|
||||
endi
|
||||
|
||||
print sql select * from streamt1_3;
|
||||
sql select * from streamt1_3;
|
||||
|
||||
if $rows != 4 then
|
||||
print ======rows=$rows
|
||||
goto loop0_1
|
||||
endi
|
||||
|
||||
print sql select * from streamt1_4;
|
||||
sql select * from streamt1_4;
|
||||
|
||||
if $rows != 4 then
|
||||
print ======rows=$rows
|
||||
goto loop0_1
|
||||
endi
|
||||
|
||||
print sql select * from streamt1_5;
|
||||
sql select * from streamt1_5;
|
||||
|
||||
if $rows != 4 then
|
||||
print ======rows=$rows
|
||||
goto loop0_1
|
||||
endi
|
||||
|
||||
print sql select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(value,11,22,33,44);
|
||||
sql select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(value,11,22,33,44);
|
||||
|
||||
print $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print $data10 $data11 $data12 $data13 $data14 $data15
|
||||
print $data20 $data21 $data22 $data23 $data24 $data25
|
||||
print $data30 $data31 $data32 $data33 $data34 $data35
|
||||
print $data40 $data41 $data42 $data43 $data44 $data45
|
||||
print $data50 $data51 $data52 $data53 $data54 $data55
|
||||
|
||||
$loop_count = 0
|
||||
loop0_2:
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 300
|
||||
|
||||
print sql select * from streamt1_5;
|
||||
sql select * from streamt1_5;
|
||||
|
||||
if $data01 != 10 then
|
||||
print ======data01=$data01
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data02 != 0 then
|
||||
print ======data02=$data02
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data03 != 10 then
|
||||
print ======data03=$data03
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data04 != 0 then
|
||||
print ======data04=$data04
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data05 != 10 then
|
||||
print ======data05=$data05
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data06 != 0 then
|
||||
print ======data06=$data06
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data07 != 10.000000000 then
|
||||
print ======data07=$data07
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data11 != 11 then
|
||||
print ======data11=$data11
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data12 != 1 then
|
||||
print ======data12=$data12
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data13 != 22 then
|
||||
print ======data13=$data13
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data14 != 1 then
|
||||
print ======data14=$data14
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data15 != 33 then
|
||||
print ======data15=$data15
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data16 != 1 then
|
||||
print ======data16=$data16
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
if $data17 != 44.000000000 then
|
||||
print ======data17=$data17
|
||||
goto loop0_2
|
||||
endi
|
||||
|
||||
print step2
|
||||
|
||||
sql create database test2 vgroups 1;
|
||||
sql use test2;
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql_error create stream streams2_1_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(prev);
|
||||
sql_error create stream streams2_1_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(next);
|
||||
sql_error create stream streams2_1_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(linear);
|
||||
sql_error create stream streams2_1_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(NULL);
|
||||
sql_error create stream streams2_1_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(value,11,22,33,44);
|
||||
|
||||
sql_error create stream streams2_2_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(prev);
|
||||
sql_error create stream streams2_2_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(next);
|
||||
sql_error create stream streams2_2_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(linear);
|
||||
sql_error create stream streams2_2_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(NULL);
|
||||
sql_error create stream streams2_2_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(value,11,22,33,44);
|
||||
|
||||
sql_error create stream streams2_3_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(prev);
|
||||
sql_error create stream streams2_3_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(next);
|
||||
sql_error create stream streams2_3_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(linear);
|
||||
sql_error create stream streams2_3_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(NULL);
|
||||
sql_error create stream streams2_3_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(value,11,22,33,44);
|
||||
|
||||
sql_error create stream streams2_4_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_1 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(prev);
|
||||
sql_error create stream streams2_4_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_2 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(next);
|
||||
sql_error create stream streams2_4_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_3 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(linear);
|
||||
sql_error create stream streams2_4_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_4 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(NULL);
|
||||
sql_error create stream streams2_4_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_5 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(value,11,22,33,44);
|
||||
|
||||
print step3
|
||||
|
||||
sql create database test3 vgroups 4;
|
||||
sql use test3;
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
|
||||
sql create stream streams3_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_1 TAGS(cc varchar(100)) SUBTABLE(concat(concat("tbn-", tbname), "_1")) as select interp(a), _isfilled as a1 from st partition by tbname, b as cc every(1s) fill(prev);
|
||||
sql create stream streams3_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_2 TAGS(cc varchar(100)) SUBTABLE(concat(concat("tbn-", tbname), "_2")) as select interp(a), _isfilled as a1 from st partition by tbname, b as cc every(1s) fill(next);
|
||||
sql create stream streams3_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_3 TAGS(cc varchar(100)) SUBTABLE(concat(concat("tbn-", tbname), "_3")) as select interp(a), _isfilled as a1 from st partition by tbname, b as cc every(1s) fill(linear);
|
||||
sql create stream streams3_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_4 TAGS(cc varchar(100)) SUBTABLE(concat(concat("tbn-", tbname), "_4")) as select interp(a), _isfilled as a1 from st partition by tbname, b as cc every(1s) fill(NULL);
|
||||
sql create stream streams3_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_5 TAGS(cc varchar(100)) SUBTABLE(concat(concat("tbn-", tbname), "_5")) as select interp(a), _isfilled as a1 from st partition by tbname, b as cc every(1s) fill(value,11);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(1648791217000,1,2,3);
|
||||
|
||||
sleep 500
|
||||
|
||||
sql insert into t1 values(1648791212000,10,2,3);
|
||||
|
||||
sleep 500
|
||||
|
||||
sql insert into t1 values(1648791215001,20,2,3);
|
||||
|
||||
sleep 500
|
||||
|
||||
sql insert into t2 values(1648791215001,20,2,3);
|
||||
|
||||
$loop_count = 0
|
||||
loop3:
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 300
|
||||
|
||||
print sql select cc, * from `tbn-t1_1_streamt3_1_914568691400502130`;
|
||||
sql select cc, * from `tbn-t1_1_streamt3_1_914568691400502130`;
|
||||
|
||||
if $rows != 6 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data00 != 2 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
print sql select cc, * from `tbn-t1_2_streamt3_2_914568691400502130`;
|
||||
sql select cc, * from `tbn-t1_2_streamt3_2_914568691400502130`;
|
||||
|
||||
if $rows != 6 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data00 != 2 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
print sql select cc, * from `tbn-t1_3_streamt3_3_914568691400502130`;
|
||||
sql select cc, * from `tbn-t1_3_streamt3_3_914568691400502130`;
|
||||
|
||||
if $rows != 6 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data00 != 2 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
print sql select cc, * from `tbn-t1_4_streamt3_4_914568691400502130`;
|
||||
sql select cc, * from `tbn-t1_4_streamt3_4_914568691400502130`;
|
||||
|
||||
if $rows != 6 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data00 != 2 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
print sql select cc, * from `tbn-t1_5_streamt3_5_914568691400502130`;
|
||||
sql select cc, * from `tbn-t1_5_streamt3_5_914568691400502130`;
|
||||
|
||||
if $rows != 6 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data00 != 2 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
|
||||
|
||||
print sql select * from `tbn-t2_1_streamt3_1_8905952758123525205`;
|
||||
sql select * from `tbn-t2_1_streamt3_1_8905952758123525205`;
|
||||
|
||||
if $rows != 0 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
print sql select * from `tbn-t2_2_streamt3_2_8905952758123525205`;
|
||||
sql select * from `tbn-t2_2_streamt3_2_8905952758123525205`;
|
||||
|
||||
if $rows != 0 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
print sql select * from `tbn-t2_3_streamt3_3_8905952758123525205`;
|
||||
sql select * from `tbn-t2_3_streamt3_3_8905952758123525205`;
|
||||
|
||||
if $rows != 0 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
print sql select * from `tbn-t2_4_streamt3_4_8905952758123525205`;
|
||||
sql select * from `tbn-t2_4_streamt3_4_8905952758123525205`;
|
||||
|
||||
if $rows != 0 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
print sql select * from `tbn-t2_5_streamt3_5_8905952758123525205`;
|
||||
sql select * from `tbn-t2_5_streamt3_5_8905952758123525205`;
|
||||
|
||||
if $rows != 0 then
|
||||
print ======rows=$rows
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
print step4
|
||||
|
||||
sql create database test4 vgroups 4;
|
||||
sql use test4;
|
||||
|
||||
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stable streamt4_1(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta int,cc int,tc int);
|
||||
sql create stable streamt4_2(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta int,cc int,tc int);
|
||||
sql create stable streamt4_3(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta int,cc int,tc int);
|
||||
sql create stable streamt4_4(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta int,cc int,tc int);
|
||||
sql create stable streamt4_5(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta int,cc int,tc int);
|
||||
|
||||
sql create stream streams4_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4_1(ts, b, a) TAGS(cc) SUBTABLE(concat(concat("tbn-", tbname), "_1")) as select _irowts, interp(a), _isfilled as a1 from st partition by tbname, b as cc every(1s) fill(prev);
|
||||
sql create stream streams4_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4_2(ts, b, a) TAGS(cc) SUBTABLE(concat(concat("tbn-", tbname), "_2")) as select _irowts, interp(a), _isfilled as a1 from st partition by tbname, b as cc every(1s) fill(next);
|
||||
sql create stream streams4_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4_3(ts, b, a) TAGS(cc) SUBTABLE(concat(concat("tbn-", tbname), "_3")) as select _irowts, interp(a), _isfilled as a1 from st partition by tbname, b as cc every(1s) fill(linear);
|
||||
sql create stream streams4_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4_4(ts, b, a) TAGS(cc) SUBTABLE(concat(concat("tbn-", tbname), "_4")) as select _irowts, interp(a), _isfilled as a1 from st partition by tbname, b as cc every(1s) fill(NULL);
|
||||
sql create stream streams4_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4_5(ts, b, a) TAGS(cc) SUBTABLE(concat(concat("tbn-", tbname), "_5")) as select _irowts, interp(a), _isfilled as a1 from st partition by tbname, b as cc every(1s) fill(value,1100);
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(1648791217000,20000,2,3);
|
||||
|
||||
sleep 500
|
||||
|
||||
sql insert into t1 values(1648791212000,10000,2,3);
|
||||
|
||||
sleep 500
|
||||
|
||||
sql insert into t1 values(1648791215001,20,2,3);
|
||||
|
||||
$loop_count = 0
|
||||
loop4:
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sleep 300
|
||||
|
||||
print sql select a,b from streamt4_1;
|
||||
sql select a,b from streamt4_1;
|
||||
|
||||
if $rows != 6 then
|
||||
print ======rows=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data00 != false then
|
||||
print ======data00=$data00
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data01 != 16 then
|
||||
print ======data01=$data01
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data10 != true then
|
||||
print ======data10=$data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data20 != true then
|
||||
print ======data20=$data20
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data50 != false then
|
||||
print ======data50=$data50
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data51 != 32 then
|
||||
print ======data51=$data51
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
print sql select a,b from streamt4_2;
|
||||
sql select a,b from streamt4_2;
|
||||
|
||||
if $rows != 6 then
|
||||
print ======rows=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data00 != false then
|
||||
print ======data00=$data00
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data01 != 16 then
|
||||
print ======data01=$data01
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data10 != true then
|
||||
print ======data10=$data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data20 != true then
|
||||
print ======data20=$data20
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data50 != false then
|
||||
print ======data50=$data50
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data51 != 32 then
|
||||
print ======data51=$data51
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
print sql select a,b from streamt4_3;
|
||||
sql select a,b from streamt4_3;
|
||||
|
||||
if $rows != 6 then
|
||||
print ======rows=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data00 != false then
|
||||
print ======data00=$data00
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data01 != 16 then
|
||||
print ======data01=$data01
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data10 != true then
|
||||
print ======data10=$data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data20 != true then
|
||||
print ======data20=$data20
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data50 != false then
|
||||
print ======data50=$data50
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data51 != 32 then
|
||||
print ======data51=$data51
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
print sql select a,b from streamt4_4;
|
||||
sql select a,b from streamt4_4;
|
||||
|
||||
if $rows != 6 then
|
||||
print ======rows=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data00 != false then
|
||||
print ======data00=$data00
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data01 != 16 then
|
||||
print ======data01=$data01
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data10 != true then
|
||||
print ======data10=$data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data20 != true then
|
||||
print ======data20=$data20
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data50 != false then
|
||||
print ======data50=$data50
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data51 != 32 then
|
||||
print ======data51=$data51
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
print sql select a,b from streamt4_5;
|
||||
sql select a,b from streamt4_5;
|
||||
|
||||
if $rows != 6 then
|
||||
print ======rows=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data00 != false then
|
||||
print ======data00=$data00
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data01 != 16 then
|
||||
print ======data01=$data01
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data10 != true then
|
||||
print ======data10=$data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data20 != true then
|
||||
print ======data20=$data20
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data50 != false then
|
||||
print ======data50=$data50
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data51 != 32 then
|
||||
print ======data51=$data51
|
||||
goto loop4
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue