fix: improve create stream check

This commit is contained in:
Xiaoyu Wang 2022-08-11 22:08:19 +08:00
parent ad92904ac4
commit 03784406c4
3 changed files with 17 additions and 1 deletions

View File

@ -269,6 +269,7 @@ typedef struct SSelectStmt {
bool hasInterpFunc; bool hasInterpFunc;
bool hasLastRowFunc; bool hasLastRowFunc;
bool hasTimeLineFunc; bool hasTimeLineFunc;
bool hasUdaf;
bool onlyHasKeepOrderFunc; bool onlyHasKeepOrderFunc;
bool groupSort; bool groupSort;
} SSelectStmt; } SSelectStmt;

View File

@ -1350,6 +1350,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType); pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType);
pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType); pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType);
pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId); pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId);
pSelect->hasUdaf = pSelect->hasUdaf ? true : fmIsUserDefinedFunc(pFunc->funcId) && fmIsAggFunc(pFunc->funcId);
pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false; pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false;
} }
} }
@ -2644,6 +2645,11 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (NULL == pSelect->pRange || NULL == pSelect->pEvery || NULL == pSelect->pFill) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE,
"Missing RANGE clause, EVERY clause or FILL clause");
}
int32_t code = translateExpr(pCxt, &pSelect->pRange); int32_t code = translateExpr(pCxt, &pSelect->pRange);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateExpr(pCxt, &pSelect->pEvery); code = translateExpr(pCxt, &pSelect->pEvery);
@ -4734,6 +4740,11 @@ static bool crossTableWithoutAggOper(SSelectStmt* pSelect) {
!isPartitionByTbname(pSelect->pPartitionByList); !isPartitionByTbname(pSelect->pPartitionByList);
} }
static bool crossTableWithUdaf(SSelectStmt* pSelect) {
return pSelect->hasUdaf && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!isPartitionByTbname(pSelect->pPartitionByList);
}
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
if (NULL != pStmt->pOptions->pWatermark && if (NULL != pStmt->pOptions->pWatermark &&
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) { (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) {
@ -4785,7 +4796,8 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) { !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
crossTableWithUdaf(pSelect)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -1615,6 +1615,9 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pGroupTags); TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pGroupTags);
int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan); int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan);
if (TSDB_CODE_SUCCESS == code) {
code = adjustLogicNodeDataRequirement((SLogicNode*)pScan, pNode->resultDataOrder);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pNode->pChildren); NODES_CLEAR_LIST(pNode->pChildren);
nodesDestroyNode((SNode*)pNode); nodesDestroyNode((SNode*)pNode);