fix: the problem of creating a stream with tag define
This commit is contained in:
parent
fecd2f5835
commit
7532ab9abd
|
@ -5671,10 +5671,6 @@ static SNode* createNullValue() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addNullTagsForExistTable(STranslateContext* pCxt, STableMeta* pMeta, SSelectStmt* pSelect) {
|
static int32_t addNullTagsForExistTable(STranslateContext* pCxt, STableMeta* pMeta, SSelectStmt* pSelect) {
|
||||||
if (NULL == pMeta) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfTags = getNumOfTags(pMeta);
|
int32_t numOfTags = getNumOfTags(pMeta);
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) {
|
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) {
|
||||||
|
@ -5728,12 +5724,27 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea
|
||||||
return pCxt->errCode;
|
return pCxt->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t addNullTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < LIST_LENGTH(pStmt->pTags); ++i) {
|
||||||
|
code = nodesListMakeStrictAppend(&((SSelectStmt*)pStmt->pQuery)->pTags, createNullValue());
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t addNullTagsToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, SCreateStreamStmt* pStmt) {
|
||||||
|
if (NULL == pMeta) {
|
||||||
|
return addNullTagsForCreateTable(pCxt, pStmt);
|
||||||
|
}
|
||||||
|
return addNullTagsForExistTable(pCxt, pMeta, (SSelectStmt*)pStmt->pQuery);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta,
|
static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta,
|
||||||
SCreateStreamStmt* pStmt) {
|
SCreateStreamStmt* pStmt) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
if (NULL == pSelect->pPartitionByList) {
|
if (NULL == pSelect->pPartitionByList) {
|
||||||
code = addNullTagsForExistTable(pCxt, pMeta, pSelect);
|
code = addNullTagsToCreateStreamQuery(pCxt, pMeta, pStmt);
|
||||||
} else {
|
} else {
|
||||||
code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect);
|
code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect);
|
||||||
}
|
}
|
||||||
|
@ -6011,17 +6022,66 @@ static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStm
|
||||||
return adjustOrderOfTags(pCxt, pStmt->pTags, pMeta, &pSelect->pTags, pReq);
|
return adjustOrderOfTags(pCxt, pStmt->pTags, pMeta, &pSelect->pTags, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t adjustTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||||
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||||
|
if (NULL == pSelect->pPartitionByList || NULL == pSelect->pTags) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pTagDef = NULL;
|
||||||
|
SNode* pTagExpr = NULL;
|
||||||
|
FORBOTH(pTagDef, pStmt->pTags, pTagExpr, pSelect->pTags) {
|
||||||
|
SColumnDefNode* pDef = (SColumnDefNode*)pTagDef;
|
||||||
|
if (!dataTypeEqual(&pDef->dataType, &((SExprNode*)pTagExpr)->resType)) {
|
||||||
|
SNode* pFunc = NULL;
|
||||||
|
int32_t code = createCastFunc(pCxt, pTagExpr, pDef->dataType, &pFunc);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
REPLACE_LIST2_NODE(pFunc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t adjustTags(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta,
|
||||||
|
SCMCreateStreamReq* pReq) {
|
||||||
|
if (NULL == pMeta) {
|
||||||
|
return adjustTagsForCreateTable(pCxt, pStmt, pReq);
|
||||||
|
}
|
||||||
|
return adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq);
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool isTagDef(SNodeList* pTags) {
|
||||||
|
if (NULL == pTags) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return QUERY_NODE_COLUMN_DEF == nodeType(nodesListGetNode(pTags, 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool isTagBound(SNodeList* pTags) {
|
||||||
|
if (NULL == pTags) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return QUERY_NODE_COLUMN == nodeType(nodesListGetNode(pTags, 0));
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq,
|
static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq,
|
||||||
STableMeta** pMeta) {
|
STableMeta** pMeta) {
|
||||||
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, pMeta);
|
int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, pMeta);
|
||||||
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) {
|
||||||
if (NULL != pStmt->pCols) {
|
if (NULL != pStmt->pCols || isTagBound(pStmt->pTags)) {
|
||||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName);
|
||||||
}
|
}
|
||||||
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
|
pReq->createStb = STREAM_CREATE_STABLE_TRUE;
|
||||||
pReq->targetStbUid = 0;
|
pReq->targetStbUid = 0;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
|
if (isTagDef(pStmt->pTags)) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Table already exist: %s",
|
||||||
|
pStmt->targetTabName);
|
||||||
|
}
|
||||||
pReq->createStb = STREAM_CREATE_STABLE_FALSE;
|
pReq->createStb = STREAM_CREATE_STABLE_FALSE;
|
||||||
pReq->targetStbUid = (*pMeta)->suid;
|
pReq->targetStbUid = (*pMeta)->suid;
|
||||||
}
|
}
|
||||||
|
@ -6047,8 +6107,8 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
|
||||||
code = adjustProjectionsForExistTable(pCxt, pStmt, pMeta, pReq);
|
code = adjustProjectionsForExistTable(pCxt, pStmt, pMeta, pReq);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pMeta) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq);
|
code = adjustTags(pCxt, pStmt, pMeta, pReq);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
||||||
|
|
Loading…
Reference in New Issue