Merge pull request #9132 from taosdata/feature/3.0_wxy
TD-12192 Physical plan code of super table query.
This commit is contained in:
commit
e35bac2d28
|
@ -84,8 +84,8 @@ typedef struct SProjectPhyNode {
|
|||
|
||||
typedef struct SExchangePhyNode {
|
||||
SPhyNode node;
|
||||
uint64_t templateId;
|
||||
SArray *pSourceEpSet; // SEpSet
|
||||
uint64_t srcTemplateId; // template id of datasource suplans
|
||||
SArray *pSourceEpSet; // SEpSet, scheduler fill by calling qSetSuplanExecutionNode
|
||||
} SExchangePhyNode;
|
||||
|
||||
typedef struct SSubplanId {
|
||||
|
@ -113,6 +113,8 @@ typedef struct SQueryDag {
|
|||
*/
|
||||
int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag);
|
||||
|
||||
int32_t qSetSuplanExecutionNode(SArray* subplans, SArray* nodes);
|
||||
|
||||
int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str);
|
||||
|
||||
|
||||
|
@ -126,7 +128,7 @@ int32_t qSubPlanToString(struct SSubplan *pPhyNode, char** str);
|
|||
* @param pQueryPhyNode
|
||||
* @return
|
||||
*/
|
||||
void* qDestroyQueryDag(struct SQueryDag* pDag);
|
||||
void qDestroyQueryDag(struct SQueryDag* pDag);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ OP_ENUM_MACRO(TableScan)
|
|||
OP_ENUM_MACRO(DataBlocksOptScan)
|
||||
OP_ENUM_MACRO(TableSeqScan)
|
||||
OP_ENUM_MACRO(TagScan)
|
||||
OP_ENUM_MACRO(TableBlockInfoScan)
|
||||
OP_ENUM_MACRO(SystemTableScan)
|
||||
OP_ENUM_MACRO(Aggregate)
|
||||
OP_ENUM_MACRO(Project)
|
||||
OP_ENUM_MACRO(Groupby)
|
||||
|
|
|
@ -99,6 +99,8 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
|
|||
*/
|
||||
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
|
||||
|
||||
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag);
|
||||
|
||||
/**
|
||||
* Convert to physical plan to string to enable to print it out in the shell.
|
||||
* @param pPhyNode
|
||||
|
@ -111,7 +113,7 @@ int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str);
|
|||
* Destroy the query plan object.
|
||||
* @return
|
||||
*/
|
||||
void* destroyQueryPlan(struct SQueryPlanNode* pQueryNode);
|
||||
void destroyQueryPlan(struct SQueryPlanNode* pQueryNode);
|
||||
|
||||
/**
|
||||
* Destroy the physical plan.
|
||||
|
|
|
@ -0,0 +1,604 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "function.h"
|
||||
#include "os.h"
|
||||
#include "parser.h"
|
||||
#include "plannerInt.h"
|
||||
|
||||
typedef struct SFillEssInfo {
|
||||
int32_t fillType; // fill type
|
||||
int64_t *val; // fill value
|
||||
} SFillEssInfo;
|
||||
|
||||
typedef struct SJoinCond {
|
||||
bool tagExists; // denote if tag condition exists or not
|
||||
SColumn *tagCond[2];
|
||||
SColumn *colCond[2];
|
||||
} SJoinCond;
|
||||
|
||||
static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo);
|
||||
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode);
|
||||
|
||||
int32_t printExprInfo(char* buf, const SQueryPlanNode* pQueryNode, int32_t len);
|
||||
int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) {
|
||||
SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo);
|
||||
assert(taosArrayGetSize(upstream) == 1);
|
||||
|
||||
*pQueryNode = taosArrayGetP(upstream, 0);
|
||||
|
||||
taosArrayDestroy(upstream);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
void destroyQueryPlan(SQueryPlanNode* pQueryNode) {
|
||||
if (pQueryNode == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
doDestroyQueryNode(pQueryNode);
|
||||
}
|
||||
|
||||
//======================================================================================================================
|
||||
|
||||
static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev,
|
||||
SExprInfo** pExpr, int32_t numOfOutput, void* pExtInfo) {
|
||||
SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode));
|
||||
|
||||
pNode->info.type = type;
|
||||
pNode->info.name = strdup(name);
|
||||
|
||||
pNode->numOfExpr = numOfOutput;
|
||||
pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES);
|
||||
|
||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
taosArrayPush(pNode->pExpr, &pExpr[i]);
|
||||
}
|
||||
|
||||
pNode->pChildren = taosArrayInit(4, POINTER_BYTES);
|
||||
for(int32_t i = 0; i < numOfPrev; ++i) {
|
||||
taosArrayPush(pNode->pChildren, &prev[i]);
|
||||
}
|
||||
|
||||
switch(type) {
|
||||
case QNODE_TAGSCAN:
|
||||
case QNODE_TABLESCAN: {
|
||||
SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo));
|
||||
memcpy(info, pExtInfo, sizeof(SQueryTableInfo));
|
||||
info->tableName = strdup(((SQueryTableInfo*) pExtInfo)->tableName);
|
||||
pNode->pExtInfo = info;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_TIMEWINDOW: {
|
||||
SInterval* pInterval = calloc(1, sizeof(SInterval));
|
||||
pNode->pExtInfo = pInterval;
|
||||
memcpy(pInterval, pExtInfo, sizeof(SInterval));
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_STATEWINDOW: {
|
||||
SColumn* psw = calloc(1, sizeof(SColumn));
|
||||
pNode->pExtInfo = psw;
|
||||
memcpy(psw, pExtInfo, sizeof(SColumn));
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_SESSIONWINDOW: {
|
||||
SSessionWindow *pSessionWindow = calloc(1, sizeof(SSessionWindow));
|
||||
pNode->pExtInfo = pSessionWindow;
|
||||
memcpy(pSessionWindow, pExtInfo, sizeof(struct SSessionWindow));
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_GROUPBY: {
|
||||
SGroupbyExpr* p = (SGroupbyExpr*) pExtInfo;
|
||||
|
||||
SGroupbyExpr* pGroupbyExpr = calloc(1, sizeof(SGroupbyExpr));
|
||||
pGroupbyExpr->groupbyTag = p->groupbyTag;
|
||||
pGroupbyExpr->columnInfo = taosArrayDup(p->columnInfo);
|
||||
|
||||
pNode->pExtInfo = pGroupbyExpr;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_FILL: { // todo !!
|
||||
pNode->pExtInfo = pExtInfo;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_LIMIT: {
|
||||
pNode->pExtInfo = calloc(1, sizeof(SLimit));
|
||||
memcpy(pNode->pExtInfo, pExtInfo, sizeof(SLimit));
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_SORT: {
|
||||
pNode->pExtInfo = taosArrayDup(pExtInfo);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info,
|
||||
SArray* pExprs, SArray* tableCols) {
|
||||
if (pQueryInfo->info.onlyTagQuery) {
|
||||
int32_t num = (int32_t) taosArrayGetSize(pExprs);
|
||||
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info);
|
||||
|
||||
if (pQueryInfo->info.distinct) {
|
||||
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL);
|
||||
}
|
||||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
|
||||
|
||||
if (pQueryInfo->info.projectionQuery) {
|
||||
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs);
|
||||
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, NULL);
|
||||
} else {
|
||||
STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0);
|
||||
|
||||
// table source column projection, generate the projection expr
|
||||
int32_t numOfCols = (int32_t) taosArrayGetSize(tableCols);
|
||||
SExprInfo** pExpr = calloc(numOfCols, POINTER_BYTES);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(tableCols, i);
|
||||
|
||||
SSourceParam param = {0};
|
||||
addIntoSourceParam(¶m, NULL, pCol);
|
||||
SSchema s = createSchema(pCol->info.type, pCol->info.bytes, pCol->info.colId, pCol->name);
|
||||
SExprInfo* p = createExprInfo(pTableMetaInfo1, "project", ¶m, &s, 0);
|
||||
pExpr[i] = p;
|
||||
}
|
||||
|
||||
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL);
|
||||
tfree(pExpr);
|
||||
}
|
||||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info) {
|
||||
// group by column not by tag
|
||||
size_t numOfGroupCols = taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo);
|
||||
|
||||
// check for aggregation
|
||||
int32_t level = getExprFunctionLevel(pQueryInfo);
|
||||
|
||||
for(int32_t i = level - 1; i >= 0; --i) {
|
||||
SArray* p = pQueryInfo->exprList[i];
|
||||
size_t num = taosArrayGetSize(p);
|
||||
|
||||
bool aggregateFunc = false;
|
||||
for(int32_t j = 0; j < num; ++j) {
|
||||
SExprInfo* pExpr = (SExprInfo*)taosArrayGetP(p, 0);
|
||||
if (pExpr->pExpr->nodeType != TEXPR_FUNCTION_NODE) {
|
||||
continue;
|
||||
}
|
||||
|
||||
aggregateFunc = qIsAggregateFunction(pExpr->pExpr->_function.functionName);
|
||||
if (aggregateFunc) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (aggregateFunc) {
|
||||
if (pQueryInfo->interval.interval > 0) {
|
||||
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->interval);
|
||||
} else if (pQueryInfo->sessionWindow.gap > 0) {
|
||||
pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->sessionWindow);
|
||||
} else if (pQueryInfo->stateWindow.col.info.colId > 0) {
|
||||
pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->stateWindow);
|
||||
} else if (numOfGroupCols != 0 && !pQueryInfo->groupbyExpr.groupbyTag) {
|
||||
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, &pQueryInfo->groupbyExpr);
|
||||
} else {
|
||||
pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL);
|
||||
}
|
||||
} else {
|
||||
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
if (pQueryInfo->havingFieldNum > 0) {
|
||||
// int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1);
|
||||
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, NULL);
|
||||
}
|
||||
|
||||
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
|
||||
SFillEssInfo* pInfo = calloc(1, sizeof(SFillEssInfo));
|
||||
pInfo->fillType = pQueryInfo->fillType;
|
||||
pInfo->val = calloc(pNode->numOfExpr, sizeof(int64_t));
|
||||
memcpy(pInfo->val, pQueryInfo->fillVal, pNode->numOfExpr);
|
||||
|
||||
SArray* p = pQueryInfo->exprList[0]; // top expression in select clause
|
||||
pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, p, taosArrayGetSize(p), pInfo);
|
||||
}
|
||||
|
||||
if (pQueryInfo->order != NULL) {
|
||||
SArray* pList = pQueryInfo->exprList[0];
|
||||
pNode = createQueryNode(QNODE_SORT, "Sort", &pNode, 1, pList->pData, taosArrayGetSize(pList), pQueryInfo->order);
|
||||
}
|
||||
|
||||
if (pQueryInfo->limit.limit != -1 || pQueryInfo->limit.offset != 0) {
|
||||
pNode = createQueryNode(QNODE_LIMIT, "Limit", &pNode, 1, NULL, 0, &pQueryInfo->limit);
|
||||
}
|
||||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
static SQueryPlanNode* doCreateQueryPlanForSingleTable(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs,
|
||||
SArray* tableCols) {
|
||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN);
|
||||
|
||||
SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,};
|
||||
|
||||
// handle the only tag query
|
||||
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, pExprs, tableCols);
|
||||
if (pQueryInfo->info.onlyTagQuery) {
|
||||
tfree(info.tableName);
|
||||
return pNode;
|
||||
}
|
||||
|
||||
SQueryPlanNode* pNode1 = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info);
|
||||
tfree(info.tableName);
|
||||
return pNode1;
|
||||
}
|
||||
|
||||
static bool isAllAggExpr(SArray* pList) {
|
||||
assert(pList != NULL);
|
||||
|
||||
for (int32_t k = 0; k < taosArrayGetSize(pList); ++k) {
|
||||
SExprInfo* p = taosArrayGetP(pList, k);
|
||||
if (p->pExpr->nodeType != TEXPR_FUNCTION_NODE || !qIsAggregateFunction(p->pExpr->_function.functionName)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
|
||||
SArray* upstream = NULL;
|
||||
|
||||
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // subquery in the from clause
|
||||
upstream = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
size_t size = taosArrayGetSize(pQueryInfo->pUpstream);
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i);
|
||||
SArray* p = createQueryPlanImpl(pq);
|
||||
taosArrayAddBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p));
|
||||
}
|
||||
}
|
||||
|
||||
if (pQueryInfo->numOfTables > 1) { // it is a join query
|
||||
// 1. separate the select clause according to table
|
||||
taosArrayDestroy(upstream);
|
||||
upstream = taosArrayInit(5, POINTER_BYTES);
|
||||
|
||||
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[i];
|
||||
uint64_t uid = pTableMetaInfo->pTableMeta->uid;
|
||||
|
||||
SArray* exprList = taosArrayInit(4, POINTER_BYTES);
|
||||
if (copyExprInfoList(exprList, pQueryInfo->exprList[0], uid, true) != 0) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
// dropAllExprInfo(exprList);
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
// 2. create the query execution node
|
||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tNameExtractFullName(&pTableMetaInfo->name, name);
|
||||
SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,};
|
||||
|
||||
// 3. get the required table column list
|
||||
SArray* tableColumnList = taosArrayInit(4, sizeof(SColumn));
|
||||
columnListCopy(tableColumnList, pQueryInfo->colList, uid);
|
||||
|
||||
// 4. add the projection query node
|
||||
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, exprList, tableColumnList);
|
||||
columnListDestroy(tableColumnList);
|
||||
// dropAllExprInfo(exprList);
|
||||
taosArrayPush(upstream, &pNode);
|
||||
}
|
||||
|
||||
// 3. add the join node here
|
||||
SQueryTableInfo info = {0};
|
||||
int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]);
|
||||
SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables,
|
||||
pQueryInfo->exprList[0]->pData, num, NULL);
|
||||
|
||||
// 4. add the aggregation or projection execution node
|
||||
pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info);
|
||||
upstream = taosArrayInit(5, POINTER_BYTES);
|
||||
taosArrayPush(upstream, &pNode);
|
||||
} else { // only one table, normal query process
|
||||
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
|
||||
SQueryPlanNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList);
|
||||
upstream = taosArrayInit(5, POINTER_BYTES);
|
||||
taosArrayPush(upstream, &pNode);
|
||||
}
|
||||
|
||||
return upstream;
|
||||
}
|
||||
|
||||
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
|
||||
tfree(pQueryNode->pExtInfo);
|
||||
tfree(pQueryNode->pSchema);
|
||||
tfree(pQueryNode->info.name);
|
||||
// dropAllExprInfo(pQueryNode->pExpr);
|
||||
|
||||
if (pQueryNode->pChildren != NULL) {
|
||||
int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren);
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
SQueryPlanNode* p = taosArrayGetP(pQueryNode->pChildren, i);
|
||||
doDestroyQueryNode(p);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pQueryNode->pChildren);
|
||||
}
|
||||
|
||||
tfree(pQueryNode);
|
||||
}
|
||||
|
||||
static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) {
|
||||
if (level > 0) {
|
||||
sprintf(buf + totalLen, "%*c", level, ' ');
|
||||
totalLen += level;
|
||||
}
|
||||
|
||||
int32_t len1 = sprintf(buf + totalLen, "%s(", pQueryNode->info.name);
|
||||
int32_t len = len1 + totalLen;
|
||||
|
||||
switch(pQueryNode->info.type) {
|
||||
case QNODE_TABLESCAN: {
|
||||
SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64, pInfo->tableName, pInfo->uid,
|
||||
pInfo->window.skey, pInfo->window.ekey);
|
||||
assert(len1 > 0);
|
||||
len += len1;
|
||||
|
||||
for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(pQueryNode->pExpr, i);
|
||||
len1 = sprintf(buf + len, " [%s #%d] ", pCol->name, pCol->info.colId);
|
||||
|
||||
assert(len1 > 0);
|
||||
len += len1;
|
||||
}
|
||||
|
||||
len1 = sprintf(buf + len, "\n");
|
||||
assert(len1 > 0);
|
||||
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_PROJECT: {
|
||||
len1 = sprintf(buf + len, "cols:");
|
||||
assert(len1 > 0);
|
||||
len += len1;
|
||||
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
len1 = sprintf(buf + len, ")");
|
||||
len += len1;
|
||||
|
||||
// todo print filter info
|
||||
len1 = sprintf(buf + len, " filters:(nil)\n");
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_AGGREGATE: {
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
len1 = sprintf(buf + len, ")\n");
|
||||
len += len1;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_TIMEWINDOW: {
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
len1 = sprintf(buf + len, ") ");
|
||||
len += len1;
|
||||
|
||||
SInterval* pInterval = pQueryNode->pExtInfo;
|
||||
|
||||
// todo dynamic return the time precision
|
||||
len1 = sprintf(buf + len, "interval:%" PRId64 "(%s), sliding:%" PRId64 "(%s), offset:%" PRId64 "(%s)\n",
|
||||
pInterval->interval, TSDB_TIME_PRECISION_MILLI_STR, pInterval->sliding,
|
||||
TSDB_TIME_PRECISION_MILLI_STR, pInterval->offset, TSDB_TIME_PRECISION_MILLI_STR);
|
||||
len += len1;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_STATEWINDOW: {
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
len1 = sprintf(buf + len, ") ");
|
||||
len += len1;
|
||||
|
||||
SColumn* pCol = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "col:%s #%d\n", pCol->name, pCol->info.colId);
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_SESSIONWINDOW: {
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
|
||||
len1 = sprintf(buf + len, ") ");
|
||||
len += len1;
|
||||
|
||||
struct SSessionWindow* ps = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "col:[%s #%d], gap:%" PRId64 " (ms) \n", ps->col.name, ps->col.info.colId, ps->gap);
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_GROUPBY: {
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
|
||||
SGroupbyExpr* pGroupbyExpr = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, ") groupby_col: ");
|
||||
len += len1;
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pGroupbyExpr->columnInfo); ++i) {
|
||||
SColumn* pCol = taosArrayGet(pGroupbyExpr->columnInfo, i);
|
||||
len1 = sprintf(buf + len, "[%s #%d] ", pCol->name, pCol->info.colId);
|
||||
len += len1;
|
||||
}
|
||||
|
||||
len += sprintf(buf + len, "\n");
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_FILL: {
|
||||
SFillEssInfo* pEssInfo = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "%d", pEssInfo->fillType);
|
||||
len += len1;
|
||||
|
||||
if (pEssInfo->fillType == TSDB_FILL_SET_VALUE) {
|
||||
len1 = sprintf(buf + len, ", val:");
|
||||
len += len1;
|
||||
|
||||
// todo get the correct fill data type
|
||||
for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
|
||||
len1 = sprintf(buf + len, "%" PRId64, pEssInfo->val[i]);
|
||||
len += len1;
|
||||
|
||||
if (i < pQueryNode->numOfExpr - 1) {
|
||||
len1 = sprintf(buf + len, ", ");
|
||||
len += len1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
len1 = sprintf(buf + len, ")\n");
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_LIMIT: {
|
||||
SLimit* pVal = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "limit: %" PRId64 ", offset: %" PRId64 ")\n", pVal->limit, pVal->offset);
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_DISTINCT:
|
||||
case QNODE_TAGSCAN: {
|
||||
len1 = sprintf(buf + len, "cols: ");
|
||||
len += len1;
|
||||
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
|
||||
len1 = sprintf(buf + len, ")\n");
|
||||
len += len1;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_SORT: {
|
||||
len1 = sprintf(buf + len, "cols:");
|
||||
len += len1;
|
||||
|
||||
SArray* pSort = pQueryNode->pExtInfo;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pSort); ++i) {
|
||||
SOrder* p = taosArrayGet(pSort, i);
|
||||
len1 = sprintf(buf + len, " [%s #%d %s]", p->col.name, p->col.info.colId, p->order == TSDB_ORDER_ASC? "ASC":"DESC");
|
||||
|
||||
len += len1;
|
||||
}
|
||||
|
||||
len1 = sprintf(buf + len, ")\n");
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_JOIN: {
|
||||
// print join condition
|
||||
len1 = sprintf(buf + len, ")\n");
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t printExprInfo(char* buf, const SQueryPlanNode* pQueryNode, int32_t len) {
|
||||
int32_t len1 = 0;
|
||||
|
||||
for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
|
||||
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
|
||||
|
||||
SSqlExpr* pExpr = &pExprInfo->base;
|
||||
len1 = sprintf(buf + len, "%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId);
|
||||
assert(len1 > 0);
|
||||
|
||||
len += len1;
|
||||
if (i < pQueryNode->numOfExpr - 1) {
|
||||
len1 = sprintf(buf + len, ", ");
|
||||
len += len1;
|
||||
}
|
||||
}
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) {
|
||||
int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen);
|
||||
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pChildren); ++i) {
|
||||
SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pChildren, i);
|
||||
int32_t len1 = queryPlanToStringImpl(buf, p1, level + 1, len);
|
||||
len = len1;
|
||||
}
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) {
|
||||
assert(pQueryNode);
|
||||
|
||||
*str = calloc(1, 4096);
|
||||
|
||||
int32_t len = sprintf(*str, "===== logic plan =====\n");
|
||||
queryPlanToStringImpl(*str, pQueryNode, 0, len);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SQueryPlanNode* queryPlanFromString() {
|
||||
return NULL;
|
||||
}
|
|
@ -16,6 +16,9 @@
|
|||
#include "plannerInt.h"
|
||||
#include "parser.h"
|
||||
|
||||
#define STORE_CURRENT_SUBPLAN(cxt) SSubplan* _ = cxt->pCurrentSubplan
|
||||
#define RECOVERY_CURRENT_SUBPLAN(cxt) cxt->pCurrentSubplan = _
|
||||
|
||||
static const char* gOpName[] = {
|
||||
"Unknown",
|
||||
#define INCLUDE_AS_NAME
|
||||
|
@ -43,8 +46,56 @@ static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t si
|
|||
toDataBlockSchema(pPlanNode, &(node->targetSchema));
|
||||
}
|
||||
|
||||
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
|
||||
SScanPhyNode* node = (SScanPhyNode*)initPhyNode(pPlanNode, type, size);
|
||||
node->uid = pTable->pMeta->pTableMeta->uid;
|
||||
node->tableType = pTable->pMeta->pTableMeta->tableType;
|
||||
return (SPhyNode*)node;
|
||||
}
|
||||
|
||||
static SPhyNode* createPseudoScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
|
||||
return initScanNode(pPlanNode, pTable, op, sizeof(SScanPhyNode));
|
||||
}
|
||||
|
||||
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
|
||||
return initPhyNode(pPlanNode, OP_TagScan, sizeof(STagScanPhyNode));
|
||||
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
|
||||
return createPseudoScanNode(pPlanNode, pTable, OP_TagScan);
|
||||
}
|
||||
|
||||
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
|
||||
// todo
|
||||
return MASTER_SCAN;
|
||||
}
|
||||
|
||||
static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t op) {
|
||||
STableScanPhyNode* node = (STableScanPhyNode*)initScanNode(pPlanNode, pTable, op, sizeof(STableScanPhyNode));
|
||||
node->scanFlag = getScanFlag(pPlanNode, pTable);
|
||||
node->window = pTable->window;
|
||||
// todo tag cond
|
||||
return (SPhyNode*)node;
|
||||
}
|
||||
|
||||
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
|
||||
return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
|
||||
}
|
||||
|
||||
static bool isSystemTable(SQueryTableInfo* pTable) {
|
||||
// todo
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool needSeqScan(SQueryPlanNode* pPlanNode) {
|
||||
// todo
|
||||
return false;
|
||||
}
|
||||
|
||||
static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
|
||||
if (isSystemTable(pTable)) {
|
||||
return createPseudoScanNode(pPlanNode, pTable, OP_SystemTableScan);
|
||||
} else if (needSeqScan(pPlanNode)) {
|
||||
return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan);
|
||||
}
|
||||
return createUserTableScanNode(pPlanNode, pTable, OP_DataBlocksOptScan);
|
||||
}
|
||||
|
||||
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
||||
|
@ -58,53 +109,61 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
|||
if (NULL == pCxt->pCurrentSubplan->pChildern) {
|
||||
pCxt->pCurrentSubplan->pChildern = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
}
|
||||
taosArrayPush(pCxt->pCurrentSubplan->pChildern, subplan);
|
||||
taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
|
||||
subplan->pParents = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
taosArrayPush(subplan->pParents, pCxt->pCurrentSubplan);
|
||||
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
|
||||
}
|
||||
SArray* currentLevel;
|
||||
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
|
||||
currentLevel = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
taosArrayPush(pCxt->pDag->pSubplans, ¤tLevel);
|
||||
} else {
|
||||
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
|
||||
}
|
||||
taosArrayPush(currentLevel, &subplan);
|
||||
pCxt->pCurrentSubplan = subplan;
|
||||
return subplan;
|
||||
}
|
||||
|
||||
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode) {
|
||||
// todo
|
||||
return MASTER_SCAN;
|
||||
}
|
||||
|
||||
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
|
||||
STableScanPhyNode* node = (STableScanPhyNode*)initPhyNode(pPlanNode, OP_TableScan, sizeof(STableScanPhyNode));
|
||||
node->scan.uid = pTable->pMeta->pTableMeta->uid;
|
||||
node->scan.tableType = pTable->pMeta->pTableMeta->tableType;
|
||||
node->scanFlag = getScanFlag(pPlanNode);
|
||||
node->window = pTable->window;
|
||||
// todo tag cond
|
||||
}
|
||||
|
||||
static void vgroupToEpSet(const SVgroupMsg* vg, SEpSet* epSet) {
|
||||
// todo
|
||||
epSet->inUse = 0; // todo
|
||||
epSet->numOfEps = vg->numOfEps;
|
||||
for (int8_t i = 0; i < vg->numOfEps; ++i) {
|
||||
epSet->port[i] = vg->epAddr[i].port;
|
||||
strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
static void splitSubplanBySTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
|
||||
static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
|
||||
SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList;
|
||||
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
|
||||
STORE_CURRENT_SUBPLAN(pCxt);
|
||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
|
||||
vgroupToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet);
|
||||
subplan->pNode = createTableScanNode(pCxt, pPlanNode, pTable);
|
||||
// todo reset pCxt->pCurrentSubplan
|
||||
subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
|
||||
RECOVERY_CURRENT_SUBPLAN(pCxt);
|
||||
}
|
||||
return pCxt->nextId.templateId++;
|
||||
}
|
||||
|
||||
static SPhyNode* createExchangeNode() {
|
||||
|
||||
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
|
||||
SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
|
||||
node->srcTemplateId = srcTemplateId;
|
||||
return (SPhyNode*)node;
|
||||
}
|
||||
|
||||
static SPhyNode* createScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
static bool needMultiNodeScan(SQueryTableInfo* pTable) {
|
||||
// todo system table, for instance, user_tables
|
||||
return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType);
|
||||
}
|
||||
|
||||
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
|
||||
if (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType) {
|
||||
splitSubplanBySTable(pCxt, pPlanNode, pTable);
|
||||
return createExchangeNode(pCxt, pTable);
|
||||
if (needMultiNodeScan(pTable)) {
|
||||
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
|
||||
}
|
||||
return createTableScanNode(pCxt, pPlanNode, pTable);
|
||||
return createSingleTableScanNode(pPlanNode, pTable);
|
||||
}
|
||||
|
||||
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
|
@ -114,7 +173,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
|||
node = createTagScanNode(pPlanNode);
|
||||
break;
|
||||
case QNODE_TABLESCAN:
|
||||
node = createScanNode(pCxt, pPlanNode);
|
||||
node = createTableScanNode(pCxt, pPlanNode);
|
||||
break;
|
||||
default:
|
||||
assert(false);
|
||||
|
@ -133,6 +192,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
|||
|
||||
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
|
||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
|
||||
++(pCxt->nextId.templateId);
|
||||
subplan->pNode = createPhyNode(pCxt, pRoot);
|
||||
SArray* l0 = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
taosArrayPush(l0, &subplan);
|
||||
|
@ -144,7 +204,8 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
|
|||
SPlanContext context = {
|
||||
.pCatalog = pCatalog,
|
||||
.pDag = calloc(1, sizeof(SQueryDag)),
|
||||
.pCurrentSubplan = NULL
|
||||
.pCurrentSubplan = NULL,
|
||||
.nextId = {0} // todo queryid
|
||||
};
|
||||
if (NULL == context.pDag) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
|
|
@ -13,606 +13,31 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "function.h"
|
||||
#include "os.h"
|
||||
#include "parser.h"
|
||||
#include "plannerInt.h"
|
||||
|
||||
typedef struct SFillEssInfo {
|
||||
int32_t fillType; // fill type
|
||||
int64_t *val; // fill value
|
||||
} SFillEssInfo;
|
||||
|
||||
typedef struct SJoinCond {
|
||||
bool tagExists; // denote if tag condition exists or not
|
||||
SColumn *tagCond[2];
|
||||
SColumn *colCond[2];
|
||||
} SJoinCond;
|
||||
|
||||
static SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo);
|
||||
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode);
|
||||
|
||||
int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t len);
|
||||
int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
|
||||
return 0;
|
||||
void qDestroyQueryDag(struct SQueryDag* pDag) {
|
||||
// todo
|
||||
}
|
||||
|
||||
int32_t createQueryPlan(const struct SQueryStmtInfo* pQueryInfo, struct SQueryPlanNode** pQueryNode) {
|
||||
SArray* upstream = createQueryPlanImpl((struct SQueryStmtInfo*) pQueryInfo);
|
||||
assert(taosArrayGetSize(upstream) == 1);
|
||||
|
||||
*pQueryNode = taosArrayGetP(upstream, 0);
|
||||
|
||||
taosArrayDestroy(upstream);
|
||||
int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag) {
|
||||
SQueryPlanNode* logicPlan;
|
||||
int32_t code = createQueryPlan(pQueryInfo, &logicPlan);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
destroyQueryPlan(logicPlan);
|
||||
return code;
|
||||
}
|
||||
code = optimizeQueryPlan(logicPlan);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
destroyQueryPlan(logicPlan);
|
||||
return code;
|
||||
}
|
||||
code = createDag(logicPlan, NULL, pDag);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
destroyQueryPlan(logicPlan);
|
||||
qDestroyQueryDag(*pDag);
|
||||
return code;
|
||||
}
|
||||
destroyQueryPlan(logicPlan);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t qCreatePhysicalPlan(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) {
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
void* destroyQueryPlan(SQueryPlanNode* pQueryNode) {
|
||||
if (pQueryNode == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
doDestroyQueryNode(pQueryNode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
//======================================================================================================================
|
||||
|
||||
static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev,
|
||||
SExprInfo** pExpr, int32_t numOfOutput, void* pExtInfo) {
|
||||
SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode));
|
||||
|
||||
pNode->info.type = type;
|
||||
pNode->info.name = strdup(name);
|
||||
|
||||
pNode->numOfExpr = numOfOutput;
|
||||
pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES);
|
||||
|
||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
taosArrayPush(pNode->pExpr, &pExpr[i]);
|
||||
}
|
||||
|
||||
pNode->pChildren = taosArrayInit(4, POINTER_BYTES);
|
||||
for(int32_t i = 0; i < numOfPrev; ++i) {
|
||||
taosArrayPush(pNode->pChildren, &prev[i]);
|
||||
}
|
||||
|
||||
switch(type) {
|
||||
case QNODE_TAGSCAN:
|
||||
case QNODE_TABLESCAN: {
|
||||
SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo));
|
||||
memcpy(info, pExtInfo, sizeof(SQueryTableInfo));
|
||||
info->tableName = strdup(((SQueryTableInfo*) pExtInfo)->tableName);
|
||||
pNode->pExtInfo = info;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_TIMEWINDOW: {
|
||||
SInterval* pInterval = calloc(1, sizeof(SInterval));
|
||||
pNode->pExtInfo = pInterval;
|
||||
memcpy(pInterval, pExtInfo, sizeof(SInterval));
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_STATEWINDOW: {
|
||||
SColumn* psw = calloc(1, sizeof(SColumn));
|
||||
pNode->pExtInfo = psw;
|
||||
memcpy(psw, pExtInfo, sizeof(SColumn));
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_SESSIONWINDOW: {
|
||||
SSessionWindow *pSessionWindow = calloc(1, sizeof(SSessionWindow));
|
||||
pNode->pExtInfo = pSessionWindow;
|
||||
memcpy(pSessionWindow, pExtInfo, sizeof(struct SSessionWindow));
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_GROUPBY: {
|
||||
SGroupbyExpr* p = (SGroupbyExpr*) pExtInfo;
|
||||
|
||||
SGroupbyExpr* pGroupbyExpr = calloc(1, sizeof(SGroupbyExpr));
|
||||
pGroupbyExpr->groupbyTag = p->groupbyTag;
|
||||
pGroupbyExpr->columnInfo = taosArrayDup(p->columnInfo);
|
||||
|
||||
pNode->pExtInfo = pGroupbyExpr;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_FILL: { // todo !!
|
||||
pNode->pExtInfo = pExtInfo;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_LIMIT: {
|
||||
pNode->pExtInfo = calloc(1, sizeof(SLimit));
|
||||
memcpy(pNode->pExtInfo, pExtInfo, sizeof(SLimit));
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_SORT: {
|
||||
pNode->pExtInfo = taosArrayDup(pExtInfo);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info,
|
||||
SArray* pExprs, SArray* tableCols) {
|
||||
if (pQueryInfo->info.onlyTagQuery) {
|
||||
int32_t num = (int32_t) taosArrayGetSize(pExprs);
|
||||
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info);
|
||||
|
||||
if (pQueryInfo->info.distinct) {
|
||||
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL);
|
||||
}
|
||||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
|
||||
|
||||
if (pQueryInfo->info.projectionQuery) {
|
||||
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs);
|
||||
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, NULL);
|
||||
} else {
|
||||
STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0);
|
||||
|
||||
// table source column projection, generate the projection expr
|
||||
int32_t numOfCols = (int32_t) taosArrayGetSize(tableCols);
|
||||
SExprInfo** pExpr = calloc(numOfCols, POINTER_BYTES);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(tableCols, i);
|
||||
|
||||
SSourceParam param = {0};
|
||||
addIntoSourceParam(¶m, NULL, pCol);
|
||||
SSchema s = createSchema(pCol->info.type, pCol->info.bytes, pCol->info.colId, pCol->name);
|
||||
SExprInfo* p = createExprInfo(pTableMetaInfo1, "project", ¶m, &s, 0);
|
||||
pExpr[i] = p;
|
||||
}
|
||||
|
||||
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL);
|
||||
tfree(pExpr);
|
||||
}
|
||||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info) {
|
||||
// group by column not by tag
|
||||
size_t numOfGroupCols = taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo);
|
||||
|
||||
// check for aggregation
|
||||
int32_t level = getExprFunctionLevel(pQueryInfo);
|
||||
|
||||
for(int32_t i = level - 1; i >= 0; --i) {
|
||||
SArray* p = pQueryInfo->exprList[i];
|
||||
size_t num = taosArrayGetSize(p);
|
||||
|
||||
bool aggregateFunc = false;
|
||||
for(int32_t j = 0; j < num; ++j) {
|
||||
SExprInfo* pExpr = (SExprInfo*)taosArrayGetP(p, 0);
|
||||
if (pExpr->pExpr->nodeType != TEXPR_FUNCTION_NODE) {
|
||||
continue;
|
||||
}
|
||||
|
||||
aggregateFunc = qIsAggregateFunction(pExpr->pExpr->_function.functionName);
|
||||
if (aggregateFunc) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (aggregateFunc) {
|
||||
if (pQueryInfo->interval.interval > 0) {
|
||||
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->interval);
|
||||
} else if (pQueryInfo->sessionWindow.gap > 0) {
|
||||
pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->sessionWindow);
|
||||
} else if (pQueryInfo->stateWindow.col.info.colId > 0) {
|
||||
pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->stateWindow);
|
||||
} else if (numOfGroupCols != 0 && !pQueryInfo->groupbyExpr.groupbyTag) {
|
||||
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, &pQueryInfo->groupbyExpr);
|
||||
} else {
|
||||
pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL);
|
||||
}
|
||||
} else {
|
||||
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
if (pQueryInfo->havingFieldNum > 0) {
|
||||
// int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1);
|
||||
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, NULL);
|
||||
}
|
||||
|
||||
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
|
||||
SFillEssInfo* pInfo = calloc(1, sizeof(SFillEssInfo));
|
||||
pInfo->fillType = pQueryInfo->fillType;
|
||||
pInfo->val = calloc(pNode->numOfExpr, sizeof(int64_t));
|
||||
memcpy(pInfo->val, pQueryInfo->fillVal, pNode->numOfExpr);
|
||||
|
||||
SArray* p = pQueryInfo->exprList[0]; // top expression in select clause
|
||||
pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, p, taosArrayGetSize(p), pInfo);
|
||||
}
|
||||
|
||||
if (pQueryInfo->order != NULL) {
|
||||
SArray* pList = pQueryInfo->exprList[0];
|
||||
pNode = createQueryNode(QNODE_SORT, "Sort", &pNode, 1, pList->pData, taosArrayGetSize(pList), pQueryInfo->order);
|
||||
}
|
||||
|
||||
if (pQueryInfo->limit.limit != -1 || pQueryInfo->limit.offset != 0) {
|
||||
pNode = createQueryNode(QNODE_LIMIT, "Limit", &pNode, 1, NULL, 0, &pQueryInfo->limit);
|
||||
}
|
||||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
static SQueryPlanNode* doCreateQueryPlanForSingleTable(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SArray* pExprs,
|
||||
SArray* tableCols) {
|
||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tstrncpy(name, pTableMetaInfo->name.tname, TSDB_TABLE_FNAME_LEN);
|
||||
|
||||
SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,};
|
||||
|
||||
// handle the only tag query
|
||||
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, pExprs, tableCols);
|
||||
if (pQueryInfo->info.onlyTagQuery) {
|
||||
tfree(info.tableName);
|
||||
return pNode;
|
||||
}
|
||||
|
||||
SQueryPlanNode* pNode1 = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info);
|
||||
tfree(info.tableName);
|
||||
return pNode1;
|
||||
}
|
||||
|
||||
static bool isAllAggExpr(SArray* pList) {
|
||||
assert(pList != NULL);
|
||||
|
||||
for (int32_t k = 0; k < taosArrayGetSize(pList); ++k) {
|
||||
SExprInfo* p = taosArrayGetP(pList, k);
|
||||
if (p->pExpr->nodeType != TEXPR_FUNCTION_NODE || !qIsAggregateFunction(p->pExpr->_function.functionName)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
|
||||
SArray* upstream = NULL;
|
||||
|
||||
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) { // subquery in the from clause
|
||||
upstream = taosArrayInit(4, POINTER_BYTES);
|
||||
|
||||
size_t size = taosArrayGetSize(pQueryInfo->pUpstream);
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
SQueryStmtInfo* pq = taosArrayGet(pQueryInfo->pUpstream, i);
|
||||
SArray* p = createQueryPlanImpl(pq);
|
||||
taosArrayAddBatch(upstream, p->pData, (int32_t) taosArrayGetSize(p));
|
||||
}
|
||||
}
|
||||
|
||||
if (pQueryInfo->numOfTables > 1) { // it is a join query
|
||||
// 1. separate the select clause according to table
|
||||
taosArrayDestroy(upstream);
|
||||
upstream = taosArrayInit(5, POINTER_BYTES);
|
||||
|
||||
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[i];
|
||||
uint64_t uid = pTableMetaInfo->pTableMeta->uid;
|
||||
|
||||
SArray* exprList = taosArrayInit(4, POINTER_BYTES);
|
||||
if (copyExprInfoList(exprList, pQueryInfo->exprList[0], uid, true) != 0) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
// dropAllExprInfo(exprList);
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
// 2. create the query execution node
|
||||
char name[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tNameExtractFullName(&pTableMetaInfo->name, name);
|
||||
SQueryTableInfo info = {.tableName = strdup(name), .uid = pTableMetaInfo->pTableMeta->uid,};
|
||||
|
||||
// 3. get the required table column list
|
||||
SArray* tableColumnList = taosArrayInit(4, sizeof(SColumn));
|
||||
columnListCopy(tableColumnList, pQueryInfo->colList, uid);
|
||||
|
||||
// 4. add the projection query node
|
||||
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, pTableMetaInfo, &info, exprList, tableColumnList);
|
||||
columnListDestroy(tableColumnList);
|
||||
// dropAllExprInfo(exprList);
|
||||
taosArrayPush(upstream, &pNode);
|
||||
}
|
||||
|
||||
// 3. add the join node here
|
||||
SQueryTableInfo info = {0};
|
||||
int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]);
|
||||
SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables,
|
||||
pQueryInfo->exprList[0]->pData, num, NULL);
|
||||
|
||||
// 4. add the aggregation or projection execution node
|
||||
pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info);
|
||||
upstream = taosArrayInit(5, POINTER_BYTES);
|
||||
taosArrayPush(upstream, &pNode);
|
||||
} else { // only one table, normal query process
|
||||
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
|
||||
SQueryPlanNode* pNode = doCreateQueryPlanForSingleTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList);
|
||||
upstream = taosArrayInit(5, POINTER_BYTES);
|
||||
taosArrayPush(upstream, &pNode);
|
||||
}
|
||||
|
||||
return upstream;
|
||||
}
|
||||
|
||||
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
|
||||
tfree(pQueryNode->pExtInfo);
|
||||
tfree(pQueryNode->pSchema);
|
||||
tfree(pQueryNode->info.name);
|
||||
// dropAllExprInfo(pQueryNode->pExpr);
|
||||
|
||||
if (pQueryNode->pChildren != NULL) {
|
||||
int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren);
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
SQueryPlanNode* p = taosArrayGetP(pQueryNode->pChildren, i);
|
||||
doDestroyQueryNode(p);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pQueryNode->pChildren);
|
||||
}
|
||||
|
||||
tfree(pQueryNode);
|
||||
}
|
||||
|
||||
static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) {
|
||||
if (level > 0) {
|
||||
sprintf(buf + totalLen, "%*c", level, ' ');
|
||||
totalLen += level;
|
||||
}
|
||||
|
||||
int32_t len1 = sprintf(buf + totalLen, "%s(", pQueryNode->info.name);
|
||||
int32_t len = len1 + totalLen;
|
||||
|
||||
switch(pQueryNode->info.type) {
|
||||
case QNODE_TABLESCAN: {
|
||||
SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64, pInfo->tableName, pInfo->uid,
|
||||
pInfo->window.skey, pInfo->window.ekey);
|
||||
assert(len1 > 0);
|
||||
len += len1;
|
||||
|
||||
for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(pQueryNode->pExpr, i);
|
||||
len1 = sprintf(buf + len, " [%s #%d] ", pCol->name, pCol->info.colId);
|
||||
|
||||
assert(len1 > 0);
|
||||
len += len1;
|
||||
}
|
||||
|
||||
len1 = sprintf(buf + len, "\n");
|
||||
assert(len1 > 0);
|
||||
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_PROJECT: {
|
||||
len1 = sprintf(buf + len, "cols:");
|
||||
assert(len1 > 0);
|
||||
len += len1;
|
||||
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
len1 = sprintf(buf + len, ")");
|
||||
len += len1;
|
||||
|
||||
// todo print filter info
|
||||
len1 = sprintf(buf + len, " filters:(nil)\n");
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_AGGREGATE: {
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
len1 = sprintf(buf + len, ")\n");
|
||||
len += len1;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_TIMEWINDOW: {
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
len1 = sprintf(buf + len, ") ");
|
||||
len += len1;
|
||||
|
||||
SInterval* pInterval = pQueryNode->pExtInfo;
|
||||
|
||||
// todo dynamic return the time precision
|
||||
len1 = sprintf(buf + len, "interval:%" PRId64 "(%s), sliding:%" PRId64 "(%s), offset:%" PRId64 "(%s)\n",
|
||||
pInterval->interval, TSDB_TIME_PRECISION_MILLI_STR, pInterval->sliding,
|
||||
TSDB_TIME_PRECISION_MILLI_STR, pInterval->offset, TSDB_TIME_PRECISION_MILLI_STR);
|
||||
len += len1;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_STATEWINDOW: {
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
len1 = sprintf(buf + len, ") ");
|
||||
len += len1;
|
||||
|
||||
SColumn* pCol = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "col:%s #%d\n", pCol->name, pCol->info.colId);
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_SESSIONWINDOW: {
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
|
||||
len1 = sprintf(buf + len, ") ");
|
||||
len += len1;
|
||||
|
||||
struct SSessionWindow* ps = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "col:[%s #%d], gap:%" PRId64 " (ms) \n", ps->col.name, ps->col.info.colId, ps->gap);
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_GROUPBY: {
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
|
||||
SGroupbyExpr* pGroupbyExpr = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, ") groupby_col: ");
|
||||
len += len1;
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pGroupbyExpr->columnInfo); ++i) {
|
||||
SColumn* pCol = taosArrayGet(pGroupbyExpr->columnInfo, i);
|
||||
len1 = sprintf(buf + len, "[%s #%d] ", pCol->name, pCol->info.colId);
|
||||
len += len1;
|
||||
}
|
||||
|
||||
len += sprintf(buf + len, "\n");
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_FILL: {
|
||||
SFillEssInfo* pEssInfo = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "%d", pEssInfo->fillType);
|
||||
len += len1;
|
||||
|
||||
if (pEssInfo->fillType == TSDB_FILL_SET_VALUE) {
|
||||
len1 = sprintf(buf + len, ", val:");
|
||||
len += len1;
|
||||
|
||||
// todo get the correct fill data type
|
||||
for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
|
||||
len1 = sprintf(buf + len, "%" PRId64, pEssInfo->val[i]);
|
||||
len += len1;
|
||||
|
||||
if (i < pQueryNode->numOfExpr - 1) {
|
||||
len1 = sprintf(buf + len, ", ");
|
||||
len += len1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
len1 = sprintf(buf + len, ")\n");
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_LIMIT: {
|
||||
SLimit* pVal = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "limit: %" PRId64 ", offset: %" PRId64 ")\n", pVal->limit, pVal->offset);
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_DISTINCT:
|
||||
case QNODE_TAGSCAN: {
|
||||
len1 = sprintf(buf + len, "cols: ");
|
||||
len += len1;
|
||||
|
||||
len = printExprInfo(buf, pQueryNode, len);
|
||||
|
||||
len1 = sprintf(buf + len, ")\n");
|
||||
len += len1;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_SORT: {
|
||||
len1 = sprintf(buf + len, "cols:");
|
||||
len += len1;
|
||||
|
||||
SArray* pSort = pQueryNode->pExtInfo;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pSort); ++i) {
|
||||
SOrder* p = taosArrayGet(pSort, i);
|
||||
len1 = sprintf(buf + len, " [%s #%d %s]", p->col.name, p->col.info.colId, p->order == TSDB_ORDER_ASC? "ASC":"DESC");
|
||||
|
||||
len += len1;
|
||||
}
|
||||
|
||||
len1 = sprintf(buf + len, ")\n");
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_JOIN: {
|
||||
// print join condition
|
||||
len1 = sprintf(buf + len, ")\n");
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t len) {
|
||||
int32_t len1 = 0;
|
||||
|
||||
for (int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
|
||||
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
|
||||
|
||||
SSqlExpr* pExpr = &pExprInfo->base;
|
||||
len1 = sprintf(buf + len, "%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId);
|
||||
assert(len1 > 0);
|
||||
|
||||
len += len1;
|
||||
if (i < pQueryNode->numOfExpr - 1) {
|
||||
len1 = sprintf(buf + len, ", ");
|
||||
len += len1;
|
||||
}
|
||||
}
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) {
|
||||
int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen);
|
||||
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pChildren); ++i) {
|
||||
SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pChildren, i);
|
||||
int32_t len1 = queryPlanToStringImpl(buf, p1, level + 1, len);
|
||||
len = len1;
|
||||
}
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) {
|
||||
assert(pQueryNode);
|
||||
|
||||
*str = calloc(1, 4096);
|
||||
|
||||
int32_t len = sprintf(*str, "===== logic plan =====\n");
|
||||
queryPlanToStringImpl(*str, pQueryNode, 0, len);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SQueryPlanNode* queryPlanFromString() {
|
||||
return NULL;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue