From 536c6bdbc5e732ffba754e322e12c587ae251762 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 8 Aug 2022 11:40:48 +0800 Subject: [PATCH] fix: add checks for stream query --- source/common/src/systable.c | 8 ++++---- source/libs/parser/src/parTranslater.c | 23 +++++++++++++++++------ source/libs/planner/src/planSpliter.c | 2 +- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index a79082ab23..b779f21cf5 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -16,8 +16,8 @@ #include "systable.h" #include "taos.h" #include "tdef.h" -#include "types.h" #include "tgrant.h" +#include "types.h" #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) @@ -97,7 +97,7 @@ static const SSysDbTableSchema userDBSchema[] = { {.name = "wal_retention_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "wal_retention_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "wal_seg_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, + {.name = "wal_segment_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, }; static const SSysDbTableSchema userFuncSchema[] = { @@ -243,8 +243,8 @@ static const SSysTableMeta infosMeta[] = { {TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)}, {TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)}, {TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)}, -// {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)}, -// {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)}, + // {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)}, + // {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)}, {TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)}, {TSDB_INS_TABLE_DATABASES, userDBSchema, tListLen(userDBSchema)}, {TSDB_INS_TABLE_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)}, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 9971f20d3d..be16289595 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4708,6 +4708,12 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm return buildCmdMsg(pCxt, TDMT_MND_KILL_TRANS, (FSerializeFunc)tSerializeSKillTransReq, &killReq); } +static bool crossTableWithoutAggOper(SSelectStmt* pSelect) { + return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && + !pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && + !isPartitionByTbname(pSelect->pPartitionByList); +} + static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { if (NULL != pStmt->pOptions->pWatermark && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) { @@ -4723,14 +4729,19 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt return TSDB_CODE_SUCCESS; } - if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) { - SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; - if (QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable)) { - return TSDB_CODE_SUCCESS; - } + if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + + if (QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable) || + TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || + !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + } + + return TSDB_CODE_SUCCESS; } static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index e10f0586ca..97977878ad 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -268,7 +268,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { case QUERY_NODE_LOGIC_PLAN_JOIN: return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode); case QUERY_NODE_LOGIC_PLAN_PARTITION: - return stbSplIsMultiTbScanChild(streamQuery, pNode); + return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_AGG: return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: