From 57258e2527ee8b2a198999cb8a0a61a208c04b40 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 18 Jan 2023 12:00:53 +0800 Subject: [PATCH 1/2] fix: support writing streams to existing tables --- source/libs/parser/src/parTranslater.c | 48 +++++++++++++++++--------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 395f90d070..6e94883849 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5662,6 +5662,30 @@ static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStream return TSDB_CODE_SUCCESS; } +static SNode* createNullValue() { + SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + if (NULL == pValue) { + return NULL; + } + pValue->isNull = true; + pValue->node.resType.type = TSDB_DATA_TYPE_NULL; + pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; + return (SNode*)pValue; +} + +static int32_t addNullTagsForExistTable(STranslateContext* pCxt, STableMeta* pMeta, SSelectStmt* pSelect) { + if (NULL == pMeta) { + return TSDB_CODE_SUCCESS; + } + + int32_t numOfTags = getNumOfTags(pMeta); + int32_t code = TSDB_CODE_SUCCESS; + for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) { + code = nodesListMakeStrictAppend(&pSelect->pTags, createNullValue()); + } + return code; +} + typedef struct SRewriteSubtableCxt { STranslateContext* pCxt; SNodeList* pPartitionList; @@ -5707,13 +5731,11 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea return pCxt->errCode; } -static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { +static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, + SCreateStreamStmt* pStmt) { SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (NULL == pSelect->pPartitionByList) { - if (NULL != pStmt->pTags || NULL != pStmt->pSubtable) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); - } - return TSDB_CODE_SUCCESS; + return addNullTagsForExistTable(pCxt, pMeta, pSelect); } int32_t code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect); @@ -5914,17 +5936,6 @@ static int32_t adjustDataTypeOfTags(STranslateContext* pCxt, const STableMeta* p return TSDB_CODE_SUCCESS; } -static SNode* createNullValue() { - SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); - if (NULL == pValue) { - return NULL; - } - pValue->isNull = true; - pValue->node.resType.type = TSDB_DATA_TYPE_NULL; - pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; - return (SNode*)pValue; -} - static int32_t adjustOrderOfTags(STranslateContext* pCxt, SNodeList* pTags, const STableMeta* pMeta, SNodeList** pTagExprs, SCMCreateStreamReq* pReq) { if (LIST_LENGTH(pTags) != LIST_LENGTH(*pTagExprs)) { @@ -5993,6 +6004,9 @@ static int32_t adjustOrderOfTags(STranslateContext* pCxt, SNodeList* pTags, cons static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta, SCMCreateStreamReq* pReq) { SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + if (NULL == pSelect->pPartitionByList) { + return TSDB_CODE_SUCCESS; + } if (NULL == pStmt->pTags) { return adjustDataTypeOfTags(pCxt, pMeta, pSelect->pTags); } @@ -6021,7 +6035,7 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt STableMeta* pMeta = NULL; int32_t code = translateStreamTargetTable(pCxt, pStmt, pReq, &pMeta); if (TSDB_CODE_SUCCESS == code) { - code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt); + code = addSubtableInfoToCreateStreamQuery(pCxt, pMeta, pStmt); } if (TSDB_CODE_SUCCESS == code) { code = translateQuery(pCxt, pStmt->pQuery); From 175f2050637a27d526eff092f24526c033f855ff Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 28 Jan 2023 09:18:09 +0800 Subject: [PATCH 2/2] fix: support writing streams to existing tables --- tests/script/tsim/stream/udTableAndTag0.sim | 2 +- tests/script/tsim/stream/udTableAndTag1.sim | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/script/tsim/stream/udTableAndTag0.sim b/tests/script/tsim/stream/udTableAndTag0.sim index 86feca1918..5cb5c2dd8b 100644 --- a/tests/script/tsim/stream/udTableAndTag0.sim +++ b/tests/script/tsim/stream/udTableAndTag0.sim @@ -20,7 +20,7 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); -sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s); +#sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s); sql create stream streams1 trigger at_once into result.streamt SUBTABLE(concat("aaa-", tbname)) as select _wstart, count(*) c1 from st partition by tbname interval(10s); sql insert into t1 values(1648791213000,1,2,3); sql insert into t2 values(1648791213000,1,2,3); diff --git a/tests/script/tsim/stream/udTableAndTag1.sim b/tests/script/tsim/stream/udTableAndTag1.sim index a0393a03cd..4229de2cf0 100644 --- a/tests/script/tsim/stream/udTableAndTag1.sim +++ b/tests/script/tsim/stream/udTableAndTag1.sim @@ -20,7 +20,7 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); -sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s); +#sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s); sql create stream streams1 trigger at_once into result.streamt SUBTABLE( concat("aaa-", cast(a as varchar(10) ) ) ) as select _wstart, count(*) c1 from st partition by a interval(10s); print ===== insert into 1 sql insert into t1 values(1648791213000,1,2,3);