Merge pull request #10973 from taosdata/feature/3.0_wxy
stream plan bugfix
This commit is contained in:
commit
78ee535616
|
@ -475,6 +475,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_PAR_INVALID_PORT TAOS_DEF_ERROR_CODE(0, 0x2612)
|
||||
#define TSDB_CODE_PAR_INVALID_ENDPOINT TAOS_DEF_ERROR_CODE(0, 0x2613)
|
||||
#define TSDB_CODE_PAR_EXPRIE_STATEMENT TAOS_DEF_ERROR_CODE(0, 0x2614)
|
||||
#define TSDB_CODE_PAR_INTERVAL_VALUE_TOO_SMALL TAOS_DEF_ERROR_CODE(0, 0x2615)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -824,11 +824,30 @@ static int32_t translateGroupBy(STranslateContext* pCxt, SNodeList* pGroupByList
|
|||
return translateExprList(pCxt, pGroupByList);
|
||||
}
|
||||
|
||||
static int32_t translateIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* pInterval) {
|
||||
SValueNode* pIntervalVal = (SValueNode*)pInterval->pInterval;
|
||||
SValueNode* pIntervalOffset = (SValueNode*)pInterval->pOffset;
|
||||
SValueNode* pSliding = (SValueNode*)pInterval->pSliding;
|
||||
if (pIntervalVal->datum.i <= 0) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTERVAL_VALUE_TOO_SMALL, pIntervalVal->literal);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t doTranslateWindow(STranslateContext* pCxt, SNode* pWindow) {
|
||||
switch (nodeType(pWindow)) {
|
||||
case QUERY_NODE_INTERVAL_WINDOW:
|
||||
return translateIntervalWindow(pCxt, (SIntervalWindowNode*)pWindow);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateWindow(STranslateContext* pCxt, SNode* pWindow) {
|
||||
if (NULL == pWindow) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pCxt->currClause = SQL_CLAUSE_WINDOW;
|
||||
int32_t code = translateExpr(pCxt, pWindow);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -59,6 +59,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
return "Endpoint should be in the format of 'fqdn:port'";
|
||||
case TSDB_CODE_PAR_EXPRIE_STATEMENT:
|
||||
return "This statement is no longer supported";
|
||||
case TSDB_CODE_PAR_INTERVAL_VALUE_TOO_SMALL:
|
||||
return "This interval value is too small : %s";
|
||||
case TSDB_CODE_OUT_OF_MEMORY:
|
||||
return "Out of memory";
|
||||
default:
|
||||
|
|
|
@ -120,23 +120,15 @@ public:
|
|||
}
|
||||
|
||||
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const {
|
||||
// todo
|
||||
vgInfo->vgId = 1;
|
||||
addEpIntoEpSet(&vgInfo->epSet, "node1", 6030);
|
||||
return 0;
|
||||
char db[TSDB_DB_NAME_LEN] = {0};
|
||||
tNameGetDbName(pTableName, db);
|
||||
return copyTableVgroup(db, tNameGetTableName(pTableName), vgInfo);
|
||||
}
|
||||
|
||||
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const {
|
||||
SVgroupInfo info = {0};
|
||||
info.vgId = 1;
|
||||
addEpIntoEpSet(&info.epSet, "node1", 6030);
|
||||
|
||||
info.hashBegin = 0;
|
||||
info.hashEnd = 1;
|
||||
*pVgList = taosArrayInit(4, sizeof(SVgroupInfo));
|
||||
|
||||
taosArrayPush(*pVgList, &info);
|
||||
return 0;
|
||||
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** vgList) const {
|
||||
char db[TSDB_DB_NAME_LEN] = {0};
|
||||
tNameGetDbName(pTableName, db);
|
||||
return copyTableVgroup(db, tNameGetTableName(pTableName), vgList);
|
||||
}
|
||||
|
||||
TableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags) {
|
||||
|
@ -293,6 +285,27 @@ private:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t copyTableVgroup(const std::string& db, const std::string& tbname, SVgroupInfo* vg) const {
|
||||
std::shared_ptr<MockTableMeta> table = getTableMeta(db, tbname);
|
||||
if (table->vgs.empty()) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
memcpy(vg, &(table->vgs[0]), sizeof(SVgroupInfo));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t copyTableVgroup(const std::string& db, const std::string& tbname, SArray** vgList) const {
|
||||
std::shared_ptr<MockTableMeta> table = getTableMeta(db, tbname);
|
||||
if (table->vgs.empty()) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
*vgList = taosArrayInit(table->vgs.size(), sizeof(SVgroupInfo));
|
||||
for (const SVgroupInfo& vg : table->vgs) {
|
||||
taosArrayPush(*vgList, &vg);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
uint64_t id_;
|
||||
std::unique_ptr<TableBuilder> builder_;
|
||||
DbMetaCache meta_;
|
||||
|
|
|
@ -85,7 +85,7 @@ static int32_t setScanVgroup(SLogicNode* pNode, const SVgroupInfo* pVgroup) {
|
|||
}
|
||||
|
||||
static int32_t scaleOutForScan(SScaleOutContext* pCxt, SLogicSubplan* pSubplan, int32_t level, SNodeList* pGroup) {
|
||||
if (pSubplan->pVgroupList) {
|
||||
if (pSubplan->pVgroupList && !pCxt->pPlanCxt->streamQuery) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) {
|
||||
SLogicSubplan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
|
||||
|
|
|
@ -106,6 +106,8 @@ static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||
|
||||
if (NULL == pScan->node.pParent) {
|
||||
pSubplan->pNode = (SLogicNode*)pExchange;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -41,7 +41,7 @@ protected:
|
|||
cxt_.pSql = sqlBuf_.c_str();
|
||||
}
|
||||
|
||||
bool run() {
|
||||
bool run(bool streamQuery = false) {
|
||||
int32_t code = qParseQuerySql(&cxt_, &query_);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -52,7 +52,7 @@ protected:
|
|||
const string syntaxTreeStr = toString(query_->pRoot, false);
|
||||
|
||||
SLogicNode* pLogicNode = nullptr;
|
||||
SPlanContext cxt = { .queryId = 1, .acctId = 0 };
|
||||
SPlanContext cxt = { .queryId = 1, .acctId = 0, .streamQuery = streamQuery };
|
||||
setPlanContext(query_, &cxt);
|
||||
code = createLogicPlan(&cxt, &pLogicNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -61,7 +61,7 @@ protected:
|
|||
}
|
||||
|
||||
cout << "====================sql : [" << cxt_.pSql << "]" << endl;
|
||||
cout << "syntax test : " << endl;
|
||||
cout << "syntax tree : " << endl;
|
||||
cout << syntaxTreeStr << endl;
|
||||
cout << "unformatted logic plan : " << endl;
|
||||
cout << toString((const SNode*)pLogicNode, false) << endl;
|
||||
|
@ -204,3 +204,10 @@ TEST_F(PlannerTest, createTopic) {
|
|||
bind("create topic tp as SELECT * FROM st1");
|
||||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(PlannerTest, stream) {
|
||||
setDatabase("root", "test");
|
||||
|
||||
bind("SELECT sum(c1) FROM st1");
|
||||
ASSERT_TRUE(run(true));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue