fix: automatically supplement primary key columns when creating stream
This commit is contained in:
parent
8b486765d3
commit
047f65935d
|
@ -4264,6 +4264,38 @@ static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
|
||||||
tNameGetFullDbName(&name, pDbFName);
|
tNameGetFullDbName(&name, pDbFName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
|
||||||
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt;
|
||||||
|
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
|
||||||
|
if (QUERY_NODE_FUNCTION == nodeType(pProj) && FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pProj)->funcType) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||||
|
if (NULL == pFunc) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
strcpy(pFunc->functionName, "_wstartts");
|
||||||
|
strcpy(pFunc->node.aliasName, pFunc->functionName);
|
||||||
|
int32_t code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
nodesDestroyNode((SNode*)pFunc);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SCMCreateStreamReq* pReq) {
|
||||||
|
pCxt->createStream = true;
|
||||||
|
int32_t code = addWstartTsToCreateStreamQuery(pStmt);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = translateQuery(pCxt, pStmt);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
getSourceDatabase(pStmt, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
||||||
|
code = nodesNodeToString(pStmt, false, &pReq->ast, NULL);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||||
pReq->igExists = pStmt->ignoreExists;
|
pReq->igExists = pStmt->ignoreExists;
|
||||||
|
|
||||||
|
@ -4277,13 +4309,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
|
||||||
tNameExtractFullName(&name, pReq->targetStbFullName);
|
tNameExtractFullName(&name, pReq->targetStbFullName);
|
||||||
}
|
}
|
||||||
|
|
||||||
pCxt->createStream = true;
|
int32_t code = buildCreateStreamQuery(pCxt, pStmt->pQuery, pReq);
|
||||||
int32_t code = translateQuery(pCxt, pStmt->pQuery);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
|
||||||
code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pReq->sql = strdup(pCxt->pParseCxt->pSql);
|
pReq->sql = strdup(pCxt->pParseCxt->pSql);
|
||||||
if (NULL == pReq->sql) {
|
if (NULL == pReq->sql) {
|
||||||
|
|
Loading…
Reference in New Issue