fix: support writing streams to existing tables

This commit is contained in:
Xiaoyu Wang 2023-01-18 12:00:53 +08:00
parent caf3de2959
commit 57258e2527
1 changed files with 31 additions and 17 deletions

View File

@ -5662,6 +5662,30 @@ static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStream
return TSDB_CODE_SUCCESS;
}
static SNode* createNullValue() {
SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == pValue) {
return NULL;
}
pValue->isNull = true;
pValue->node.resType.type = TSDB_DATA_TYPE_NULL;
pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
return (SNode*)pValue;
}
static int32_t addNullTagsForExistTable(STranslateContext* pCxt, STableMeta* pMeta, SSelectStmt* pSelect) {
if (NULL == pMeta) {
return TSDB_CODE_SUCCESS;
}
int32_t numOfTags = getNumOfTags(pMeta);
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) {
code = nodesListMakeStrictAppend(&pSelect->pTags, createNullValue());
}
return code;
}
typedef struct SRewriteSubtableCxt {
STranslateContext* pCxt;
SNodeList* pPartitionList;
@ -5707,13 +5731,11 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea
return pCxt->errCode;
}
static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta,
SCreateStreamStmt* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (NULL == pSelect->pPartitionByList) {
if (NULL != pStmt->pTags || NULL != pStmt->pSubtable) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
return TSDB_CODE_SUCCESS;
return addNullTagsForExistTable(pCxt, pMeta, pSelect);
}
int32_t code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect);
@ -5914,17 +5936,6 @@ static int32_t adjustDataTypeOfTags(STranslateContext* pCxt, const STableMeta* p
return TSDB_CODE_SUCCESS;
}
static SNode* createNullValue() {
SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (NULL == pValue) {
return NULL;
}
pValue->isNull = true;
pValue->node.resType.type = TSDB_DATA_TYPE_NULL;
pValue->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes;
return (SNode*)pValue;
}
static int32_t adjustOrderOfTags(STranslateContext* pCxt, SNodeList* pTags, const STableMeta* pMeta,
SNodeList** pTagExprs, SCMCreateStreamReq* pReq) {
if (LIST_LENGTH(pTags) != LIST_LENGTH(*pTagExprs)) {
@ -5993,6 +6004,9 @@ static int32_t adjustOrderOfTags(STranslateContext* pCxt, SNodeList* pTags, cons
static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta,
SCMCreateStreamReq* pReq) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (NULL == pSelect->pPartitionByList) {
return TSDB_CODE_SUCCESS;
}
if (NULL == pStmt->pTags) {
return adjustDataTypeOfTags(pCxt, pMeta, pSelect->pTags);
}
@ -6021,7 +6035,7 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
STableMeta* pMeta = NULL;
int32_t code = translateStreamTargetTable(pCxt, pStmt, pReq, &pMeta);
if (TSDB_CODE_SUCCESS == code) {
code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
code = addSubtableInfoToCreateStreamQuery(pCxt, pMeta, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pStmt->pQuery);