enh: add unit test and delete useless code
This commit is contained in:
parent
cb41374c90
commit
02ac6fa552
|
@ -1726,6 +1726,9 @@ typedef struct {
|
|||
#define STREAM_TRIGGER_WINDOW_CLOSE 2
|
||||
#define STREAM_TRIGGER_MAX_DELAY 3
|
||||
#define STREAM_DEFAULT_IGNORE_EXPIRED 0
|
||||
#define STREAM_FILL_HISTORY_ON 1
|
||||
#define STREAM_FILL_HISTORY_OFF 0
|
||||
#define STREAM_DEFAULT_FILL_HISTORY STREAM_FILL_HISTORY_OFF
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
|
|
|
@ -203,135 +203,136 @@
|
|||
#define TK_WINDOW_CLOSE 185
|
||||
#define TK_IGNORE 186
|
||||
#define TK_EXPIRED 187
|
||||
#define TK_SUBTABLE 188
|
||||
#define TK_KILL 189
|
||||
#define TK_CONNECTION 190
|
||||
#define TK_TRANSACTION 191
|
||||
#define TK_BALANCE 192
|
||||
#define TK_VGROUP 193
|
||||
#define TK_MERGE 194
|
||||
#define TK_REDISTRIBUTE 195
|
||||
#define TK_SPLIT 196
|
||||
#define TK_DELETE 197
|
||||
#define TK_INSERT 198
|
||||
#define TK_NULL 199
|
||||
#define TK_NK_QUESTION 200
|
||||
#define TK_NK_ARROW 201
|
||||
#define TK_ROWTS 202
|
||||
#define TK_TBNAME 203
|
||||
#define TK_QSTART 204
|
||||
#define TK_QEND 205
|
||||
#define TK_QDURATION 206
|
||||
#define TK_WSTART 207
|
||||
#define TK_WEND 208
|
||||
#define TK_WDURATION 209
|
||||
#define TK_IROWTS 210
|
||||
#define TK_QTAGS 211
|
||||
#define TK_CAST 212
|
||||
#define TK_NOW 213
|
||||
#define TK_TODAY 214
|
||||
#define TK_TIMEZONE 215
|
||||
#define TK_CLIENT_VERSION 216
|
||||
#define TK_SERVER_VERSION 217
|
||||
#define TK_SERVER_STATUS 218
|
||||
#define TK_CURRENT_USER 219
|
||||
#define TK_COUNT 220
|
||||
#define TK_LAST_ROW 221
|
||||
#define TK_CASE 222
|
||||
#define TK_END 223
|
||||
#define TK_WHEN 224
|
||||
#define TK_THEN 225
|
||||
#define TK_ELSE 226
|
||||
#define TK_BETWEEN 227
|
||||
#define TK_IS 228
|
||||
#define TK_NK_LT 229
|
||||
#define TK_NK_GT 230
|
||||
#define TK_NK_LE 231
|
||||
#define TK_NK_GE 232
|
||||
#define TK_NK_NE 233
|
||||
#define TK_MATCH 234
|
||||
#define TK_NMATCH 235
|
||||
#define TK_CONTAINS 236
|
||||
#define TK_IN 237
|
||||
#define TK_JOIN 238
|
||||
#define TK_INNER 239
|
||||
#define TK_SELECT 240
|
||||
#define TK_DISTINCT 241
|
||||
#define TK_WHERE 242
|
||||
#define TK_PARTITION 243
|
||||
#define TK_BY 244
|
||||
#define TK_SESSION 245
|
||||
#define TK_STATE_WINDOW 246
|
||||
#define TK_SLIDING 247
|
||||
#define TK_FILL 248
|
||||
#define TK_VALUE 249
|
||||
#define TK_NONE 250
|
||||
#define TK_PREV 251
|
||||
#define TK_LINEAR 252
|
||||
#define TK_NEXT 253
|
||||
#define TK_HAVING 254
|
||||
#define TK_RANGE 255
|
||||
#define TK_EVERY 256
|
||||
#define TK_ORDER 257
|
||||
#define TK_SLIMIT 258
|
||||
#define TK_SOFFSET 259
|
||||
#define TK_LIMIT 260
|
||||
#define TK_OFFSET 261
|
||||
#define TK_ASC 262
|
||||
#define TK_NULLS 263
|
||||
#define TK_ABORT 264
|
||||
#define TK_AFTER 265
|
||||
#define TK_ATTACH 266
|
||||
#define TK_BEFORE 267
|
||||
#define TK_BEGIN 268
|
||||
#define TK_BITAND 269
|
||||
#define TK_BITNOT 270
|
||||
#define TK_BITOR 271
|
||||
#define TK_BLOCKS 272
|
||||
#define TK_CHANGE 273
|
||||
#define TK_COMMA 274
|
||||
#define TK_COMPACT 275
|
||||
#define TK_CONCAT 276
|
||||
#define TK_CONFLICT 277
|
||||
#define TK_COPY 278
|
||||
#define TK_DEFERRED 279
|
||||
#define TK_DELIMITERS 280
|
||||
#define TK_DETACH 281
|
||||
#define TK_DIVIDE 282
|
||||
#define TK_DOT 283
|
||||
#define TK_EACH 284
|
||||
#define TK_FAIL 285
|
||||
#define TK_FILE 286
|
||||
#define TK_FOR 287
|
||||
#define TK_GLOB 288
|
||||
#define TK_ID 289
|
||||
#define TK_IMMEDIATE 290
|
||||
#define TK_IMPORT 291
|
||||
#define TK_INITIALLY 292
|
||||
#define TK_INSTEAD 293
|
||||
#define TK_ISNULL 294
|
||||
#define TK_KEY 295
|
||||
#define TK_MODULES 296
|
||||
#define TK_NK_BITNOT 297
|
||||
#define TK_NK_SEMI 298
|
||||
#define TK_NOTNULL 299
|
||||
#define TK_OF 300
|
||||
#define TK_PLUS 301
|
||||
#define TK_PRIVILEGE 302
|
||||
#define TK_RAISE 303
|
||||
#define TK_REPLACE 304
|
||||
#define TK_RESTRICT 305
|
||||
#define TK_ROW 306
|
||||
#define TK_SEMI 307
|
||||
#define TK_STAR 308
|
||||
#define TK_STATEMENT 309
|
||||
#define TK_STRING 310
|
||||
#define TK_TIMES 311
|
||||
#define TK_UPDATE 312
|
||||
#define TK_VALUES 313
|
||||
#define TK_VARIABLE 314
|
||||
#define TK_VIEW 315
|
||||
#define TK_WAL 316
|
||||
#define TK_FILL_HISTORY 188
|
||||
#define TK_SUBTABLE 189
|
||||
#define TK_KILL 190
|
||||
#define TK_CONNECTION 191
|
||||
#define TK_TRANSACTION 192
|
||||
#define TK_BALANCE 193
|
||||
#define TK_VGROUP 194
|
||||
#define TK_MERGE 195
|
||||
#define TK_REDISTRIBUTE 196
|
||||
#define TK_SPLIT 197
|
||||
#define TK_DELETE 198
|
||||
#define TK_INSERT 199
|
||||
#define TK_NULL 200
|
||||
#define TK_NK_QUESTION 201
|
||||
#define TK_NK_ARROW 202
|
||||
#define TK_ROWTS 203
|
||||
#define TK_TBNAME 204
|
||||
#define TK_QSTART 205
|
||||
#define TK_QEND 206
|
||||
#define TK_QDURATION 207
|
||||
#define TK_WSTART 208
|
||||
#define TK_WEND 209
|
||||
#define TK_WDURATION 210
|
||||
#define TK_IROWTS 211
|
||||
#define TK_QTAGS 212
|
||||
#define TK_CAST 213
|
||||
#define TK_NOW 214
|
||||
#define TK_TODAY 215
|
||||
#define TK_TIMEZONE 216
|
||||
#define TK_CLIENT_VERSION 217
|
||||
#define TK_SERVER_VERSION 218
|
||||
#define TK_SERVER_STATUS 219
|
||||
#define TK_CURRENT_USER 220
|
||||
#define TK_COUNT 221
|
||||
#define TK_LAST_ROW 222
|
||||
#define TK_CASE 223
|
||||
#define TK_END 224
|
||||
#define TK_WHEN 225
|
||||
#define TK_THEN 226
|
||||
#define TK_ELSE 227
|
||||
#define TK_BETWEEN 228
|
||||
#define TK_IS 229
|
||||
#define TK_NK_LT 230
|
||||
#define TK_NK_GT 231
|
||||
#define TK_NK_LE 232
|
||||
#define TK_NK_GE 233
|
||||
#define TK_NK_NE 234
|
||||
#define TK_MATCH 235
|
||||
#define TK_NMATCH 236
|
||||
#define TK_CONTAINS 237
|
||||
#define TK_IN 238
|
||||
#define TK_JOIN 239
|
||||
#define TK_INNER 240
|
||||
#define TK_SELECT 241
|
||||
#define TK_DISTINCT 242
|
||||
#define TK_WHERE 243
|
||||
#define TK_PARTITION 244
|
||||
#define TK_BY 245
|
||||
#define TK_SESSION 246
|
||||
#define TK_STATE_WINDOW 247
|
||||
#define TK_SLIDING 248
|
||||
#define TK_FILL 249
|
||||
#define TK_VALUE 250
|
||||
#define TK_NONE 251
|
||||
#define TK_PREV 252
|
||||
#define TK_LINEAR 253
|
||||
#define TK_NEXT 254
|
||||
#define TK_HAVING 255
|
||||
#define TK_RANGE 256
|
||||
#define TK_EVERY 257
|
||||
#define TK_ORDER 258
|
||||
#define TK_SLIMIT 259
|
||||
#define TK_SOFFSET 260
|
||||
#define TK_LIMIT 261
|
||||
#define TK_OFFSET 262
|
||||
#define TK_ASC 263
|
||||
#define TK_NULLS 264
|
||||
#define TK_ABORT 265
|
||||
#define TK_AFTER 266
|
||||
#define TK_ATTACH 267
|
||||
#define TK_BEFORE 268
|
||||
#define TK_BEGIN 269
|
||||
#define TK_BITAND 270
|
||||
#define TK_BITNOT 271
|
||||
#define TK_BITOR 272
|
||||
#define TK_BLOCKS 273
|
||||
#define TK_CHANGE 274
|
||||
#define TK_COMMA 275
|
||||
#define TK_COMPACT 276
|
||||
#define TK_CONCAT 277
|
||||
#define TK_CONFLICT 278
|
||||
#define TK_COPY 279
|
||||
#define TK_DEFERRED 280
|
||||
#define TK_DELIMITERS 281
|
||||
#define TK_DETACH 282
|
||||
#define TK_DIVIDE 283
|
||||
#define TK_DOT 284
|
||||
#define TK_EACH 285
|
||||
#define TK_FAIL 286
|
||||
#define TK_FILE 287
|
||||
#define TK_FOR 288
|
||||
#define TK_GLOB 289
|
||||
#define TK_ID 290
|
||||
#define TK_IMMEDIATE 291
|
||||
#define TK_IMPORT 292
|
||||
#define TK_INITIALLY 293
|
||||
#define TK_INSTEAD 294
|
||||
#define TK_ISNULL 295
|
||||
#define TK_KEY 296
|
||||
#define TK_MODULES 297
|
||||
#define TK_NK_BITNOT 298
|
||||
#define TK_NK_SEMI 299
|
||||
#define TK_NOTNULL 300
|
||||
#define TK_OF 301
|
||||
#define TK_PLUS 302
|
||||
#define TK_PRIVILEGE 303
|
||||
#define TK_RAISE 304
|
||||
#define TK_REPLACE 305
|
||||
#define TK_RESTRICT 306
|
||||
#define TK_ROW 307
|
||||
#define TK_SEMI 308
|
||||
#define TK_STAR 309
|
||||
#define TK_STATEMENT 310
|
||||
#define TK_STRING 311
|
||||
#define TK_TIMES 312
|
||||
#define TK_UPDATE 313
|
||||
#define TK_VALUES 314
|
||||
#define TK_VARIABLE 315
|
||||
#define TK_VIEW 316
|
||||
#define TK_WAL 317
|
||||
|
||||
#define TK_NK_SPACE 300
|
||||
#define TK_NK_COMMENT 301
|
||||
|
|
|
@ -374,6 +374,7 @@ typedef struct SStreamOptions {
|
|||
int8_t triggerType;
|
||||
SNode* pDelay;
|
||||
SNode* pWatermark;
|
||||
int8_t fillHistory;
|
||||
int8_t ignoreExpired;
|
||||
} SStreamOptions;
|
||||
|
||||
|
|
|
@ -585,8 +585,6 @@ typedef struct SQueryPlan {
|
|||
SExplainInfo explainInfo;
|
||||
} SQueryPlan;
|
||||
|
||||
void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext);
|
||||
|
||||
const char* dataOrderStr(EDataOrderLevel order);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -61,7 +61,6 @@ int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan);
|
|||
int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
|
||||
int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan);
|
||||
|
||||
char* qQueryPlanToString(const SQueryPlan* pPlan);
|
||||
SQueryPlan* qStringToQueryPlan(const char* pStr);
|
||||
|
||||
void qDestroyQueryPlan(SQueryPlan* pPlan);
|
||||
|
|
|
@ -111,6 +111,8 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "DropSuperTableStmt";
|
||||
case QUERY_NODE_ALTER_TABLE_STMT:
|
||||
return "AlterTableStmt";
|
||||
case QUERY_NODE_ALTER_SUPER_TABLE_STMT:
|
||||
return "AlterSuperTableStmt";
|
||||
case QUERY_NODE_CREATE_USER_STMT:
|
||||
return "CreateUserStmt";
|
||||
case QUERY_NODE_ALTER_USER_STMT:
|
||||
|
@ -669,7 +671,7 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
|
|||
code = nodeListToJson(pJson, jkProjectLogicPlanProjections, pNode->pProjections);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkProjectLogicPlanIgnoreGroupId, pNode->ignoreGroupId);
|
||||
code = tjsonAddBoolToObject(pJson, jkProjectLogicPlanIgnoreGroupId, pNode->ignoreGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -2758,6 +2760,20 @@ static int32_t logicAggNodeToJson(const void* pObj, SJson* pJson) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t jsonToLogicAggNode(const SJson* pJson, void* pObj) {
|
||||
SAggLogicNode* pNode = (SAggLogicNode*)pObj;
|
||||
|
||||
int32_t code = jsonToLogicPlanNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkAggLogicPlanGroupKeys, &pNode->pGroupKeys);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkAggLogicPlanAggFuncs, &pNode->pAggFuncs);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static const char* jkDataTypeType = "Type";
|
||||
static const char* jkDataTypePrecision = "Precision";
|
||||
static const char* jkDataTypeScale = "Scale";
|
||||
|
@ -4735,6 +4751,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
|||
return jsonToDeleteStmt(pJson, pObj);
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
return jsonToLogicScanNode(pJson, pObj);
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||
return jsonToLogicAggNode(pJson, pObj);
|
||||
case QUERY_NODE_LOGIC_PLAN_PROJECT:
|
||||
return jsonToLogicProjectNode(pJson, pObj);
|
||||
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY:
|
||||
|
|
|
@ -438,210 +438,3 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit
|
|||
|
||||
return;
|
||||
}
|
||||
|
||||
static EDealRes walkPhysiNode(SPhysiNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) {
|
||||
EDealRes res = walkPhysiPlan((SNode*)pNode->pOutputDataBlockDesc, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlan(pNode->pConditions, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pNode->pChildren, order, walker, pContext);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static EDealRes walkScanPhysi(SScanPhysiNode* pScan, ETraversalOrder order, FNodeWalker walker, void* pContext) {
|
||||
EDealRes res = walkPhysiNode((SPhysiNode*)pScan, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pScan->pScanCols, order, walker, pContext);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static EDealRes walkTableScanPhysi(STableScanPhysiNode* pScan, ETraversalOrder order, FNodeWalker walker,
|
||||
void* pContext) {
|
||||
EDealRes res = walkScanPhysi((SScanPhysiNode*)pScan, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pScan->pDynamicScanFuncs, order, walker, pContext);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static EDealRes walkWindowPhysi(SWinodwPhysiNode* pWindow, ETraversalOrder order, FNodeWalker walker, void* pContext) {
|
||||
EDealRes res = walkPhysiNode((SPhysiNode*)pWindow, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pWindow->pExprs, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pWindow->pFuncs, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlan(pWindow->pTspk, order, walker, pContext);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) {
|
||||
EDealRes res = DEAL_RES_CONTINUE;
|
||||
|
||||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_NODE_LIST:
|
||||
res = walkPhysiPlans(((SNodeListNode*)pNode)->pNodeList, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
res = walkScanPhysi((SScanPhysiNode*)pNode, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
res = walkTableScanPhysi((STableScanPhysiNode*)pNode, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
|
||||
res = walkTableScanPhysi((STableScanPhysiNode*)pNode, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
res = walkScanPhysi((SScanPhysiNode*)pNode, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
|
||||
res = walkScanPhysi((SScanPhysiNode*)pNode, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: {
|
||||
SProjectPhysiNode* pProject = (SProjectPhysiNode*)pNode;
|
||||
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pProject->pProjections, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
|
||||
SSortMergeJoinPhysiNode* pJoin = (SSortMergeJoinPhysiNode*)pNode;
|
||||
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlan(pJoin->pOnConditions, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pJoin->pTargets, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
|
||||
SAggPhysiNode* pAgg = (SAggPhysiNode*)pNode;
|
||||
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pAgg->pExprs, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pAgg->pGroupKeys, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pAgg->pAggFuncs, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
|
||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
|
||||
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pExchange->pSrcEndPoints, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SORT:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
|
||||
SSortPhysiNode* pSort = (SSortPhysiNode*)pNode;
|
||||
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pSort->pExprs, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pSort->pSortKeys, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pSort->pTargets, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
|
||||
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
|
||||
SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)pNode;
|
||||
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlan(pState->pStateKey, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: {
|
||||
SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)pNode;
|
||||
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pPart->pExprs, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pPart->pPartitionKeys, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlans(pPart->pTargets, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
|
||||
res = walkPhysiPlan((SNode*)(((SDataSinkNode*)pNode)->pInputDataBlockDesc), order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
|
||||
res = walkPhysiPlan((SNode*)(((SDataSinkNode*)pNode)->pInputDataBlockDesc), order, walker, pContext);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_SUBPLAN: {
|
||||
SSubplan* pSubplan = (SSubplan*)pNode;
|
||||
res = walkPhysiPlans(pSubplan->pChildren, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlan((SNode*)pSubplan->pNode, order, walker, pContext);
|
||||
}
|
||||
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
|
||||
res = walkPhysiPlan((SNode*)pSubplan->pDataSink, order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN: {
|
||||
SQueryPlan* pPlan = (SQueryPlan*)pNode;
|
||||
if (NULL != pPlan->pSubplans) {
|
||||
// only need to walk the top-level subplans, because they will recurse to all the subplans below
|
||||
walkPhysiPlan(nodesListGetNode(pPlan->pSubplans, 0), order, walker, pContext);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
res = dispatchExpr(pNode, order, walker, pContext);
|
||||
break;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
static EDealRes walkPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) {
|
||||
return walkNode(pNode, order, walker, pContext, dispatchPhysiPlan);
|
||||
}
|
||||
|
||||
static EDealRes walkPhysiPlans(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext) {
|
||||
SNode* node;
|
||||
FOREACH(node, pNodeList) {
|
||||
EDealRes res = walkPhysiPlan(node, order, walker, pContext);
|
||||
if (DEAL_RES_ERROR == res || DEAL_RES_END == res) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext) {
|
||||
(void)walkPhysiPlan(pNode, TRAVERSAL_PREORDER, walker, pContext);
|
||||
}
|
||||
|
|
|
@ -328,6 +328,7 @@ SNode* nodesMakeNode(ENodeType type) {
|
|||
case QUERY_NODE_DROP_SUPER_TABLE_STMT:
|
||||
return makeNode(type, sizeof(SDropSuperTableStmt));
|
||||
case QUERY_NODE_ALTER_TABLE_STMT:
|
||||
case QUERY_NODE_ALTER_SUPER_TABLE_STMT:
|
||||
return makeNode(type, sizeof(SAlterTableStmt));
|
||||
case QUERY_NODE_CREATE_USER_STMT:
|
||||
return makeNode(type, sizeof(SCreateUserStmt));
|
||||
|
|
|
@ -1,19 +1,26 @@
|
|||
|
||||
MESSAGE(STATUS "build nodes unit test")
|
||||
|
||||
# GoogleTest requires at least C++11
|
||||
SET(CMAKE_CXX_STANDARD 11)
|
||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||
|
||||
ADD_EXECUTABLE(nodesTest ${SOURCE_LIST})
|
||||
|
||||
TARGET_INCLUDE_DIRECTORIES(
|
||||
IF(NOT TD_DARWIN)
|
||||
# GoogleTest requires at least C++11
|
||||
SET(CMAKE_CXX_STANDARD 11)
|
||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||
|
||||
ADD_EXECUTABLE(nodesTest ${SOURCE_LIST})
|
||||
|
||||
TARGET_INCLUDE_DIRECTORIES(
|
||||
nodesTest
|
||||
PUBLIC "${TD_SOURCE_DIR}/include/nodes/"
|
||||
PRIVATE "${TD_SOURCE_DIR}/source/nodes/inc"
|
||||
)
|
||||
|
||||
TARGET_LINK_LIBRARIES(
|
||||
)
|
||||
|
||||
TARGET_LINK_LIBRARIES(
|
||||
nodesTest
|
||||
PUBLIC os util common nodes gtest
|
||||
)
|
||||
PUBLIC os util common nodes qcom gtest
|
||||
)
|
||||
|
||||
add_test(
|
||||
NAME nodesTest
|
||||
COMMAND nodesTest
|
||||
)
|
||||
ENDIF()
|
||||
|
|
|
@ -0,0 +1,299 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, AND/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include "nodes.h"
|
||||
#include "plannodes.h"
|
||||
#include "querynodes.h"
|
||||
|
||||
class NodesCloneTest : public testing::Test {
|
||||
public:
|
||||
void registerCheckFunc(const std::function<void(const SNode*, const SNode*)>& func) { checkFunc_ = func; }
|
||||
|
||||
void run(const SNode* pSrc) {
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> pDst(nodesCloneNode(pSrc), nodesDestroyNode);
|
||||
checkFunc_(pSrc, pDst.get());
|
||||
}
|
||||
|
||||
private:
|
||||
std::function<void(const SNode*, const SNode*)> checkFunc_;
|
||||
};
|
||||
|
||||
TEST_F(NodesCloneTest, tempTable) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
STempTableNode* pSrcNode = (STempTableNode*)pSrc;
|
||||
STempTableNode* pDstNode = (STempTableNode*)pDst;
|
||||
ASSERT_EQ(nodeType(pSrcNode->pSubquery), nodeType(pDstNode->pSubquery));
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_TEMP_TABLE));
|
||||
STempTableNode* pNode = (STempTableNode*)srcNode.get();
|
||||
pNode->pSubquery = nodesMakeNode(QUERY_NODE_SELECT_STMT);
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, joinTable) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
SJoinTableNode* pSrcNode = (SJoinTableNode*)pSrc;
|
||||
SJoinTableNode* pDstNode = (SJoinTableNode*)pDst;
|
||||
ASSERT_EQ(pSrcNode->joinType, pDstNode->joinType);
|
||||
ASSERT_EQ(nodeType(pSrcNode->pLeft), nodeType(pDstNode->pLeft));
|
||||
ASSERT_EQ(nodeType(pSrcNode->pRight), nodeType(pDstNode->pRight));
|
||||
if (NULL != pSrcNode->pOnCond) {
|
||||
ASSERT_EQ(nodeType(pSrcNode->pOnCond), nodeType(pDstNode->pOnCond));
|
||||
}
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_JOIN_TABLE));
|
||||
SJoinTableNode* pNode = (SJoinTableNode*)srcNode.get();
|
||||
pNode->joinType = JOIN_TYPE_INNER;
|
||||
pNode->pLeft = nodesMakeNode(QUERY_NODE_REAL_TABLE);
|
||||
pNode->pRight = nodesMakeNode(QUERY_NODE_REAL_TABLE);
|
||||
pNode->pOnCond = nodesMakeNode(QUERY_NODE_OPERATOR);
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, stateWindow) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
SStateWindowNode* pSrcNode = (SStateWindowNode*)pSrc;
|
||||
SStateWindowNode* pDstNode = (SStateWindowNode*)pDst;
|
||||
ASSERT_EQ(nodeType(pSrcNode->pCol), nodeType(pDstNode->pCol));
|
||||
ASSERT_EQ(nodeType(pSrcNode->pExpr), nodeType(pDstNode->pExpr));
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_STATE_WINDOW));
|
||||
SStateWindowNode* pNode = (SStateWindowNode*)srcNode.get();
|
||||
pNode->pCol = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
pNode->pExpr = nodesMakeNode(QUERY_NODE_OPERATOR);
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, sessionWindow) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
SSessionWindowNode* pSrcNode = (SSessionWindowNode*)pSrc;
|
||||
SSessionWindowNode* pDstNode = (SSessionWindowNode*)pDst;
|
||||
ASSERT_EQ(nodeType(pSrcNode->pCol), nodeType(pDstNode->pCol));
|
||||
ASSERT_EQ(nodeType(pSrcNode->pGap), nodeType(pDstNode->pGap));
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_SESSION_WINDOW));
|
||||
SSessionWindowNode* pNode = (SSessionWindowNode*)srcNode.get();
|
||||
pNode->pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
pNode->pGap = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, intervalWindow) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
SIntervalWindowNode* pSrcNode = (SIntervalWindowNode*)pSrc;
|
||||
SIntervalWindowNode* pDstNode = (SIntervalWindowNode*)pDst;
|
||||
ASSERT_EQ(nodeType(pSrcNode->pInterval), nodeType(pDstNode->pInterval));
|
||||
if (NULL != pSrcNode->pOffset) {
|
||||
ASSERT_EQ(nodeType(pSrcNode->pOffset), nodeType(pDstNode->pOffset));
|
||||
}
|
||||
if (NULL != pSrcNode->pSliding) {
|
||||
ASSERT_EQ(nodeType(pSrcNode->pSliding), nodeType(pDstNode->pSliding));
|
||||
}
|
||||
if (NULL != pSrcNode->pFill) {
|
||||
ASSERT_EQ(nodeType(pSrcNode->pFill), nodeType(pDstNode->pFill));
|
||||
}
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW));
|
||||
SIntervalWindowNode* pNode = (SIntervalWindowNode*)srcNode.get();
|
||||
pNode->pInterval = nodesMakeNode(QUERY_NODE_VALUE);
|
||||
pNode->pOffset = nodesMakeNode(QUERY_NODE_VALUE);
|
||||
pNode->pSliding = nodesMakeNode(QUERY_NODE_VALUE);
|
||||
pNode->pFill = nodesMakeNode(QUERY_NODE_FILL);
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, fill) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
SFillNode* pSrcNode = (SFillNode*)pSrc;
|
||||
SFillNode* pDstNode = (SFillNode*)pDst;
|
||||
ASSERT_EQ(pSrcNode->mode, pDstNode->mode);
|
||||
if (NULL != pSrcNode->pValues) {
|
||||
ASSERT_EQ(nodeType(pSrcNode->pValues), nodeType(pDstNode->pValues));
|
||||
}
|
||||
ASSERT_EQ(nodeType(pSrcNode->pWStartTs), nodeType(pDstNode->pWStartTs));
|
||||
ASSERT_EQ(pSrcNode->timeRange.skey, pDstNode->timeRange.skey);
|
||||
ASSERT_EQ(pSrcNode->timeRange.ekey, pDstNode->timeRange.ekey);
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_FILL));
|
||||
SFillNode* pNode = (SFillNode*)srcNode.get();
|
||||
pNode->mode = FILL_MODE_VALUE;
|
||||
pNode->pValues = nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||
pNode->pWStartTs = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
pNode->timeRange.skey = 1666756692907;
|
||||
pNode->timeRange.ekey = 1666756699907;
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, logicSubplan) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
SLogicSubplan* pSrcNode = (SLogicSubplan*)pSrc;
|
||||
SLogicSubplan* pDstNode = (SLogicSubplan*)pDst;
|
||||
ASSERT_EQ(pSrcNode->subplanType, pDstNode->subplanType);
|
||||
ASSERT_EQ(pSrcNode->level, pDstNode->level);
|
||||
ASSERT_EQ(pSrcNode->splitFlag, pDstNode->splitFlag);
|
||||
ASSERT_EQ(pSrcNode->numOfComputeNodes, pDstNode->numOfComputeNodes);
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN));
|
||||
SLogicSubplan* pNode = (SLogicSubplan*)srcNode.get();
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, physiScan) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
STagScanPhysiNode* pSrcNode = (STagScanPhysiNode*)pSrc;
|
||||
STagScanPhysiNode* pDstNode = (STagScanPhysiNode*)pDst;
|
||||
ASSERT_EQ(pSrcNode->uid, pDstNode->uid);
|
||||
ASSERT_EQ(pSrcNode->suid, pDstNode->suid);
|
||||
ASSERT_EQ(pSrcNode->tableType, pDstNode->tableType);
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN));
|
||||
STagScanPhysiNode* pNode = (STagScanPhysiNode*)srcNode.get();
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, physiSystemTableScan) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
SSystemTableScanPhysiNode* pSrcNode = (SSystemTableScanPhysiNode*)pSrc;
|
||||
SSystemTableScanPhysiNode* pDstNode = (SSystemTableScanPhysiNode*)pDst;
|
||||
ASSERT_EQ(pSrcNode->showRewrite, pDstNode->showRewrite);
|
||||
ASSERT_EQ(pSrcNode->accountId, pDstNode->accountId);
|
||||
ASSERT_EQ(pSrcNode->sysInfo, pDstNode->sysInfo);
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN));
|
||||
SSystemTableScanPhysiNode* pNode = (SSystemTableScanPhysiNode*)srcNode.get();
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, physiStreamSemiSessionWinodw) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
SStreamSemiSessionWinodwPhysiNode* pSrcNode = (SStreamSemiSessionWinodwPhysiNode*)pSrc;
|
||||
SStreamSemiSessionWinodwPhysiNode* pDstNode = (SStreamSemiSessionWinodwPhysiNode*)pDst;
|
||||
ASSERT_EQ(pSrcNode->gap, pDstNode->gap);
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION));
|
||||
SStreamSemiSessionWinodwPhysiNode* pNode = (SStreamSemiSessionWinodwPhysiNode*)srcNode.get();
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, physiStreamFinalSessionWinodw) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
SStreamFinalSessionWinodwPhysiNode* pSrcNode = (SStreamFinalSessionWinodwPhysiNode*)pSrc;
|
||||
SStreamFinalSessionWinodwPhysiNode* pDstNode = (SStreamFinalSessionWinodwPhysiNode*)pDst;
|
||||
ASSERT_EQ(pSrcNode->gap, pDstNode->gap);
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION));
|
||||
SStreamFinalSessionWinodwPhysiNode* pNode = (SStreamFinalSessionWinodwPhysiNode*)srcNode.get();
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, physiStreamPartition) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
SStreamPartitionPhysiNode* pSrcNode = (SStreamPartitionPhysiNode*)pSrc;
|
||||
SStreamPartitionPhysiNode* pDstNode = (SStreamPartitionPhysiNode*)pDst;
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION));
|
||||
SStreamPartitionPhysiNode* pNode = (SStreamPartitionPhysiNode*)srcNode.get();
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
||||
|
||||
TEST_F(NodesCloneTest, physiPartition) {
|
||||
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
|
||||
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
|
||||
SPartitionPhysiNode* pSrcNode = (SPartitionPhysiNode*)pSrc;
|
||||
SPartitionPhysiNode* pDstNode = (SPartitionPhysiNode*)pDst;
|
||||
});
|
||||
|
||||
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
|
||||
|
||||
run([&]() {
|
||||
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_PARTITION));
|
||||
SPartitionPhysiNode* pNode = (SPartitionPhysiNode*)srcNode.get();
|
||||
return srcNode.get();
|
||||
}());
|
||||
}
|
|
@ -518,6 +518,7 @@ stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE.
|
|||
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
|
||||
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
|
||||
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
|
||||
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; }
|
||||
|
||||
subtable_opt(A) ::= . { A = NULL; }
|
||||
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
|
||||
|
|
|
@ -1702,6 +1702,7 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
|
|||
SStreamOptions* pOptions = (SStreamOptions*)nodesMakeNode(QUERY_NODE_STREAM_OPTIONS);
|
||||
CHECK_OUT_OF_MEM(pOptions);
|
||||
pOptions->triggerType = STREAM_TRIGGER_AT_ONCE;
|
||||
pOptions->fillHistory = STREAM_DEFAULT_FILL_HISTORY;
|
||||
pOptions->ignoreExpired = STREAM_DEFAULT_IGNORE_EXPIRED;
|
||||
return (SNode*)pOptions;
|
||||
}
|
||||
|
|
|
@ -92,6 +92,7 @@ static SKeyword keywordTable[] = {
|
|||
{"EVERY", TK_EVERY},
|
||||
{"FILE", TK_FILE},
|
||||
{"FILL", TK_FILL},
|
||||
{"FILL_HISTORY", TK_FILL_HISTORY},
|
||||
{"FIRST", TK_FIRST},
|
||||
{"FLOAT", TK_FLOAT},
|
||||
{"FLUSH", TK_FLUSH},
|
||||
|
|
|
@ -5542,6 +5542,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
|
|||
pReq->triggerType = pStmt->pOptions->triggerType;
|
||||
pReq->maxDelay = (NULL != pStmt->pOptions->pDelay ? ((SValueNode*)pStmt->pOptions->pDelay)->datum.i : 0);
|
||||
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
|
||||
pReq->fillHistory = pStmt->pOptions->fillHistory;
|
||||
pReq->igExpired = pStmt->pOptions->ignoreExpired;
|
||||
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
|
||||
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -576,10 +576,10 @@ TEST_F(ParserInitialCTest, createStream) {
|
|||
memset(&expect, 0, sizeof(SCMCreateStreamReq));
|
||||
};
|
||||
|
||||
auto setCreateStreamReqFunc = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
|
||||
int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE,
|
||||
int64_t maxDelay = 0, int64_t watermark = 0,
|
||||
int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED) {
|
||||
auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
|
||||
int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0,
|
||||
int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
|
||||
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) {
|
||||
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
|
||||
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
|
||||
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
|
||||
|
@ -588,6 +588,7 @@ TEST_F(ParserInitialCTest, createStream) {
|
|||
expect.triggerType = triggerType;
|
||||
expect.maxDelay = maxDelay;
|
||||
expect.watermark = watermark;
|
||||
expect.fillHistory = fillHistory;
|
||||
expect.igExpired = igExpired;
|
||||
};
|
||||
|
||||
|
@ -619,6 +620,7 @@ TEST_F(ParserInitialCTest, createStream) {
|
|||
ASSERT_EQ(req.triggerType, expect.triggerType);
|
||||
ASSERT_EQ(req.maxDelay, expect.maxDelay);
|
||||
ASSERT_EQ(req.watermark, expect.watermark);
|
||||
ASSERT_EQ(req.fillHistory, expect.fillHistory);
|
||||
ASSERT_EQ(req.igExpired, expect.igExpired);
|
||||
ASSERT_EQ(req.numOfTags, expect.numOfTags);
|
||||
if (expect.numOfTags > 0) {
|
||||
|
@ -636,24 +638,24 @@ TEST_F(ParserInitialCTest, createStream) {
|
|||
tFreeSCMCreateStreamReq(&req);
|
||||
});
|
||||
|
||||
setCreateStreamReqFunc("s1", "test", "create stream s1 into st1 as select count(*) from t1 interval(10s)", "st1");
|
||||
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select count(*) from t1 interval(10s)", "st1");
|
||||
run("CREATE STREAM s1 INTO st1 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
|
||||
clearCreateStreamReq();
|
||||
|
||||
setCreateStreamReqFunc("s1", "test",
|
||||
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 into st1 "
|
||||
"as select count(*) from t1 interval(10s)",
|
||||
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND,
|
||||
0);
|
||||
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 INTO st1 AS SELECT COUNT(*) "
|
||||
setCreateStreamReq(
|
||||
"s1", "test",
|
||||
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st1 "
|
||||
"as select count(*) from t1 interval(10s)",
|
||||
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1);
|
||||
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st1 AS "
|
||||
"SELECT COUNT(*) "
|
||||
"FROM t1 INTERVAL(10S)");
|
||||
clearCreateStreamReq();
|
||||
|
||||
setCreateStreamReqFunc(
|
||||
"s1", "test",
|
||||
"create stream s1 into st3 tags(tname varchar(10), id int) subtable(concat('new-', tname)) as "
|
||||
"select _wstart wstart, count(*) cnt from st1 partition by tbname tname, tag1 id interval(10s)",
|
||||
"st3");
|
||||
setCreateStreamReq("s1", "test",
|
||||
"create stream s1 into st3 tags(tname varchar(10), id int) subtable(concat('new-', tname)) as "
|
||||
"select _wstart wstart, count(*) cnt from st1 partition by tbname tname, tag1 id interval(10s)",
|
||||
"st3");
|
||||
addTag("tname", TSDB_DATA_TYPE_VARCHAR, 10 + VARSTR_HEADER_SIZE);
|
||||
addTag("id", TSDB_DATA_TYPE_INT);
|
||||
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "
|
||||
|
|
|
@ -295,6 +295,10 @@ class ParserTestBaseImpl {
|
|||
char* pStr = NULL;
|
||||
int32_t len = 0;
|
||||
DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len)
|
||||
// check toObject
|
||||
SNode* pCopy = NULL;
|
||||
DO_WITH_THROW(nodesStringToNode, pStr, &pCopy)
|
||||
nodesDestroyNode(pCopy);
|
||||
string str(pStr);
|
||||
taosMemoryFreeClear(pStr);
|
||||
return str;
|
||||
|
|
|
@ -339,29 +339,6 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t createColumnByLastRow(SNodeList* pFuncs, SNodeList** pOutput) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNodeList* pCols = NULL;
|
||||
SNode* pFunc = NULL;
|
||||
FOREACH(pFunc, pFuncs) {
|
||||
SFunctionNode* pLastRow = (SFunctionNode*)pFunc;
|
||||
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pLastRow->pParameterList, 0);
|
||||
snprintf(pCol->colName, sizeof(pCol->colName), "%s", pLastRow->node.aliasName);
|
||||
snprintf(pCol->node.aliasName, sizeof(pCol->colName), "%s", pLastRow->node.aliasName);
|
||||
NODES_CLEAR_LIST(pLastRow->pParameterList);
|
||||
code = nodesListMakeStrictAppend(&pCols, (SNode*)pCol);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pOutput = pCols;
|
||||
} else {
|
||||
nodesDestroyList(pCols);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t createSubqueryLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, STempTableNode* pTable,
|
||||
SLogicNode** pLogicNode) {
|
||||
return createQueryLogicNode(pCxt, pTable->pSubquery, pLogicNode);
|
||||
|
@ -491,20 +468,6 @@ static SNode* createGroupingSetNode(SNode* pExpr) {
|
|||
return (SNode*)pGroupingSet;
|
||||
}
|
||||
|
||||
static int32_t createGroupKeysFromPartKeys(SNodeList* pPartKeys, SNodeList** pOutput) {
|
||||
SNodeList* pGroupKeys = NULL;
|
||||
SNode* pPartKey = NULL;
|
||||
FOREACH(pPartKey, pPartKeys) {
|
||||
int32_t code = nodesListMakeStrictAppend(&pGroupKeys, createGroupingSetNode(pPartKey));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyList(pGroupKeys);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
*pOutput = pGroupKeys;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static EGroupAction getGroupAction(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
|
||||
return (pCxt->pPlanCxt->streamQuery || NULL != pSelect->pLimit || NULL != pSelect->pSlimit) ? GROUP_ACTION_KEEP
|
||||
: GROUP_ACTION_NONE;
|
||||
|
|
|
@ -596,18 +596,6 @@ static int32_t pushDownCondOptPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogi
|
|||
return pushDownCondOptAppendCond(&pJoin->pOnConditions, pCond);
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptPushCondToScan(SOptimizeContext* pCxt, SScanLogicNode* pScan, SNode** pCond) {
|
||||
return pushDownCondOptAppendCond(&pScan->node.pConditions, pCond);
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptPushCondToProject(SOptimizeContext* pCxt, SProjectLogicNode* pProject, SNode** pCond) {
|
||||
return pushDownCondOptAppendCond(&pProject->node.pConditions, pCond);
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) {
|
||||
return pushDownCondOptAppendCond(&pJoin->node.pConditions, pCond);
|
||||
}
|
||||
|
||||
static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) {
|
||||
return pushDownCondOptAppendCond(&pChild->pConditions, pCond);
|
||||
}
|
||||
|
@ -1201,40 +1189,6 @@ static bool smaIndexOptMayBeOptimized(SLogicNode* pNode) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static int32_t smaIndexOptCreateMerge(SLogicNode* pChild, SNodeList* pMergeKeys, SNodeList* pTargets,
|
||||
SLogicNode** pOutput) {
|
||||
SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
|
||||
if (NULL == pMerge) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pMerge->node.precision = pChild->precision;
|
||||
pMerge->numOfChannels = 2;
|
||||
pMerge->pMergeKeys = pMergeKeys;
|
||||
pMerge->node.pTargets = pTargets;
|
||||
pMerge->pInputs = nodesCloneList(pChild->pTargets);
|
||||
if (NULL == pMerge->pInputs) {
|
||||
nodesDestroyNode((SNode*)pMerge);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
*pOutput = (SLogicNode*)pMerge;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t smaIndexOptRecombinationNode(SLogicSubplan* pLogicSubplan, SLogicNode* pInterval, SLogicNode* pMerge,
|
||||
SLogicNode* pSmaScan) {
|
||||
int32_t code = nodesListMakeAppend(&pMerge->pChildren, (SNode*)pInterval);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeAppend(&pMerge->pChildren, (SNode*)pSmaScan);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = replaceLogicNode(pLogicSubplan, pInterval, pMerge);
|
||||
pSmaScan->pParent = pMerge;
|
||||
pInterval->pParent = pMerge;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t smaIndexOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList* pCols,
|
||||
SLogicNode** pOutput) {
|
||||
SScanLogicNode* pSmaScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
|
||||
|
|
|
@ -1407,7 +1407,7 @@ typedef struct SQnodeSplitInfo {
|
|||
static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
|
||||
SQnodeSplitInfo* pInfo) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent &&
|
||||
((SScanLogicNode*)pNode)->scanSeq[0] < 1 && ((SScanLogicNode*)pNode)->scanSeq[1] < 1) {
|
||||
((SScanLogicNode*)pNode)->scanSeq[0] <= 1 && ((SScanLogicNode*)pNode)->scanSeq[1] <= 1) {
|
||||
pInfo->pSplitNode = pNode;
|
||||
pInfo->pSubplan = pSubplan;
|
||||
return true;
|
||||
|
|
|
@ -142,15 +142,6 @@ int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan) {
|
|||
return nodesMsgToNode(pStr, len, (SNode**)pSubplan);
|
||||
}
|
||||
|
||||
char* qQueryPlanToString(const SQueryPlan* pPlan) {
|
||||
char* pStr = NULL;
|
||||
int32_t len = 0;
|
||||
if (TSDB_CODE_SUCCESS != nodesNodeToString((SNode*)pPlan, false, &pStr, &len)) {
|
||||
return NULL;
|
||||
}
|
||||
return pStr;
|
||||
}
|
||||
|
||||
SQueryPlan* qStringToQueryPlan(const char* pStr) {
|
||||
SQueryPlan* pPlan = NULL;
|
||||
if (TSDB_CODE_SUCCESS != nodesStringToNode(pStr, (SNode**)&pPlan)) {
|
||||
|
|
|
@ -466,8 +466,13 @@ class PlannerTestBaseImpl {
|
|||
char* pStr = NULL;
|
||||
int32_t len = 0;
|
||||
DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len)
|
||||
// check toObject
|
||||
SNode* pCopy = NULL;
|
||||
DO_WITH_THROW(nodesStringToNode, pStr, &pCopy)
|
||||
nodesDestroyNode(pCopy);
|
||||
string str(pStr);
|
||||
taosMemoryFreeClear(pStr);
|
||||
|
||||
return str;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue