TD-12035 Scan subquery physical plan code.
This commit is contained in:
parent
e6d2e2e4d1
commit
c84b6d09e4
|
@ -132,13 +132,15 @@ struct SInsertStmtInfo;
|
|||
bool qIsInsertSql(const char* pStr, size_t length);
|
||||
|
||||
typedef struct SParseContext {
|
||||
const char* pAcctId;
|
||||
const char* pDbname;
|
||||
void *pRpc;
|
||||
const char* pClusterId;
|
||||
const SEpSet* pEpSet;
|
||||
int64_t id; // query id, generated by uuid generator
|
||||
int8_t schemaAttached; // denote if submit block is built with table schema or not
|
||||
const char* pSql; // sql string
|
||||
size_t sqlLen; // length of the sql string
|
||||
int64_t id; // operator id, generated by uuid generator
|
||||
const char* pDbname;
|
||||
const SEpSet* pEpSet;
|
||||
int8_t schemaAttached; // denote if submit block is built with table schema or not
|
||||
|
||||
char* pMsg; // extended error message if exists to help avoid the problem in sql statement.
|
||||
int32_t msgLen; // max length of the msg
|
||||
} SParseContext;
|
||||
|
|
|
@ -20,51 +20,92 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "taosmsg.h"
|
||||
|
||||
#define QUERY_TYPE_MERGE 1
|
||||
#define QUERY_TYPE_PARTIAL 2
|
||||
#define QUERY_TYPE_SCAN 3
|
||||
|
||||
enum OPERATOR_TYPE_E {
|
||||
OP_TableScan = 1,
|
||||
OP_DataBlocksOptScan = 2,
|
||||
OP_TableSeqScan = 3,
|
||||
OP_TagScan = 4,
|
||||
OP_TableBlockInfoScan= 5,
|
||||
OP_Aggregate = 6,
|
||||
OP_Project = 7,
|
||||
OP_Groupby = 8,
|
||||
OP_Limit = 9,
|
||||
OP_SLimit = 10,
|
||||
OP_TimeWindow = 11,
|
||||
OP_SessionWindow = 12,
|
||||
OP_StateWindow = 22,
|
||||
OP_Fill = 13,
|
||||
OP_MultiTableAggregate = 14,
|
||||
OP_MultiTableTimeInterval = 15,
|
||||
// OP_DummyInput = 16, //TODO remove it after fully refactor.
|
||||
// OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream.
|
||||
// OP_GlobalAggregate = 18, // global merge for the multi-way data sources.
|
||||
OP_Filter = 19,
|
||||
OP_Distinct = 20,
|
||||
OP_Join = 21,
|
||||
OP_AllTimeWindow = 23,
|
||||
OP_AllMultiTableTimeInterval = 24,
|
||||
OP_Order = 25,
|
||||
OP_Exchange = 26,
|
||||
OP_Unknown,
|
||||
#define INCLUDE_AS_ENUM
|
||||
#include "plannerOp.h"
|
||||
#undef INCLUDE_AS_ENUM
|
||||
OP_TotalNum
|
||||
};
|
||||
|
||||
struct SEpSet;
|
||||
struct SPhyNode;
|
||||
struct SQueryStmtInfo;
|
||||
|
||||
typedef SSchema SSlotSchema;
|
||||
|
||||
typedef struct SDataBlockSchema {
|
||||
SSlotSchema *pSchema;
|
||||
int32_t numOfCols; // number of columns
|
||||
} SDataBlockSchema;
|
||||
|
||||
typedef struct SQueryNodeBasicInfo {
|
||||
int32_t type; // operator type
|
||||
const char *name; // operator name
|
||||
} SQueryNodeBasicInfo;
|
||||
|
||||
typedef struct SPhyNode {
|
||||
SQueryNodeBasicInfo info;
|
||||
SArray *pTargets; // target list to be computed or scanned at this node
|
||||
SArray *pConditions; // implicitly-ANDed qual conditions
|
||||
SDataBlockSchema targetSchema;
|
||||
// children plan to generated result for current node to process
|
||||
// in case of join, multiple plan nodes exist.
|
||||
SArray *pChildren;
|
||||
struct SPhyNode *pParent;
|
||||
} SPhyNode;
|
||||
|
||||
typedef struct SScanPhyNode {
|
||||
SPhyNode node;
|
||||
uint64_t uid; // unique id of the table
|
||||
int8_t tableType;
|
||||
} SScanPhyNode;
|
||||
|
||||
typedef SScanPhyNode SSystemTableScanPhyNode;
|
||||
typedef SScanPhyNode STagScanPhyNode;
|
||||
|
||||
typedef struct STableScanPhyNode {
|
||||
SScanPhyNode scan;
|
||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
||||
STimeWindow window;
|
||||
SArray *pTagsConditions; // implicitly-ANDed tag qual conditions
|
||||
} STableScanPhyNode;
|
||||
|
||||
typedef STableScanPhyNode STableSeqScanPhyNode;
|
||||
|
||||
typedef struct SProjectPhyNode {
|
||||
SPhyNode node;
|
||||
} SProjectPhyNode;
|
||||
|
||||
typedef struct SExchangePhyNode {
|
||||
SPhyNode node;
|
||||
uint64_t templateId;
|
||||
SArray *pSourceEpSet; // SEpSet
|
||||
} SExchangePhyNode;
|
||||
|
||||
typedef struct SSubplanId {
|
||||
uint64_t queryId;
|
||||
uint64_t templateId;
|
||||
uint64_t subplanId;
|
||||
} SSubplanId;
|
||||
|
||||
typedef struct SSubplan {
|
||||
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
|
||||
SArray *pDatasource; // the datasource subplan,from which to fetch the result
|
||||
struct SPhyNode *pNode; // physical plan of current subplan
|
||||
SSubplanId id; // unique id of the subplan
|
||||
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
|
||||
int32_t level; // the execution level of current subplan, starting from 0.
|
||||
SEpSet execEpSet; // for the scan sub plan, the optional execution node
|
||||
SArray *pChildern; // the datasource subplan,from which to fetch the result
|
||||
SArray *pParents; // the data destination subplan, get data from current subplan
|
||||
SPhyNode *pNode; // physical plan of current subplan
|
||||
} SSubplan;
|
||||
|
||||
typedef struct SQueryDag {
|
||||
SArray **pSubplans;
|
||||
SArray *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
|
||||
} SQueryDag;
|
||||
|
||||
/**
|
||||
|
@ -74,6 +115,7 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
|
|||
|
||||
int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str);
|
||||
|
||||
|
||||
/**
|
||||
* Convert to subplan to string for the scheduler to send to the executor
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#if defined(INCLUDE_AS_ENUM) // enum define mode
|
||||
#undef OP_ENUM_MACRO
|
||||
#define OP_ENUM_MACRO(op) OP_##op,
|
||||
#elif defined(INCLUDE_AS_NAME) // comment define mode
|
||||
#undef OP_ENUM_MACRO
|
||||
#define OP_ENUM_MACRO(op) #op,
|
||||
#else
|
||||
#error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME
|
||||
#endif
|
||||
|
||||
OP_ENUM_MACRO(TableScan)
|
||||
OP_ENUM_MACRO(DataBlocksOptScan)
|
||||
OP_ENUM_MACRO(TableSeqScan)
|
||||
OP_ENUM_MACRO(TagScan)
|
||||
OP_ENUM_MACRO(TableBlockInfoScan)
|
||||
OP_ENUM_MACRO(Aggregate)
|
||||
OP_ENUM_MACRO(Project)
|
||||
OP_ENUM_MACRO(Groupby)
|
||||
OP_ENUM_MACRO(Limit)
|
||||
OP_ENUM_MACRO(SLimit)
|
||||
OP_ENUM_MACRO(TimeWindow)
|
||||
OP_ENUM_MACRO(SessionWindow)
|
||||
OP_ENUM_MACRO(StateWindow)
|
||||
OP_ENUM_MACRO(Fill)
|
||||
OP_ENUM_MACRO(MultiTableAggregate)
|
||||
OP_ENUM_MACRO(MultiTableTimeInterval)
|
||||
OP_ENUM_MACRO(Filter)
|
||||
OP_ENUM_MACRO(Distinct)
|
||||
OP_ENUM_MACRO(Join)
|
||||
OP_ENUM_MACRO(AllTimeWindow)
|
||||
OP_ENUM_MACRO(AllMultiTableTimeInterval)
|
||||
OP_ENUM_MACRO(Order)
|
||||
OP_ENUM_MACRO(Exchange)
|
|
@ -41,7 +41,7 @@ typedef struct SArray {
|
|||
* @param elemSize
|
||||
* @return
|
||||
*/
|
||||
void* taosArrayInit(size_t size, size_t elemSize);
|
||||
SArray* taosArrayInit(size_t size, size_t elemSize);
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
|
@ -71,8 +71,7 @@ typedef struct SInsertParseContext {
|
|||
const char* pSql;
|
||||
SMsgBuf msg;
|
||||
struct SCatalog* pCatalog;
|
||||
SMetaData meta; // need release
|
||||
const STableMeta* pTableMeta;
|
||||
STableMeta tableMeta;
|
||||
SHashObj* pTableBlockHashObj; // data block for each table. need release
|
||||
int32_t totalNum;
|
||||
SInsertStmtInfo* pOutput;
|
||||
|
@ -165,29 +164,29 @@ static int32_t skipInsertInto(SInsertParseContext* pCxt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildTableName(SInsertParseContext* pCxt, SToken* pStname, SArray* tableNameList) {
|
||||
static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) {
|
||||
if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z);
|
||||
}
|
||||
|
||||
SName name = {0};
|
||||
strcpy(name.dbname, pCxt->pComCxt->pDbname);
|
||||
strncpy(name.tname, pStname->z, pStname->n);
|
||||
taosArrayPush(tableNameList, &name);
|
||||
|
||||
char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false);
|
||||
if (NULL != p) { // db.table
|
||||
strcpy(fullDbName, pCxt->pComCxt->pAcctId);
|
||||
fullDbName[strlen(pCxt->pComCxt->pAcctId)] = TS_PATH_DELIMITER[0];
|
||||
strncpy(fullDbName, pStname->z, p - pStname->z);
|
||||
strncpy(tableName, p + 1, pStname->n - (p - pStname->z) - 1);
|
||||
} else {
|
||||
snprintf(fullDbName, TSDB_FULL_DB_NAME_LEN, "%s.%s", pCxt->pComCxt->pAcctId, pCxt->pComCxt->pDbname);
|
||||
strncpy(tableName, pStname->z, pStname->n);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildMetaReq(SInsertParseContext* pCxt, SToken* pStname, SCatalogReq* pMetaReq) {
|
||||
pMetaReq->pTableName = taosArrayInit(4, sizeof(SName));
|
||||
return buildTableName(pCxt, pStname, pMetaReq->pTableName);
|
||||
}
|
||||
|
||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
||||
SCatalogReq req;
|
||||
CHECK_CODE(buildMetaReq(pCxt, pTname, &req));
|
||||
CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, NULL, NULL, NULL, &pCxt->meta)); //TODO
|
||||
pCxt->pTableMeta = (STableMeta*)taosArrayGetP(pCxt->meta.pTableMeta, 0);
|
||||
char fullDbName[TSDB_FULL_DB_NAME_LEN] = {0};
|
||||
char tableName[TSDB_TABLE_NAME_LEN] = {0};
|
||||
CHECK_CODE(buildName(pCxt, pTname, fullDbName, tableName));
|
||||
CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, &pCxt->tableMeta));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -646,13 +645,13 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
|
|||
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
CHECK_CODE(getTableMeta(pCxt, &sToken));
|
||||
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
|
||||
if (TSDB_SUPER_TABLE != pCxt->tableMeta.tableType) {
|
||||
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
|
||||
}
|
||||
|
||||
SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta);
|
||||
SSchema* pTagsSchema = getTableTagSchema(&pCxt->tableMeta);
|
||||
SParsedDataColInfo spd = {0};
|
||||
setBoundColumnInfo(&spd, pTagsSchema, getNumOfTags(pCxt->pTableMeta));
|
||||
setBoundColumnInfo(&spd, pTagsSchema, getNumOfTags(&pCxt->tableMeta));
|
||||
|
||||
// pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...)
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
|
@ -669,7 +668,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
|
|||
if (TK_LP != sToken.type) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z);
|
||||
}
|
||||
CHECK_CODE(parseTagsClause(pCxt, &spd, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision));
|
||||
CHECK_CODE(parseTagsClause(pCxt, &spd, pTagsSchema, getTableInfo(&pCxt->tableMeta).precision));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -811,12 +810,12 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
|
|||
}
|
||||
|
||||
STableDataBlocks *dataBuf = NULL;
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL));
|
||||
CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->tableMeta.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
|
||||
sizeof(SSubmitBlk), getTableInfo(&pCxt->tableMeta).rowSize, &pCxt->tableMeta, &dataBuf, NULL));
|
||||
|
||||
if (TK_LP == sToken.type) {
|
||||
// pSql -> field1_name, ...)
|
||||
CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo));
|
||||
CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(&pCxt->tableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo));
|
||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||
}
|
||||
|
||||
|
@ -862,18 +861,17 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
|
|||
.pSql = pContext->pSql,
|
||||
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
|
||||
.pCatalog = NULL,
|
||||
.pTableMeta = NULL,
|
||||
.tableMeta = {0},
|
||||
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
|
||||
.totalNum = 0,
|
||||
.pOutput = *pInfo
|
||||
};
|
||||
|
||||
CHECK_CODE(catalogGetHandle(NULL, &context.pCatalog)); //TODO
|
||||
|
||||
if (NULL == context.pTableBlockHashObj) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
CHECK_CODE(catalogGetHandle(pContext->pClusterId, &context.pCatalog));
|
||||
CHECK_CODE(skipInsertInto(&context));
|
||||
CHECK_CODE(parseInsertBody(&context));
|
||||
|
||||
|
|
|
@ -6,13 +6,16 @@ SET(CMAKE_CXX_STANDARD 11)
|
|||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||
|
||||
ADD_EXECUTABLE(parserTest ${SOURCE_LIST})
|
||||
TARGET_LINK_LIBRARIES(
|
||||
parserTest
|
||||
PUBLIC os util common parser catalog transport gtest function planner query
|
||||
)
|
||||
|
||||
TARGET_INCLUDE_DIRECTORIES(
|
||||
parserTest
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/parser/"
|
||||
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/parser/inc"
|
||||
)
|
||||
|
||||
TARGET_LINK_LIBRARIES(
|
||||
parserTest
|
||||
PUBLIC os util common parser catalog transport gtest function planner query
|
||||
)
|
||||
|
||||
TARGET_LINK_OPTIONS(parserTest PRIVATE -Wl,-wrap,malloc)
|
||||
|
|
|
@ -27,6 +27,27 @@ namespace {
|
|||
}
|
||||
}
|
||||
|
||||
extern "C" {
|
||||
|
||||
#include <execinfo.h>
|
||||
|
||||
void *__real_malloc(size_t);
|
||||
|
||||
void *__wrap_malloc(size_t c) {
|
||||
// printf("My MALLOC called: %d\n", c);
|
||||
// void *array[32];
|
||||
// int size = backtrace(array, 32);
|
||||
// char **symbols = backtrace_symbols(array, size);
|
||||
// for (int i = 0; i < size; ++i) {
|
||||
// cout << symbols[i] << endl;
|
||||
// }
|
||||
// free(symbols);
|
||||
|
||||
return __real_malloc(c);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// syntax:
|
||||
// INSERT INTO
|
||||
// tb_name
|
||||
|
|
|
@ -23,6 +23,7 @@ extern "C" {
|
|||
#include "common.h"
|
||||
#include "tarray.h"
|
||||
#include "planner.h"
|
||||
#include "parser.h"
|
||||
#include "taosmsg.h"
|
||||
|
||||
#define QNODE_TAGSCAN 1
|
||||
|
@ -40,11 +41,6 @@ extern "C" {
|
|||
#define QNODE_STATEWINDOW 13
|
||||
#define QNODE_FILL 14
|
||||
|
||||
typedef struct SQueryNodeBasicInfo {
|
||||
int32_t type; // operator type
|
||||
char *name; // operator name
|
||||
} SQueryNodeBasicInfo;
|
||||
|
||||
typedef struct SQueryDistPlanNodeInfo {
|
||||
bool stableQuery; // super table query or not
|
||||
int32_t phase; // merge|partial
|
||||
|
@ -54,8 +50,9 @@ typedef struct SQueryDistPlanNodeInfo {
|
|||
} SQueryDistPlanNodeInfo;
|
||||
|
||||
typedef struct SQueryTableInfo {
|
||||
char *tableName;
|
||||
uint64_t uid;
|
||||
char *tableName; // to be deleted
|
||||
uint64_t uid; // to be deleted
|
||||
STableMetaInfo* pMeta;
|
||||
STimeWindow window;
|
||||
} SQueryTableInfo;
|
||||
|
||||
|
@ -72,46 +69,6 @@ typedef struct SQueryPlanNode {
|
|||
struct SQueryPlanNode *pParent;
|
||||
} SQueryPlanNode;
|
||||
|
||||
typedef SSchema SSlotSchema;
|
||||
|
||||
typedef struct SDataBlockSchema {
|
||||
int32_t index;
|
||||
SSlotSchema *pSchema;
|
||||
int32_t numOfCols; // number of columns
|
||||
} SDataBlockSchema;
|
||||
|
||||
typedef struct SPhyNode {
|
||||
SQueryNodeBasicInfo info;
|
||||
SArray *pTargets; // target list to be computed or scanned at this node
|
||||
SArray *pConditions; // implicitly-ANDed qual conditions
|
||||
SDataBlockSchema targetSchema;
|
||||
// children plan to generated result for current node to process
|
||||
// in case of join, multiple plan nodes exist.
|
||||
SArray *pChildren;
|
||||
struct SPhyNode *pParent;
|
||||
} SPhyNode;
|
||||
|
||||
typedef struct SScanPhyNode {
|
||||
SPhyNode node;
|
||||
STimeWindow window;
|
||||
uint64_t uid; // unique id of the table
|
||||
} SScanPhyNode;
|
||||
|
||||
typedef SScanPhyNode STagScanPhyNode;
|
||||
|
||||
typedef SScanPhyNode SSystemTableScanPhyNode;
|
||||
|
||||
typedef struct SMultiTableScanPhyNode {
|
||||
SScanPhyNode scan;
|
||||
SArray *pTagsConditions; // implicitly-ANDed tag qual conditions
|
||||
} SMultiTableScanPhyNode;
|
||||
|
||||
typedef SMultiTableScanPhyNode SMultiTableSeqScanPhyNode;
|
||||
|
||||
typedef struct SProjectPhyNode {
|
||||
SPhyNode node;
|
||||
} SProjectPhyNode;
|
||||
|
||||
/**
|
||||
* Optimize the query execution plan, currently not implement yet.
|
||||
* @param pQueryNode
|
||||
|
|
|
@ -14,64 +14,107 @@
|
|||
*/
|
||||
|
||||
#include "plannerInt.h"
|
||||
#include "parser.h"
|
||||
|
||||
// typedef struct SQueryPlanNode {
|
||||
// void *pExtInfo; // additional information
|
||||
// SArray *pPrevNodes; // children
|
||||
// struct SQueryPlanNode *nextNode; // parent
|
||||
// } SQueryPlanNode;
|
||||
static const char* gOpName[] = {
|
||||
"Unknown",
|
||||
#define INCLUDE_AS_NAME
|
||||
#include "plannerOp.h"
|
||||
#undef INCLUDE_AS_NAME
|
||||
};
|
||||
|
||||
// typedef struct SSubplan {
|
||||
// int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
|
||||
// SArray *pDatasource; // the datasource subplan,from which to fetch the result
|
||||
// struct SPhyNode *pNode; // physical plan of current subplan
|
||||
// } SSubplan;
|
||||
typedef struct SPlanContext {
|
||||
struct SCatalog* pCatalog;
|
||||
struct SQueryDag* pDag;
|
||||
SSubplan* pCurrentSubplan;
|
||||
SSubplanId nextId;
|
||||
} SPlanContext;
|
||||
|
||||
// typedef struct SQueryDag {
|
||||
// SArray **pSubplans;
|
||||
// } SQueryDag;
|
||||
|
||||
// typedef struct SScanPhyNode {
|
||||
// SPhyNode node;
|
||||
// STimeWindow window;
|
||||
// uint64_t uid; // unique id of the table
|
||||
// } SScanPhyNode;
|
||||
|
||||
// typedef SScanPhyNode STagScanPhyNode;
|
||||
|
||||
void fillDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
|
||||
dataBlockSchema->index = 0; // todo
|
||||
static void toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
|
||||
SWAP(dataBlockSchema->pSchema, pPlanNode->pSchema, SSchema*);
|
||||
dataBlockSchema->numOfCols = pPlanNode->numOfCols;
|
||||
}
|
||||
|
||||
void fillPhyNode(SQueryPlanNode* pPlanNode, int32_t type, const char* name, SPhyNode* node) {
|
||||
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
|
||||
SPhyNode* node = (SPhyNode*)calloc(1, size);
|
||||
node->info.type = type;
|
||||
node->info.name = name;
|
||||
node->info.name = gOpName[type];
|
||||
SWAP(node->pTargets, pPlanNode->pExpr, SArray*);
|
||||
fillDataBlockSchema(pPlanNode, &(node->targetSchema));
|
||||
toDataBlockSchema(pPlanNode, &(node->targetSchema));
|
||||
}
|
||||
|
||||
SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
|
||||
STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode));
|
||||
fillPhyNode(pPlanNode, OP_TagScan, "TagScan", (SPhyNode*)node);
|
||||
return (SPhyNode*)node;
|
||||
static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) {
|
||||
return initPhyNode(pPlanNode, OP_TagScan, sizeof(STagScanPhyNode));
|
||||
}
|
||||
|
||||
SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) {
|
||||
STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode));
|
||||
fillPhyNode(pPlanNode, OP_TableScan, "SingleTableScan", (SPhyNode*)node);
|
||||
return (SPhyNode*)node;
|
||||
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
||||
SSubplan* subplan = calloc(1, sizeof(SSubplan));
|
||||
subplan->id = pCxt->nextId;
|
||||
++(pCxt->nextId.subplanId);
|
||||
subplan->type = type;
|
||||
subplan->level = 0;
|
||||
if (NULL != pCxt->pCurrentSubplan) {
|
||||
subplan->level = pCxt->pCurrentSubplan->level + 1;
|
||||
if (NULL == pCxt->pCurrentSubplan->pChildern) {
|
||||
pCxt->pCurrentSubplan->pChildern = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
}
|
||||
taosArrayPush(pCxt->pCurrentSubplan->pChildern, subplan);
|
||||
subplan->pParents = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
taosArrayPush(subplan->pParents, pCxt->pCurrentSubplan);
|
||||
}
|
||||
pCxt->pCurrentSubplan = subplan;
|
||||
return subplan;
|
||||
}
|
||||
|
||||
SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) {
|
||||
static uint8_t getScanFlag(SQueryPlanNode* pPlanNode) {
|
||||
// todo
|
||||
return MASTER_SCAN;
|
||||
}
|
||||
|
||||
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
|
||||
STableScanPhyNode* node = (STableScanPhyNode*)initPhyNode(pPlanNode, OP_TableScan, sizeof(STableScanPhyNode));
|
||||
node->scan.uid = pTable->pMeta->pTableMeta->uid;
|
||||
node->scan.tableType = pTable->pMeta->pTableMeta->tableType;
|
||||
node->scanFlag = getScanFlag(pPlanNode);
|
||||
node->window = pTable->window;
|
||||
// todo tag cond
|
||||
}
|
||||
|
||||
static void vgroupToEpSet(const SVgroupMsg* vg, SEpSet* epSet) {
|
||||
// todo
|
||||
}
|
||||
|
||||
static void splitSubplanBySTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
|
||||
SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList;
|
||||
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
|
||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
|
||||
vgroupToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet);
|
||||
subplan->pNode = createTableScanNode(pCxt, pPlanNode, pTable);
|
||||
// todo reset pCxt->pCurrentSubplan
|
||||
}
|
||||
}
|
||||
|
||||
static SPhyNode* createExchangeNode() {
|
||||
|
||||
}
|
||||
|
||||
static SPhyNode* createScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
|
||||
if (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType) {
|
||||
splitSubplanBySTable(pCxt, pPlanNode, pTable);
|
||||
return createExchangeNode(pCxt, pTable);
|
||||
}
|
||||
return createTableScanNode(pCxt, pPlanNode, pTable);
|
||||
}
|
||||
|
||||
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
SPhyNode* node = NULL;
|
||||
switch (pPlanNode->info.type) {
|
||||
case QNODE_TAGSCAN:
|
||||
node = createTagScanNode(pPlanNode);
|
||||
break;
|
||||
case QNODE_TABLESCAN:
|
||||
node = createScanNode(pPlanNode);
|
||||
node = createScanNode(pCxt, pPlanNode);
|
||||
break;
|
||||
default:
|
||||
assert(false);
|
||||
|
@ -80,7 +123,7 @@ SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) {
|
|||
node->pChildren = taosArrayInit(4, POINTER_BYTES);
|
||||
size_t size = taosArrayGetSize(pPlanNode->pChildren);
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
SPhyNode* child = createPhyNode(taosArrayGet(pPlanNode->pChildren, i));
|
||||
SPhyNode* child = createPhyNode(pCxt, taosArrayGet(pPlanNode->pChildren, i));
|
||||
child->pParent = node;
|
||||
taosArrayPush(node->pChildren, &child);
|
||||
}
|
||||
|
@ -88,13 +131,30 @@ SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) {
|
|||
return node;
|
||||
}
|
||||
|
||||
SSubplan* createSubplan(SQueryPlanNode* pSubquery) {
|
||||
SSubplan* subplan = calloc(1, sizeof(SSubplan));
|
||||
subplan->pNode = createPhyNode(pSubquery);
|
||||
// todo
|
||||
return subplan;
|
||||
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
|
||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
|
||||
subplan->pNode = createPhyNode(pCxt, pRoot);
|
||||
SArray* l0 = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
taosArrayPush(l0, &subplan);
|
||||
taosArrayPush(pCxt->pDag->pSubplans, &l0);
|
||||
// todo deal subquery
|
||||
}
|
||||
|
||||
int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) {
|
||||
return 0;
|
||||
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag) {
|
||||
SPlanContext context = {
|
||||
.pCatalog = pCatalog,
|
||||
.pDag = calloc(1, sizeof(SQueryDag)),
|
||||
.pCurrentSubplan = NULL
|
||||
};
|
||||
if (NULL == context.pDag) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
context.pDag->pSubplans = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
createSubplanByLevel(&context, pQueryNode);
|
||||
*pDag = context.pDag;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t subPlanToString(struct SSubplan *pPhyNode, char** str) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -95,6 +95,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
|
|||
}
|
||||
|
||||
switch(type) {
|
||||
case QNODE_TAGSCAN:
|
||||
case QNODE_TABLESCAN: {
|
||||
SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo));
|
||||
memcpy(info, pExtInfo, sizeof(SQueryTableInfo));
|
||||
|
@ -162,7 +163,7 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
|
|||
SArray* pExprs, SArray* tableCols) {
|
||||
if (pQueryInfo->info.onlyTagQuery) {
|
||||
int32_t num = (int32_t) taosArrayGetSize(pExprs);
|
||||
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, NULL);
|
||||
SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info);
|
||||
|
||||
if (pQueryInfo->info.distinct) {
|
||||
pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL);
|
||||
|
|
|
@ -9,5 +9,5 @@ target_include_directories(
|
|||
|
||||
target_link_libraries(
|
||||
scheduler
|
||||
PRIVATE os util planner
|
||||
PRIVATE os util planner common
|
||||
)
|
|
@ -17,7 +17,7 @@
|
|||
#include "tarray.h"
|
||||
#include "talgo.h"
|
||||
|
||||
void* taosArrayInit(size_t size, size_t elemSize) {
|
||||
SArray* taosArrayInit(size_t size, size_t elemSize) {
|
||||
assert(elemSize > 0);
|
||||
|
||||
if (size < TARRAY_MIN_SIZE) {
|
||||
|
|
Loading…
Reference in New Issue