This commit is contained in:
Xiaoyu Wang 2022-03-24 04:25:41 -04:00
parent 4e0bdd14db
commit 6951abce82
4 changed files with 41 additions and 19 deletions

View File

@ -120,23 +120,15 @@ public:
} }
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
// todo char db[TSDB_DB_NAME_LEN] = {0};
vgInfo->vgId = 1; tNameGetDbName(pTableName, db);
addEpIntoEpSet(&vgInfo->epSet, "node1", 6030); return copyTableVgroup(db, tNameGetTableName(pTableName), vgInfo);
return 0;
} }
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const { int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** vgList) const {
SVgroupInfo info = {0}; char db[TSDB_DB_NAME_LEN] = {0};
info.vgId = 1; tNameGetDbName(pTableName, db);
addEpIntoEpSet(&info.epSet, "node1", 6030); return copyTableVgroup(db, tNameGetTableName(pTableName), vgList);
info.hashBegin = 0;
info.hashEnd = 1;
*pVgList = taosArrayInit(4, sizeof(SVgroupInfo));
taosArrayPush(*pVgList, &info);
return 0;
} }
TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags) { TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags) {
@ -293,6 +285,27 @@ private:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t copyTableVgroup(const std::string& db, const std::string& tbname, SVgroupInfo* vg) const {
std::shared_ptr<MockTableMeta> table = getTableMeta(db, tbname);
if (table->vgs.empty()) {
return TSDB_CODE_SUCCESS;
}
memcpy(vg, &(table->vgs[0]), sizeof(SVgroupInfo));
return TSDB_CODE_SUCCESS;
}
int32_t copyTableVgroup(const std::string& db, const std::string& tbname, SArray** vgList) const {
std::shared_ptr<MockTableMeta> table = getTableMeta(db, tbname);
if (table->vgs.empty()) {
return TSDB_CODE_SUCCESS;
}
*vgList = taosArrayInit(table->vgs.size(), sizeof(SVgroupInfo));
for (const SVgroupInfo& vg : table->vgs) {
taosArrayPush(*vgList, &vg);
}
return TSDB_CODE_SUCCESS;
}
uint64_t id_; uint64_t id_;
std::unique_ptr<TableBuilder> builder_; std::unique_ptr<TableBuilder> builder_;
DbMetaCache meta_; DbMetaCache meta_;

View File

@ -85,7 +85,7 @@ static int32_t setScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup) {
} }
static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) { static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
if (pSubplan->pVgroupList) { if (pSubplan->pVgroupList && !pCxt->pPlanCxt->streamQuery) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) { for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) {
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level); SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);

View File

@ -106,6 +106,8 @@ static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
if (NULL == pScan->node.pParent) { if (NULL == pScan->node.pParent) {
pSubplan->pNode = (SLogicNode*)pExchange; pSubplan->pNode = (SLogicNode*)pExchange;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -41,7 +41,7 @@ protected:
cxt_.pSql = sqlBuf_.c_str(); cxt_.pSql = sqlBuf_.c_str();
} }
bool run() { bool run(bool streamQuery = false) {
int32_t code = qParseQuerySql(&cxt_, &query_); int32_t code = qParseQuerySql(&cxt_, &query_);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -52,7 +52,7 @@ protected:
const string syntaxTreeStr = toString(query_->pRoot, false); const string syntaxTreeStr = toString(query_->pRoot, false);
SLogicNode* pLogicNode = nullptr; SLogicNode* pLogicNode = nullptr;
SPlanContext cxt = { .queryId = 1, .acctId = 0 }; SPlanContext cxt = { .queryId = 1, .acctId = 0, .streamQuery = streamQuery };
setPlanContext(query_, &cxt); setPlanContext(query_, &cxt);
code = createLogicPlan(&cxt, &pLogicNode); code = createLogicPlan(&cxt, &pLogicNode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -61,7 +61,7 @@ protected:
} }
cout << "====================sql : [" << cxt_.pSql << "]" << endl; cout << "====================sql : [" << cxt_.pSql << "]" << endl;
cout << "syntax test : " << endl; cout << "syntax tree : " << endl;
cout << syntaxTreeStr << endl; cout << syntaxTreeStr << endl;
cout << "unformatted logic plan : " << endl; cout << "unformatted logic plan : " << endl;
cout << toString((const SNode*)pLogicNode, false) << endl; cout << toString((const SNode*)pLogicNode, false) << endl;
@ -204,3 +204,10 @@ TEST_F(PlannerTest, createTopic) {
bind("create topic tp as SELECT * FROM st1"); bind("create topic tp as SELECT * FROM st1");
ASSERT_TRUE(run()); ASSERT_TRUE(run());
} }
TEST_F(PlannerTest, stream) {
setDatabase("root", "test");
bind("SELECT sum(c1) FROM st1");
ASSERT_TRUE(run(true));
}