diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2fb4657407..87781b6313 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -475,6 +475,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_INVALID_PORT TAOS_DEF_ERROR_CODE(0, 0x2612) #define TSDB_CODE_PAR_INVALID_ENDPOINT TAOS_DEF_ERROR_CODE(0, 0x2613) #define TSDB_CODE_PAR_EXPRIE_STATEMENT TAOS_DEF_ERROR_CODE(0, 0x2614) +#define TSDB_CODE_PAR_INTERVAL_VALUE_TOO_SMALL TAOS_DEF_ERROR_CODE(0, 0x2615) #ifdef __cplusplus } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 930aadd9ad..f82ce2c1b4 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -824,11 +824,30 @@ static int32_t translateGroupBy(STranslateContext* pCxt, SNodeList* pGroupByList return translateExprList(pCxt, pGroupByList); } +static int32_t translateIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* pInterval) { + SValueNode* pIntervalVal = (SValueNode*)pInterval->pInterval; + SValueNode* pIntervalOffset = (SValueNode*)pInterval->pOffset; + SValueNode* pSliding = (SValueNode*)pInterval->pSliding; + if (pIntervalVal->datum.i <= 0) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTERVAL_VALUE_TOO_SMALL, pIntervalVal->literal); + } + return TSDB_CODE_SUCCESS; +} + static int32_t doTranslateWindow(STranslateContext* pCxt, SNode* pWindow) { + switch (nodeType(pWindow)) { + case QUERY_NODE_INTERVAL_WINDOW: + return translateIntervalWindow(pCxt, (SIntervalWindowNode*)pWindow); + default: + break; + } return TSDB_CODE_SUCCESS; } static int32_t translateWindow(STranslateContext* pCxt, SNode* pWindow) { + if (NULL == pWindow) { + return TSDB_CODE_SUCCESS; + } pCxt->currClause = SQL_CLAUSE_WINDOW; int32_t code = translateExpr(pCxt, pWindow); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 64c6b761d9..2e3936a6fb 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -59,6 +59,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Endpoint should be in the format of 'fqdn:port'"; case TSDB_CODE_PAR_EXPRIE_STATEMENT: return "This statement is no longer supported"; + case TSDB_CODE_PAR_INTERVAL_VALUE_TOO_SMALL: + return "This interval value is too small : %s"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 9abaf5b14f..4d80c47ec6 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -120,23 +120,15 @@ public: } int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { - // todo - vgInfo->vgId = 1; - addEpIntoEpSet(&vgInfo->epSet, "node1", 6030); - return 0; + char db[TSDB_DB_NAME_LEN] = {0}; + tNameGetDbName(pTableName, db); + return copyTableVgroup(db, tNameGetTableName(pTableName), vgInfo); } - int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const { - SVgroupInfo info = {0}; - info.vgId = 1; - addEpIntoEpSet(&info.epSet, "node1", 6030); - - info.hashBegin = 0; - info.hashEnd = 1; - *pVgList = taosArrayInit(4, sizeof(SVgroupInfo)); - - taosArrayPush(*pVgList, &info); - return 0; + int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** vgList) const { + char db[TSDB_DB_NAME_LEN] = {0}; + tNameGetDbName(pTableName, db); + return copyTableVgroup(db, tNameGetTableName(pTableName), vgList); } TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags) { @@ -293,6 +285,27 @@ private: return TSDB_CODE_SUCCESS; } + int32_t copyTableVgroup(const std::string& db, const std::string& tbname, SVgroupInfo* vg) const { + std::shared_ptr table = getTableMeta(db, tbname); + if (table->vgs.empty()) { + return TSDB_CODE_SUCCESS; + } + memcpy(vg, &(table->vgs[0]), sizeof(SVgroupInfo)); + return TSDB_CODE_SUCCESS; + } + + int32_t copyTableVgroup(const std::string& db, const std::string& tbname, SArray** vgList) const { + std::shared_ptr table = getTableMeta(db, tbname); + if (table->vgs.empty()) { + return TSDB_CODE_SUCCESS; + } + *vgList = taosArrayInit(table->vgs.size(), sizeof(SVgroupInfo)); + for (const SVgroupInfo& vg : table->vgs) { + taosArrayPush(*vgList, &vg); + } + return TSDB_CODE_SUCCESS; + } + uint64_t id_; std::unique_ptr builder_; DbMetaCache meta_; diff --git a/source/libs/planner/src/planScaleOut.c b/source/libs/planner/src/planScaleOut.c index d88b564ee5..6928b1728f 100644 --- a/source/libs/planner/src/planScaleOut.c +++ b/source/libs/planner/src/planScaleOut.c @@ -85,7 +85,7 @@ static int32_t setScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup) { } static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) { - if (pSubplan->pVgroupList) { + if (pSubplan->pVgroupList && !pCxt->pPlanCxt->streamQuery) { int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) { SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 2bba9aefb1..d203d41cb0 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -106,6 +106,8 @@ static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla return TSDB_CODE_OUT_OF_MEMORY; } + pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + if (NULL == pScan->node.pParent) { pSubplan->pNode = (SLogicNode*)pExchange; return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 67a637a956..8958af4663 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -41,7 +41,7 @@ protected: cxt_.pSql = sqlBuf_.c_str(); } - bool run() { + bool run(bool streamQuery = false) { int32_t code = qParseQuerySql(&cxt_, &query_); if (code != TSDB_CODE_SUCCESS) { @@ -52,7 +52,7 @@ protected: const string syntaxTreeStr = toString(query_->pRoot, false); SLogicNode* pLogicNode = nullptr; - SPlanContext cxt = { .queryId = 1, .acctId = 0 }; + SPlanContext cxt = { .queryId = 1, .acctId = 0, .streamQuery = streamQuery }; setPlanContext(query_, &cxt); code = createLogicPlan(&cxt, &pLogicNode); if (code != TSDB_CODE_SUCCESS) { @@ -61,7 +61,7 @@ protected: } cout << "====================sql : [" << cxt_.pSql << "]" << endl; - cout << "syntax test : " << endl; + cout << "syntax tree : " << endl; cout << syntaxTreeStr << endl; cout << "unformatted logic plan : " << endl; cout << toString((const SNode*)pLogicNode, false) << endl; @@ -204,3 +204,10 @@ TEST_F(PlannerTest, createTopic) { bind("create topic tp as SELECT * FROM st1"); ASSERT_TRUE(run()); } + +TEST_F(PlannerTest, stream) { + setDatabase("root", "test"); + + bind("SELECT sum(c1) FROM st1"); + ASSERT_TRUE(run(true)); +}