From cea8a920f7a1c623760c6b8a0d2468bf1483959e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 1 Feb 2023 17:50:08 +0800 Subject: [PATCH] feat(stream): add ci --- source/libs/parser/src/parTranslater.c | 76 ++++++++++++-- tests/script/tsim/stream/udTableAndTag2.sim | 109 ++++++++++---------- 2 files changed, 120 insertions(+), 65 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 0e92d66d73..0a9adaf60c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5671,10 +5671,6 @@ static SNode* createNullValue() { } static int32_t addNullTagsForExistTable(STranslateContext* pCxt, STableMeta* pMeta, SSelectStmt* pSelect) { - if (NULL == pMeta) { - return TSDB_CODE_SUCCESS; - } - int32_t numOfTags = getNumOfTags(pMeta); int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) { @@ -5728,12 +5724,27 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea return pCxt->errCode; } +static int32_t addNullTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; + for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < LIST_LENGTH(pStmt->pTags); ++i) { + code = nodesListMakeStrictAppend(&((SSelectStmt*)pStmt->pQuery)->pTags, createNullValue()); + } + return code; +} + +static int32_t addNullTagsToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, SCreateStreamStmt* pStmt) { + if (NULL == pMeta) { + return addNullTagsForCreateTable(pCxt, pStmt); + } + return addNullTagsForExistTable(pCxt, pMeta, (SSelectStmt*)pStmt->pQuery); +} + static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, SCreateStreamStmt* pStmt) { int32_t code = TSDB_CODE_SUCCESS; SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (NULL == pSelect->pPartitionByList) { - code = addNullTagsForExistTable(pCxt, pMeta, pSelect); + code = addNullTagsToCreateStreamQuery(pCxt, pMeta, pStmt); } else { code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect); } @@ -6011,17 +6022,66 @@ static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStm return adjustOrderOfTags(pCxt, pStmt->pTags, pMeta, &pSelect->pTags, pReq); } +static int32_t adjustTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + if (NULL == pSelect->pPartitionByList || NULL == pSelect->pTags) { + return TSDB_CODE_SUCCESS; + } + + SNode* pTagDef = NULL; + SNode* pTagExpr = NULL; + FORBOTH(pTagDef, pStmt->pTags, pTagExpr, pSelect->pTags) { + SColumnDefNode* pDef = (SColumnDefNode*)pTagDef; + if (!dataTypeEqual(&pDef->dataType, &((SExprNode*)pTagExpr)->resType)) { + SNode* pFunc = NULL; + int32_t code = createCastFunc(pCxt, pTagExpr, pDef->dataType, &pFunc); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + REPLACE_LIST2_NODE(pFunc); + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustTags(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta, + SCMCreateStreamReq* pReq) { + if (NULL == pMeta) { + return adjustTagsForCreateTable(pCxt, pStmt, pReq); + } + return adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq); +} + +static bool isTagDef(SNodeList* pTags) { + if (NULL == pTags) { + return false; + } + return QUERY_NODE_COLUMN_DEF == nodeType(nodesListGetNode(pTags, 0)); +} + +static bool isTagBound(SNodeList* pTags) { + if (NULL == pTags) { + return false; + } + return QUERY_NODE_COLUMN == nodeType(nodesListGetNode(pTags, 0)); +} + static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq, STableMeta** pMeta) { int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, pMeta); if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { - if (NULL != pStmt->pCols) { + if (NULL != pStmt->pCols || isTagBound(pStmt->pTags)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName); } pReq->createStb = STREAM_CREATE_STABLE_TRUE; pReq->targetStbUid = 0; return TSDB_CODE_SUCCESS; } else { + if (isTagDef(pStmt->pTags)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Table already exist: %s", + pStmt->targetTabName); + } pReq->createStb = STREAM_CREATE_STABLE_FALSE; pReq->targetStbUid = (*pMeta)->suid; } @@ -6047,8 +6107,8 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt if (TSDB_CODE_SUCCESS == code && NULL != pMeta) { code = adjustProjectionsForExistTable(pCxt, pStmt, pMeta, pReq); } - if (TSDB_CODE_SUCCESS == code && NULL != pMeta) { - code = adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq); + if (TSDB_CODE_SUCCESS == code) { + code = adjustTags(pCxt, pStmt, pMeta, pReq); } if (TSDB_CODE_SUCCESS == code) { getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB); diff --git a/tests/script/tsim/stream/udTableAndTag2.sim b/tests/script/tsim/stream/udTableAndTag2.sim index c42ac21dcd..5dd2e3ae2b 100644 --- a/tests/script/tsim/stream/udTableAndTag2.sim +++ b/tests/script/tsim/stream/udTableAndTag2.sim @@ -68,6 +68,17 @@ if $rows != 1 then goto loop1 endi +if $data01 != 2 then + print =====data01=$data01 + goto loop1 +endi + +# group id +if $data02 == NULL then + print =====data02=$data02 + goto loop1 +endi + print ===== step3 print ===== column name @@ -155,7 +166,18 @@ sql select * from result2.streamt2; if $rows != 1 then print =====rows=$rows - print $data00 $data10 + print $data00 + print $data10 + goto loop3 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop3 +endi + +if $data02 != NULL then + print =====data02=$data02 goto loop3 endi @@ -173,7 +195,7 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); -sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", 1 ) ) as select _wstart, count(*) c1 from st interval(10s); +sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", "1") ) as select _wstart, count(*) c1 from st interval(10s); print ===== insert into 3 sql insert into t1 values(1648791213000,1,2,3); sql insert into t2 values(1648791213000,2,2,3); @@ -198,29 +220,24 @@ if $rows != 1 then goto loop4 endi -if $data00 != NULL then +if $data00 != dd then print =====data00=$data00 goto loop4 endi sql select dd from result3.streamt3 order by 1; -if $rows != 2 then +if $rows != 1 then print =====rows=$rows print $data00 $data10 goto loop4 endi -if $data00 != col-1 then +if $data00 != NULL then print =====data00=$data00 goto loop4 endi -if $data10 != col-2 then - print =====data10=$data10 - goto loop4 -endi - $loop_count = 0 loop5: @@ -233,9 +250,20 @@ endi sql select * from result3.streamt3; -if $rows != 2 then +if $rows != 1 then print =====rows=$rows - print $data00 $data10 + print $data00 + print $data10 + goto loop5 +endi + +if $data01 != 2 then + print =====data01=$data01 + goto loop5 +endi + +if $data02 != NULL then + print =====data02=$data02 goto loop5 endi @@ -251,9 +279,10 @@ endi sql select table_name from information_schema.ins_tables where db_name="result3" order by 1; -if $rows != 2 then +if $rows != 1 then print =====rows=$rows - print $data00 $data10 + print $data00 + print $data10 goto loop6 endi @@ -262,17 +291,12 @@ if $data00 != tbn-1 then goto loop6 endi -if $data10 != tbn-2 then - print =====data10=$data10 - goto loop6 -endi - print ===== step5 print ===== tag name + table name sql create database result4 vgroups 1; -sql create database test4 vgroups 4; +sql create database test4 vgroups 1; sql use test4; @@ -281,7 +305,7 @@ sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); sql create table t3 using st tags(3,3,3); -sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", dd)) as select _wstart, count(*) c1 from st interval(10s); +sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", "1")) as select _wstart, count(*) c1 from st interval(10s); sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3); @@ -297,27 +321,18 @@ endi sql select table_name from information_schema.ins_tables where db_name="result4" order by 1; -if $rows != 3 then +if $rows != 1 then print =====rows=$rows - print $data00 $data10 + print $data00 + print $data10 goto loop7 endi -if $data00 != tbn-t1 then +if $data00 != tbn-1 then print =====data00=$data00 goto loop7 endi -if $data10 != tbn-t2 then - print =====data10=$data10 - goto loop7 -endi - -if $data20 != tbn-t3 then - print =====data20=$data20 - goto loop7 -endi - $loop_count = 0 loop8: @@ -330,42 +345,22 @@ endi sql select * from result4.streamt4 order by 3; -if $rows != 3 then +if $rows != 1 then print =====rows=$rows print $data00 $data10 goto loop8 endi -if $data01 != 1 then +if $data01 != 3 then print =====data01=$data01 goto loop8 endi -if $data02 != t1 then +if $data02 != NULL then print =====data02=$data02 goto loop8 endi -if $data11 != 1 then - print =====data11=$data11 - goto loop8 -endi - -if $data12 != t2 then - print =====data12=$data12 - goto loop8 -endi - -if $data21 != 1 then - print =====data21=$data21 - goto loop8 -endi - -if $data22 != t3 then - print =====data22=$data22 - goto loop8 -endi - print ======over system sh/stop_dnodes.sh