Merge pull request #18902 from taosdata/fix/main_bugfix_wxy
fix: create stream error
This commit is contained in:
commit
5122a7e964
|
@ -2837,7 +2837,7 @@ static int32_t rewriteProjectAlias(SNodeList* pProjectionList) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkProjectAlias(STranslateContext* pCxt, SNodeList* pProjectionList) {
|
static int32_t checkProjectAlias(STranslateContext* pCxt, SNodeList* pProjectionList, SHashObj** pOutput) {
|
||||||
SHashObj* pUserAliasSet = taosHashInit(LIST_LENGTH(pProjectionList),
|
SHashObj* pUserAliasSet = taosHashInit(LIST_LENGTH(pProjectionList),
|
||||||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
SNode* pProject = NULL;
|
SNode* pProject = NULL;
|
||||||
|
@ -2849,13 +2849,17 @@ static int32_t checkProjectAlias(STranslateContext* pCxt, SNodeList* pProjection
|
||||||
}
|
}
|
||||||
taosHashPut(pUserAliasSet, pExpr->userAlias, strlen(pExpr->userAlias), &pExpr, POINTER_BYTES);
|
taosHashPut(pUserAliasSet, pExpr->userAlias, strlen(pExpr->userAlias), &pExpr, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pUserAliasSet);
|
if (NULL == pOutput) {
|
||||||
|
taosHashCleanup(pUserAliasSet);
|
||||||
|
} else {
|
||||||
|
*pOutput = pUserAliasSet;
|
||||||
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateProjectionList(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
static int32_t translateProjectionList(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
if (pSelect->isSubquery) {
|
if (pSelect->isSubquery) {
|
||||||
return checkProjectAlias(pCxt, pSelect->pProjectionList);
|
return checkProjectAlias(pCxt, pSelect->pProjectionList, NULL);
|
||||||
}
|
}
|
||||||
return rewriteProjectAlias(pSelect->pProjectionList);
|
return rewriteProjectAlias(pSelect->pProjectionList);
|
||||||
}
|
}
|
||||||
|
@ -3888,8 +3892,7 @@ static int32_t checkDbKeepOption(STranslateContext* pCxt, SDatabaseOptions* pOpt
|
||||||
pOptions->keep[0] > tsdbMaxKeep || pOptions->keep[1] > tsdbMaxKeep || pOptions->keep[2] > tsdbMaxKeep) {
|
pOptions->keep[0] > tsdbMaxKeep || pOptions->keep[1] > tsdbMaxKeep || pOptions->keep[2] > tsdbMaxKeep) {
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DB_OPTION,
|
||||||
"Invalid option keep: %" PRId64 ", %" PRId64 ", %" PRId64 " valid range: [%dm, %dm]",
|
"Invalid option keep: %" PRId64 ", %" PRId64 ", %" PRId64 " valid range: [%dm, %dm]",
|
||||||
pOptions->keep[0], pOptions->keep[1], pOptions->keep[2], TSDB_MIN_KEEP,
|
pOptions->keep[0], pOptions->keep[1], pOptions->keep[2], TSDB_MIN_KEEP, tsdbMaxKeep);
|
||||||
tsdbMaxKeep);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!((pOptions->keep[0] <= pOptions->keep[1]) && (pOptions->keep[1] <= pOptions->keep[2]))) {
|
if (!((pOptions->keep[0] <= pOptions->keep[1]) && (pOptions->keep[1] <= pOptions->keep[2]))) {
|
||||||
|
@ -4044,7 +4047,7 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, const char* pDbName
|
||||||
code = checkDbPrecisionOption(pCxt, pOptions);
|
code = checkDbPrecisionOption(pCxt, pOptions);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = checkDbKeepOption(pCxt, pOptions); // use precision
|
code = checkDbKeepOption(pCxt, pOptions); // use precision
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = checkDbRangeOption(pCxt, "pages", pOptions->pages, TSDB_MIN_PAGES_PER_VNODE, TSDB_MAX_PAGES_PER_VNODE);
|
code = checkDbRangeOption(pCxt, "pages", pOptions->pages, TSDB_MIN_PAGES_PER_VNODE, TSDB_MAX_PAGES_PER_VNODE);
|
||||||
|
@ -5504,9 +5507,24 @@ static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
|
||||||
tNameGetFullDbName(&name, pDbFName);
|
tNameGetFullDbName(&name, pDbFName);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
|
static void getStreamQueryFirstProjectAliasName(SHashObj* pUserAliasSet, char* aliasName, int32_t len) {
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt;
|
if (NULL == taosHashGet(pUserAliasSet, "_wstart", strlen("_wstart"))) {
|
||||||
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
|
snprintf(aliasName, len, "%s", "_wstart");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (NULL == taosHashGet(pUserAliasSet, "ts", strlen("ts"))) {
|
||||||
|
snprintf(aliasName, len, "%s", "ts");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
do {
|
||||||
|
taosRandStr(aliasName, len - 1);
|
||||||
|
aliasName[len - 1] = '\0';
|
||||||
|
} while (NULL != taosHashGet(pUserAliasSet, aliasName, strlen(aliasName)));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t addWstartTsToCreateStreamQueryImpl(SSelectStmt* pSelect, SHashObj* pUserAliasSet) {
|
||||||
|
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 0);
|
||||||
if (NULL == pSelect->pWindow ||
|
if (NULL == pSelect->pWindow ||
|
||||||
(QUERY_NODE_FUNCTION == nodeType(pProj) && 0 == strcmp("_wstart", ((SFunctionNode*)pProj)->functionName))) {
|
(QUERY_NODE_FUNCTION == nodeType(pProj) && 0 == strcmp("_wstart", ((SFunctionNode*)pProj)->functionName))) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -5516,7 +5534,7 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
strcpy(pFunc->functionName, "_wstart");
|
strcpy(pFunc->functionName, "_wstart");
|
||||||
strcpy(pFunc->node.aliasName, pFunc->functionName);
|
getStreamQueryFirstProjectAliasName(pUserAliasSet, pFunc->node.aliasName, sizeof(pFunc->node.aliasName));
|
||||||
int32_t code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc);
|
int32_t code = nodesListPushFront(pSelect->pProjectionList, (SNode*)pFunc);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
nodesDestroyNode((SNode*)pFunc);
|
nodesDestroyNode((SNode*)pFunc);
|
||||||
|
@ -5524,6 +5542,17 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t addWstartTsToCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt) {
|
||||||
|
SSelectStmt* pSelect = (SSelectStmt*)pStmt;
|
||||||
|
SHashObj* pUserAliasSet = NULL;
|
||||||
|
int32_t code = checkProjectAlias(pCxt, pSelect->pProjectionList, &pUserAliasSet);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = addWstartTsToCreateStreamQueryImpl(pSelect, pUserAliasSet);
|
||||||
|
}
|
||||||
|
taosHashCleanup(pUserAliasSet);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SSelectStmt* pSelect) {
|
static int32_t addTagsToCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SSelectStmt* pSelect) {
|
||||||
if (NULL == pStmt->pTags) {
|
if (NULL == pStmt->pTags) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -5626,7 +5655,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
|
|
||||||
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
|
||||||
pCxt->createStream = true;
|
pCxt->createStream = true;
|
||||||
int32_t code = addWstartTsToCreateStreamQuery(pStmt->pQuery);
|
int32_t code = addWstartTsToCreateStreamQuery(pCxt, pStmt->pQuery);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
|
code = addSubtableInfoToCreateStreamQuery(pCxt, pStmt);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue