feat(stream): add ci
This commit is contained in:
parent
c4414ec646
commit
cea8a920f7
|
@ -5671,10 +5671,6 @@ static SNode* createNullValue() {
|
|||
}
|
||||
|
||||
static int32_t addNullTagsForExistTable(STranslateContext* pCxt, STableMeta* pMeta, SSelectStmt* pSelect) {
|
||||
if (NULL == pMeta) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t numOfTags = getNumOfTags(pMeta);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) {
|
||||
|
@ -5728,12 +5724,27 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea
|
|||
return pCxt->errCode;
|
||||
}
|
||||
|
||||
static int32_t addNullTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < LIST_LENGTH(pStmt->pTags); ++i) {
|
||||
code = nodesListMakeStrictAppend(&((SSelectStmt*)pStmt->pQuery)->pTags, createNullValue());
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t addNullTagsToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, SCreateStreamStmt* pStmt) {
|
||||
if (NULL == pMeta) {
|
||||
return addNullTagsForCreateTable(pCxt, pStmt);
|
||||
}
|
||||
return addNullTagsForExistTable(pCxt, pMeta, (SSelectStmt*)pStmt->pQuery);
|
||||
}
|
||||
|
||||
static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta,
|
||||
SCreateStreamStmt* pStmt) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||
if (NULL == pSelect->pPartitionByList) {
|
||||
code = addNullTagsForExistTable(pCxt, pMeta, pSelect);
|
||||
code = addNullTagsToCreateStreamQuery(pCxt, pMeta, pStmt);
|
||||
} else {
|
||||
code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect);
|
||||
}
|
||||
|
@ -6011,17 +6022,66 @@ static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStm
|
|||
return adjustOrderOfTags(pCxt, pStmt->pTags, pMeta, &pSelect->pTags, pReq);
|
||||
}
|
||||
|
||||
static int32_t adjustTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||
if (NULL == pSelect->pPartitionByList || NULL == pSelect->pTags) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNode* pTagDef = NULL;
|
||||
SNode* pTagExpr = NULL;
|
||||
FORBOTH(pTagDef, pStmt->pTags, pTagExpr, pSelect->pTags) {
|
||||
SColumnDefNode* pDef = (SColumnDefNode*)pTagDef;
|
||||
if (!dataTypeEqual(&pDef->dataType, &((SExprNode*)pTagExpr)->resType)) {
|
||||
SNode* pFunc = NULL;
|
||||
int32_t code = createCastFunc(pCxt, pTagExpr, pDef->dataType, &pFunc);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
REPLACE_LIST2_NODE(pFunc);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t adjustTags(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta,
|
||||
SCMCreateStreamReq* pReq) {
|
||||
if (NULL == pMeta) {
|
||||
return adjustTagsForCreateTable(pCxt, pStmt, pReq);
|
||||
}
|
||||
return adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq);
|
||||
}
|
||||
|
||||
static bool isTagDef(SNodeList* pTags) {
|
||||
if (NULL == pTags) {
|
||||
return false;
|
||||
}
|
||||
return QUERY_NODE_COLUMN_DEF == nodeType(nodesListGetNode(pTags, 0));
|
||||
}
|
||||
|
||||
static bool isTagBound(SNodeList* pTags) {
|
||||
if (NULL == pTags) {
|
||||
return false;
|
||||
}
|
||||
return QUERY_NODE_COLUMN == nodeType(nodesListGetNode(pTags, 0));
|
||||
}
|
||||
|
||||
static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq,
|
||||
STableMeta** pMeta) {
|
||||
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, pMeta);
|
||||
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
||||
if (NULL != pStmt->pCols) {
|
||||
if (NULL != pStmt->pCols || isTagBound(pStmt->pTags)) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
||||
}
|
||||
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
|
||||
pReq->targetStbUid = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
if (isTagDef(pStmt->pTags)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Table already exist: %s",
|
||||
pStmt->targetTabName);
|
||||
}
|
||||
pReq->createStb = STREAM_CREATE_STABLE_FALSE;
|
||||
pReq->targetStbUid = (*pMeta)->suid;
|
||||
}
|
||||
|
@ -6047,8 +6107,8 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
|
|||
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
|
||||
code = adjustProjectionsForExistTable(pCxt, pStmt, pMeta, pReq);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
|
||||
code = adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = adjustTags(pCxt, pStmt, pMeta, pReq);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
||||
|
|
|
@ -68,6 +68,17 @@ if $rows != 1 then
|
|||
goto loop1
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print =====data01=$data01
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
# group id
|
||||
if $data02 == NULL then
|
||||
print =====data02=$data02
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
|
||||
print ===== step3
|
||||
print ===== column name
|
||||
|
@ -155,7 +166,18 @@ sql select * from result2.streamt2;
|
|||
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
print $data00
|
||||
print $data10
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print =====data01=$data01
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop3
|
||||
endi
|
||||
|
||||
|
@ -173,7 +195,7 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
|||
sql create table t1 using st tags(1,1,1);
|
||||
sql create table t2 using st tags(2,2,2);
|
||||
|
||||
sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", 1 ) ) as select _wstart, count(*) c1 from st interval(10s);
|
||||
sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", "1") ) as select _wstart, count(*) c1 from st interval(10s);
|
||||
print ===== insert into 3
|
||||
sql insert into t1 values(1648791213000,1,2,3);
|
||||
sql insert into t2 values(1648791213000,2,2,3);
|
||||
|
@ -198,29 +220,24 @@ if $rows != 1 then
|
|||
goto loop4
|
||||
endi
|
||||
|
||||
if $data00 != NULL then
|
||||
if $data00 != dd then
|
||||
print =====data00=$data00
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
sql select dd from result3.streamt3 order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data00 != col-1 then
|
||||
if $data00 != NULL then
|
||||
print =====data00=$data00
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data10 != col-2 then
|
||||
print =====data10=$data10
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop5:
|
||||
|
||||
|
@ -233,9 +250,20 @@ endi
|
|||
|
||||
sql select * from result3.streamt3;
|
||||
|
||||
if $rows != 2 then
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
print $data00
|
||||
print $data10
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data01 != 2 then
|
||||
print =====data01=$data01
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop5
|
||||
endi
|
||||
|
||||
|
@ -251,9 +279,10 @@ endi
|
|||
|
||||
sql select table_name from information_schema.ins_tables where db_name="result3" order by 1;
|
||||
|
||||
if $rows != 2 then
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
print $data00
|
||||
print $data10
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
|
@ -262,17 +291,12 @@ if $data00 != tbn-1 then
|
|||
goto loop6
|
||||
endi
|
||||
|
||||
if $data10 != tbn-2 then
|
||||
print =====data10=$data10
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
print ===== step5
|
||||
print ===== tag name + table name
|
||||
|
||||
sql create database result4 vgroups 1;
|
||||
|
||||
sql create database test4 vgroups 4;
|
||||
sql create database test4 vgroups 1;
|
||||
sql use test4;
|
||||
|
||||
|
||||
|
@ -281,7 +305,7 @@ sql create table t1 using st tags(1,1,1);
|
|||
sql create table t2 using st tags(2,2,2);
|
||||
sql create table t3 using st tags(3,3,3);
|
||||
|
||||
sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", dd)) as select _wstart, count(*) c1 from st interval(10s);
|
||||
sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", "1")) as select _wstart, count(*) c1 from st interval(10s);
|
||||
sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3);
|
||||
|
||||
|
||||
|
@ -297,27 +321,18 @@ endi
|
|||
|
||||
sql select table_name from information_schema.ins_tables where db_name="result4" order by 1;
|
||||
|
||||
if $rows != 3 then
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
print $data00
|
||||
print $data10
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
if $data00 != tbn-t1 then
|
||||
if $data00 != tbn-1 then
|
||||
print =====data00=$data00
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
if $data10 != tbn-t2 then
|
||||
print =====data10=$data10
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
if $data20 != tbn-t3 then
|
||||
print =====data20=$data20
|
||||
goto loop7
|
||||
endi
|
||||
|
||||
$loop_count = 0
|
||||
loop8:
|
||||
|
||||
|
@ -330,42 +345,22 @@ endi
|
|||
|
||||
sql select * from result4.streamt4 order by 3;
|
||||
|
||||
if $rows != 3 then
|
||||
if $rows != 1 then
|
||||
print =====rows=$rows
|
||||
print $data00 $data10
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data01 != 1 then
|
||||
if $data01 != 3 then
|
||||
print =====data01=$data01
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data02 != t1 then
|
||||
if $data02 != NULL then
|
||||
print =====data02=$data02
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data11 != 1 then
|
||||
print =====data11=$data11
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data12 != t2 then
|
||||
print =====data12=$data12
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data21 != 1 then
|
||||
print =====data21=$data21
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
if $data22 != t3 then
|
||||
print =====data22=$data22
|
||||
goto loop8
|
||||
endi
|
||||
|
||||
print ======over
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
|
|
Loading…
Reference in New Issue