Merge pull request #19640 from taosdata/feat/3.0_stream_wxy
fix: support writing streams to existing tables
This commit is contained in:
commit
922f0781ad
|
@ -5662,6 +5662,30 @@ static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStream
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SNode* createNullValue() {
|
||||||
|
SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
|
if (NULL == pValue) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pValue->isNull = true;
|
||||||
|
pValue->node.resType.type = TSDB_DATA_TYPE_NULL;
|
||||||
|
pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
|
||||||
|
return (SNode*)pValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t addNullTagsForExistTable(STranslateContext* pCxt, STableMeta* pMeta, SSelectStmt* pSelect) {
|
||||||
|
if (NULL == pMeta) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfTags = getNumOfTags(pMeta);
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) {
|
||||||
|
code = nodesListMakeStrictAppend(&pSelect->pTags, createNullValue());
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct SRewriteSubtableCxt {
|
typedef struct SRewriteSubtableCxt {
|
||||||
STranslateContext* pCxt;
|
STranslateContext* pCxt;
|
||||||
SNodeList* pPartitionList;
|
SNodeList* pPartitionList;
|
||||||
|
@ -5707,13 +5731,11 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea
|
||||||
return pCxt->errCode;
|
return pCxt->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta,
|
||||||
|
SCreateStreamStmt* pStmt) {
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
if (NULL == pSelect->pPartitionByList) {
|
if (NULL == pSelect->pPartitionByList) {
|
||||||
if (NULL != pStmt->pTags || NULL != pStmt->pSubtable) {
|
return addNullTagsForExistTable(pCxt, pMeta, pSelect);
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect);
|
int32_t code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect);
|
||||||
|
@ -5914,17 +5936,6 @@ static int32_t adjustDataTypeOfTags(STranslateContext* pCxt, const STableMeta* p
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SNode* createNullValue() {
|
|
||||||
SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
|
||||||
if (NULL == pValue) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pValue->isNull = true;
|
|
||||||
pValue->node.resType.type = TSDB_DATA_TYPE_NULL;
|
|
||||||
pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
|
|
||||||
return (SNode*)pValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t adjustOrderOfTags(STranslateContext* pCxt, SNodeList* pTags, const STableMeta* pMeta,
|
static int32_t adjustOrderOfTags(STranslateContext* pCxt, SNodeList* pTags, const STableMeta* pMeta,
|
||||||
SNodeList** pTagExprs, SCMCreateStreamReq* pReq) {
|
SNodeList** pTagExprs, SCMCreateStreamReq* pReq) {
|
||||||
if (LIST_LENGTH(pTags) != LIST_LENGTH(*pTagExprs)) {
|
if (LIST_LENGTH(pTags) != LIST_LENGTH(*pTagExprs)) {
|
||||||
|
@ -5993,6 +6004,9 @@ static int32_t adjustOrderOfTags(STranslateContext* pCxt, SNodeList* pTags, cons
|
||||||
static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta,
|
static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta,
|
||||||
SCMCreateStreamReq* pReq) {
|
SCMCreateStreamReq* pReq) {
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
|
if (NULL == pSelect->pPartitionByList) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
if (NULL == pStmt->pTags) {
|
if (NULL == pStmt->pTags) {
|
||||||
return adjustDataTypeOfTags(pCxt, pMeta, pSelect->pTags);
|
return adjustDataTypeOfTags(pCxt, pMeta, pSelect->pTags);
|
||||||
}
|
}
|
||||||
|
@ -6021,7 +6035,7 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
|
||||||
STableMeta* pMeta = NULL;
|
STableMeta* pMeta = NULL;
|
||||||
int32_t code = translateStreamTargetTable(pCxt, pStmt, pReq, &pMeta);
|
int32_t code = translateStreamTargetTable(pCxt, pStmt, pReq, &pMeta);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
|
code = addSubtableInfoToCreateStreamQuery(pCxt, pMeta, pStmt);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = translateQuery(pCxt, pStmt->pQuery);
|
code = translateQuery(pCxt, pStmt->pQuery);
|
||||||
|
|
|
@ -20,7 +20,7 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
sql create table t1 using st tags(1,1,1);
|
sql create table t1 using st tags(1,1,1);
|
||||||
sql create table t2 using st tags(2,2,2);
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s);
|
#sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s);
|
||||||
sql create stream streams1 trigger at_once into result.streamt SUBTABLE(concat("aaa-", tbname)) as select _wstart, count(*) c1 from st partition by tbname interval(10s);
|
sql create stream streams1 trigger at_once into result.streamt SUBTABLE(concat("aaa-", tbname)) as select _wstart, count(*) c1 from st partition by tbname interval(10s);
|
||||||
sql insert into t1 values(1648791213000,1,2,3);
|
sql insert into t1 values(1648791213000,1,2,3);
|
||||||
sql insert into t2 values(1648791213000,1,2,3);
|
sql insert into t2 values(1648791213000,1,2,3);
|
||||||
|
|
|
@ -20,7 +20,7 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
|
||||||
sql create table t1 using st tags(1,1,1);
|
sql create table t1 using st tags(1,1,1);
|
||||||
sql create table t2 using st tags(2,2,2);
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s);
|
#sql_error create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s);
|
||||||
sql create stream streams1 trigger at_once into result.streamt SUBTABLE( concat("aaa-", cast(a as varchar(10) ) ) ) as select _wstart, count(*) c1 from st partition by a interval(10s);
|
sql create stream streams1 trigger at_once into result.streamt SUBTABLE( concat("aaa-", cast(a as varchar(10) ) ) ) as select _wstart, count(*) c1 from st partition by a interval(10s);
|
||||||
print ===== insert into 1
|
print ===== insert into 1
|
||||||
sql insert into t1 values(1648791213000,1,2,3);
|
sql insert into t1 values(1648791213000,1,2,3);
|
||||||
|
|
Loading…
Reference in New Issue