From 6951abce82b75a2af9b0b0154f62f85d38b1d70b Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 24 Mar 2022 04:25:41 -0400 Subject: [PATCH] bugfix --- .../libs/parser/test/mockCatalogService.cpp | 43 ++++++++++++------- source/libs/planner/src/planScaleOut.c | 2 +- source/libs/planner/src/planSpliter.c | 2 + source/libs/planner/test/plannerTest.cpp | 13 ++++-- 4 files changed, 41 insertions(+), 19 deletions(-) diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 9abaf5b14f..4d80c47ec6 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -120,23 +120,15 @@ public: } int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { - // todo - vgInfo->vgId = 1; - addEpIntoEpSet(&vgInfo->epSet, "node1", 6030); - return 0; + char db[TSDB_DB_NAME_LEN] = {0}; + tNameGetDbName(pTableName, db); + return copyTableVgroup(db, tNameGetTableName(pTableName), vgInfo); } - int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const { - SVgroupInfo info = {0}; - info.vgId = 1; - addEpIntoEpSet(&info.epSet, "node1", 6030); - - info.hashBegin = 0; - info.hashEnd = 1; - *pVgList = taosArrayInit(4, sizeof(SVgroupInfo)); - - taosArrayPush(*pVgList, &info); - return 0; + int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** vgList) const { + char db[TSDB_DB_NAME_LEN] = {0}; + tNameGetDbName(pTableName, db); + return copyTableVgroup(db, tNameGetTableName(pTableName), vgList); } TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags) { @@ -293,6 +285,27 @@ private: return TSDB_CODE_SUCCESS; } + int32_t copyTableVgroup(const std::string& db, const std::string& tbname, SVgroupInfo* vg) const { + std::shared_ptr table = getTableMeta(db, tbname); + if (table->vgs.empty()) { + return TSDB_CODE_SUCCESS; + } + memcpy(vg, &(table->vgs[0]), sizeof(SVgroupInfo)); + return TSDB_CODE_SUCCESS; + } + + int32_t copyTableVgroup(const std::string& db, const std::string& tbname, SArray** vgList) const { + std::shared_ptr table = getTableMeta(db, tbname); + if (table->vgs.empty()) { + return TSDB_CODE_SUCCESS; + } + *vgList = taosArrayInit(table->vgs.size(), sizeof(SVgroupInfo)); + for (const SVgroupInfo& vg : table->vgs) { + taosArrayPush(*vgList, &vg); + } + return TSDB_CODE_SUCCESS; + } + uint64_t id_; std::unique_ptr builder_; DbMetaCache meta_; diff --git a/source/libs/planner/src/planScaleOut.c b/source/libs/planner/src/planScaleOut.c index d88b564ee5..6928b1728f 100644 --- a/source/libs/planner/src/planScaleOut.c +++ b/source/libs/planner/src/planScaleOut.c @@ -85,7 +85,7 @@ static int32_t setScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup) { } static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) { - if (pSubplan->pVgroupList) { + if (pSubplan->pVgroupList && !pCxt->pPlanCxt->streamQuery) { int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) { SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 2bba9aefb1..d203d41cb0 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -106,6 +106,8 @@ static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla return TSDB_CODE_OUT_OF_MEMORY; } + pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + if (NULL == pScan->node.pParent) { pSubplan->pNode = (SLogicNode*)pExchange; return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 67a637a956..8958af4663 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -41,7 +41,7 @@ protected: cxt_.pSql = sqlBuf_.c_str(); } - bool run() { + bool run(bool streamQuery = false) { int32_t code = qParseQuerySql(&cxt_, &query_); if (code != TSDB_CODE_SUCCESS) { @@ -52,7 +52,7 @@ protected: const string syntaxTreeStr = toString(query_->pRoot, false); SLogicNode* pLogicNode = nullptr; - SPlanContext cxt = { .queryId = 1, .acctId = 0 }; + SPlanContext cxt = { .queryId = 1, .acctId = 0, .streamQuery = streamQuery }; setPlanContext(query_, &cxt); code = createLogicPlan(&cxt, &pLogicNode); if (code != TSDB_CODE_SUCCESS) { @@ -61,7 +61,7 @@ protected: } cout << "====================sql : [" << cxt_.pSql << "]" << endl; - cout << "syntax test : " << endl; + cout << "syntax tree : " << endl; cout << syntaxTreeStr << endl; cout << "unformatted logic plan : " << endl; cout << toString((const SNode*)pLogicNode, false) << endl; @@ -204,3 +204,10 @@ TEST_F(PlannerTest, createTopic) { bind("create topic tp as SELECT * FROM st1"); ASSERT_TRUE(run()); } + +TEST_F(PlannerTest, stream) { + setDatabase("root", "test"); + + bind("SELECT sum(c1) FROM st1"); + ASSERT_TRUE(run(true)); +}