fix: create stream error
This commit is contained in:
parent
dc64bb5110
commit
903804883d
|
@ -2837,7 +2837,7 @@ static int32_t rewriteProjectAlias(SNodeList* pProjectionList) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t checkProjectAlias(STranslateContext* pCxt, SNodeList* pProjectionList) {
|
||||
static int32_t checkProjectAlias(STranslateContext* pCxt, SNodeList* pProjectionList, SHashObj** pOutput) {
|
||||
SHashObj* pUserAliasSet = taosHashInit(LIST_LENGTH(pProjectionList),
|
||||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
SNode* pProject = NULL;
|
||||
|
@ -2849,13 +2849,17 @@ static int32_t checkProjectAlias(STranslateContext* pCxt, SNodeList* pProjection
|
|||
}
|
||||
taosHashPut(pUserAliasSet, pExpr->userAlias, strlen(pExpr->userAlias), &pExpr, POINTER_BYTES);
|
||||
}
|
||||
taosHashCleanup(pUserAliasSet);
|
||||
if (NULL == pOutput) {
|
||||
taosHashCleanup(pUserAliasSet);
|
||||
} else {
|
||||
*pOutput = pUserAliasSet;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateProjectionList(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
if (pSelect->isSubquery) {
|
||||
return checkProjectAlias(pCxt, pSelect->pProjectionList);
|
||||
return checkProjectAlias(pCxt, pSelect->pProjectionList, NULL);
|
||||
}
|
||||
return rewriteProjectAlias(pSelect->pProjectionList);
|
||||
}
|
||||
|
@ -3888,8 +3892,7 @@ static int32_t checkDbKeepOption(STranslateContext* pCxt, SDatabaseOptions* pOpt
|
|||
pOptions->keep[0] > tsdbMaxKeep || pOptions->keep[1] > tsdbMaxKeep || pOptions->keep[2] > tsdbMaxKeep) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
|
||||
"Invalid option keep: %" PRId64 ", %" PRId64 ", %" PRId64 " valid range: [%dm, %dm]",
|
||||
pOptions->keep[0], pOptions->keep[1], pOptions->keep[2], TSDB_MIN_KEEP,
|
||||
tsdbMaxKeep);
|
||||
pOptions->keep[0], pOptions->keep[1], pOptions->keep[2], TSDB_MIN_KEEP, tsdbMaxKeep);
|
||||
}
|
||||
|
||||
if (!((pOptions->keep[0] <= pOptions->keep[1]) && (pOptions->keep[1] <= pOptions->keep[2]))) {
|
||||
|
@ -4044,7 +4047,7 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
|
|||
code = checkDbPrecisionOption(pCxt, pOptions);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkDbKeepOption(pCxt, pOptions); // use precision
|
||||
code = checkDbKeepOption(pCxt, pOptions); // use precision
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = checkDbRangeOption(pCxt, "pages", pOptions->pages, TSDB_MIN_PAGES_PER_VNODE, TSDB_MAX_PAGES_PER_VNODE);
|
||||
|
@ -5504,9 +5507,21 @@ static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
|
|||
tNameGetFullDbName(&name, pDbFName);
|
||||
}
|
||||
|
||||
static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt;
|
||||
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
|
||||
static void getStreamQueryFirstProjectAliasName(SHashObj* pUserAliasSet, char* aliasName, int32_t len) {
|
||||
if (NULL == taosHashGet(pUserAliasSet, "_wstart", strlen("_wstart"))) {
|
||||
snprintf(aliasName, len, "%s", "_wstart");
|
||||
return;
|
||||
}
|
||||
if (NULL == taosHashGet(pUserAliasSet, "ts", strlen("ts"))) {
|
||||
snprintf(aliasName, len, "%s", "ts");
|
||||
return;
|
||||
}
|
||||
taosRandStr(aliasName, len);
|
||||
return;
|
||||
}
|
||||
|
||||
static int32_t addWstartTsToCreateStreamQueryImpl(SSelectStmt* pSelect, SHashObj* pUserAliasSet) {
|
||||
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
|
||||
if (NULL == pSelect->pWindow ||
|
||||
(QUERY_NODE_FUNCTION == nodeType(pProj) && 0 == strcmp("_wstart", ((SFunctionNode*)pProj)->functionName))) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -5516,7 +5531,7 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
strcpy(pFunc->functionName, "_wstart");
|
||||
strcpy(pFunc->node.aliasName, pFunc->functionName);
|
||||
getStreamQueryFirstProjectAliasName(pUserAliasSet, pFunc->node.aliasName, sizeof(pFunc->node.aliasName));
|
||||
int32_t code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode((SNode*)pFunc);
|
||||
|
@ -5524,6 +5539,17 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t addWstartTsToCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt;
|
||||
SHashObj* pUserAliasSet = NULL;
|
||||
int32_t code = checkProjectAlias(pCxt, pSelect->pProjectionList, &pUserAliasSet);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addWstartTsToCreateStreamQueryImpl(pSelect, pUserAliasSet);
|
||||
}
|
||||
taosHashCleanup(pUserAliasSet);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SSelectStmt* pSelect) {
|
||||
if (NULL == pStmt->pTags) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -5626,7 +5652,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
|
||||
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||
pCxt->createStream = true;
|
||||
int32_t code = addWstartTsToCreateStreamQuery(pStmt->pQuery);
|
||||
int32_t code = addWstartTsToCreateStreamQuery(pCxt, pStmt->pQuery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue