Merge pull request #17675 from taosdata/fix/3.0_bugfix_wxy

enh: add unit test and delete useless code
This commit is contained in:
Shengliang Guan 2022-10-28 13:29:26 +08:00 committed by GitHub
commit 800ff0b957
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 3250 additions and 3179 deletions

View File

@ -1726,6 +1726,9 @@ typedef struct {
#define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_MAX_DELAY 3
#define STREAM_DEFAULT_IGNORE_EXPIRED 0
#define STREAM_FILL_HISTORY_ON 1
#define STREAM_FILL_HISTORY_OFF 0
#define STREAM_DEFAULT_FILL_HISTORY STREAM_FILL_HISTORY_OFF
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];

View File

@ -203,135 +203,136 @@
#define TK_WINDOW_CLOSE 185
#define TK_IGNORE 186
#define TK_EXPIRED 187
#define TK_SUBTABLE 188
#define TK_KILL 189
#define TK_CONNECTION 190
#define TK_TRANSACTION 191
#define TK_BALANCE 192
#define TK_VGROUP 193
#define TK_MERGE 194
#define TK_REDISTRIBUTE 195
#define TK_SPLIT 196
#define TK_DELETE 197
#define TK_INSERT 198
#define TK_NULL 199
#define TK_NK_QUESTION 200
#define TK_NK_ARROW 201
#define TK_ROWTS 202
#define TK_TBNAME 203
#define TK_QSTART 204
#define TK_QEND 205
#define TK_QDURATION 206
#define TK_WSTART 207
#define TK_WEND 208
#define TK_WDURATION 209
#define TK_IROWTS 210
#define TK_QTAGS 211
#define TK_CAST 212
#define TK_NOW 213
#define TK_TODAY 214
#define TK_TIMEZONE 215
#define TK_CLIENT_VERSION 216
#define TK_SERVER_VERSION 217
#define TK_SERVER_STATUS 218
#define TK_CURRENT_USER 219
#define TK_COUNT 220
#define TK_LAST_ROW 221
#define TK_CASE 222
#define TK_END 223
#define TK_WHEN 224
#define TK_THEN 225
#define TK_ELSE 226
#define TK_BETWEEN 227
#define TK_IS 228
#define TK_NK_LT 229
#define TK_NK_GT 230
#define TK_NK_LE 231
#define TK_NK_GE 232
#define TK_NK_NE 233
#define TK_MATCH 234
#define TK_NMATCH 235
#define TK_CONTAINS 236
#define TK_IN 237
#define TK_JOIN 238
#define TK_INNER 239
#define TK_SELECT 240
#define TK_DISTINCT 241
#define TK_WHERE 242
#define TK_PARTITION 243
#define TK_BY 244
#define TK_SESSION 245
#define TK_STATE_WINDOW 246
#define TK_SLIDING 247
#define TK_FILL 248
#define TK_VALUE 249
#define TK_NONE 250
#define TK_PREV 251
#define TK_LINEAR 252
#define TK_NEXT 253
#define TK_HAVING 254
#define TK_RANGE 255
#define TK_EVERY 256
#define TK_ORDER 257
#define TK_SLIMIT 258
#define TK_SOFFSET 259
#define TK_LIMIT 260
#define TK_OFFSET 261
#define TK_ASC 262
#define TK_NULLS 263
#define TK_ABORT 264
#define TK_AFTER 265
#define TK_ATTACH 266
#define TK_BEFORE 267
#define TK_BEGIN 268
#define TK_BITAND 269
#define TK_BITNOT 270
#define TK_BITOR 271
#define TK_BLOCKS 272
#define TK_CHANGE 273
#define TK_COMMA 274
#define TK_COMPACT 275
#define TK_CONCAT 276
#define TK_CONFLICT 277
#define TK_COPY 278
#define TK_DEFERRED 279
#define TK_DELIMITERS 280
#define TK_DETACH 281
#define TK_DIVIDE 282
#define TK_DOT 283
#define TK_EACH 284
#define TK_FAIL 285
#define TK_FILE 286
#define TK_FOR 287
#define TK_GLOB 288
#define TK_ID 289
#define TK_IMMEDIATE 290
#define TK_IMPORT 291
#define TK_INITIALLY 292
#define TK_INSTEAD 293
#define TK_ISNULL 294
#define TK_KEY 295
#define TK_MODULES 296
#define TK_NK_BITNOT 297
#define TK_NK_SEMI 298
#define TK_NOTNULL 299
#define TK_OF 300
#define TK_PLUS 301
#define TK_PRIVILEGE 302
#define TK_RAISE 303
#define TK_REPLACE 304
#define TK_RESTRICT 305
#define TK_ROW 306
#define TK_SEMI 307
#define TK_STAR 308
#define TK_STATEMENT 309
#define TK_STRING 310
#define TK_TIMES 311
#define TK_UPDATE 312
#define TK_VALUES 313
#define TK_VARIABLE 314
#define TK_VIEW 315
#define TK_WAL 316
#define TK_FILL_HISTORY 188
#define TK_SUBTABLE 189
#define TK_KILL 190
#define TK_CONNECTION 191
#define TK_TRANSACTION 192
#define TK_BALANCE 193
#define TK_VGROUP 194
#define TK_MERGE 195
#define TK_REDISTRIBUTE 196
#define TK_SPLIT 197
#define TK_DELETE 198
#define TK_INSERT 199
#define TK_NULL 200
#define TK_NK_QUESTION 201
#define TK_NK_ARROW 202
#define TK_ROWTS 203
#define TK_TBNAME 204
#define TK_QSTART 205
#define TK_QEND 206
#define TK_QDURATION 207
#define TK_WSTART 208
#define TK_WEND 209
#define TK_WDURATION 210
#define TK_IROWTS 211
#define TK_QTAGS 212
#define TK_CAST 213
#define TK_NOW 214
#define TK_TODAY 215
#define TK_TIMEZONE 216
#define TK_CLIENT_VERSION 217
#define TK_SERVER_VERSION 218
#define TK_SERVER_STATUS 219
#define TK_CURRENT_USER 220
#define TK_COUNT 221
#define TK_LAST_ROW 222
#define TK_CASE 223
#define TK_END 224
#define TK_WHEN 225
#define TK_THEN 226
#define TK_ELSE 227
#define TK_BETWEEN 228
#define TK_IS 229
#define TK_NK_LT 230
#define TK_NK_GT 231
#define TK_NK_LE 232
#define TK_NK_GE 233
#define TK_NK_NE 234
#define TK_MATCH 235
#define TK_NMATCH 236
#define TK_CONTAINS 237
#define TK_IN 238
#define TK_JOIN 239
#define TK_INNER 240
#define TK_SELECT 241
#define TK_DISTINCT 242
#define TK_WHERE 243
#define TK_PARTITION 244
#define TK_BY 245
#define TK_SESSION 246
#define TK_STATE_WINDOW 247
#define TK_SLIDING 248
#define TK_FILL 249
#define TK_VALUE 250
#define TK_NONE 251
#define TK_PREV 252
#define TK_LINEAR 253
#define TK_NEXT 254
#define TK_HAVING 255
#define TK_RANGE 256
#define TK_EVERY 257
#define TK_ORDER 258
#define TK_SLIMIT 259
#define TK_SOFFSET 260
#define TK_LIMIT 261
#define TK_OFFSET 262
#define TK_ASC 263
#define TK_NULLS 264
#define TK_ABORT 265
#define TK_AFTER 266
#define TK_ATTACH 267
#define TK_BEFORE 268
#define TK_BEGIN 269
#define TK_BITAND 270
#define TK_BITNOT 271
#define TK_BITOR 272
#define TK_BLOCKS 273
#define TK_CHANGE 274
#define TK_COMMA 275
#define TK_COMPACT 276
#define TK_CONCAT 277
#define TK_CONFLICT 278
#define TK_COPY 279
#define TK_DEFERRED 280
#define TK_DELIMITERS 281
#define TK_DETACH 282
#define TK_DIVIDE 283
#define TK_DOT 284
#define TK_EACH 285
#define TK_FAIL 286
#define TK_FILE 287
#define TK_FOR 288
#define TK_GLOB 289
#define TK_ID 290
#define TK_IMMEDIATE 291
#define TK_IMPORT 292
#define TK_INITIALLY 293
#define TK_INSTEAD 294
#define TK_ISNULL 295
#define TK_KEY 296
#define TK_MODULES 297
#define TK_NK_BITNOT 298
#define TK_NK_SEMI 299
#define TK_NOTNULL 300
#define TK_OF 301
#define TK_PLUS 302
#define TK_PRIVILEGE 303
#define TK_RAISE 304
#define TK_REPLACE 305
#define TK_RESTRICT 306
#define TK_ROW 307
#define TK_SEMI 308
#define TK_STAR 309
#define TK_STATEMENT 310
#define TK_STRING 311
#define TK_TIMES 312
#define TK_UPDATE 313
#define TK_VALUES 314
#define TK_VARIABLE 315
#define TK_VIEW 316
#define TK_WAL 317
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301

View File

@ -374,6 +374,7 @@ typedef struct SStreamOptions {
int8_t triggerType;
SNode* pDelay;
SNode* pWatermark;
int8_t fillHistory;
int8_t ignoreExpired;
} SStreamOptions;

View File

@ -567,6 +567,7 @@ typedef struct SSubplan {
SDataSinkNode* pDataSink; // data of the subplan flow into the datasink
SNode* pTagCond;
SNode* pTagIndexCond;
bool showRewrite;
} SSubplan;
typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode;
@ -585,8 +586,6 @@ typedef struct SQueryPlan {
SExplainInfo explainInfo;
} SQueryPlan;
void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext);
const char* dataOrderStr(EDataOrderLevel order);
#ifdef __cplusplus

View File

@ -61,7 +61,6 @@ int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan);
int32_t qSubPlanToMsg(const SSubplan* pSubplan, char** pStr, int32_t* pLen);
int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan);
char* qQueryPlanToString(const SQueryPlan* pPlan);
SQueryPlan* qStringToQueryPlan(const char* pStr);
void qDestroyQueryPlan(SQueryPlan* pPlan);

View File

@ -4996,6 +4996,7 @@ int32_t tDeserializeSMRecoverStreamReq(void *buf, int32_t bufLen, SMRecoverStrea
}
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosArrayDestroy(pReq->pTags);
taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->ast);
}

View File

@ -111,6 +111,8 @@ const char* nodesNodeName(ENodeType type) {
return "DropSuperTableStmt";
case QUERY_NODE_ALTER_TABLE_STMT:
return "AlterTableStmt";
case QUERY_NODE_ALTER_SUPER_TABLE_STMT:
return "AlterSuperTableStmt";
case QUERY_NODE_CREATE_USER_STMT:
return "CreateUserStmt";
case QUERY_NODE_ALTER_USER_STMT:
@ -669,7 +671,7 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
code = nodeListToJson(pJson, jkProjectLogicPlanProjections, pNode->pProjections);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkProjectLogicPlanIgnoreGroupId, pNode->ignoreGroupId);
code = tjsonAddBoolToObject(pJson, jkProjectLogicPlanIgnoreGroupId, pNode->ignoreGroupId);
}
return code;
@ -2632,6 +2634,7 @@ static const char* jkSubplanRootNode = "RootNode";
static const char* jkSubplanDataSink = "DataSink";
static const char* jkSubplanTagCond = "TagCond";
static const char* jkSubplanTagIndexCond = "TagIndexCond";
static const char* jkSubplanShowRewrite = "ShowRewrite";
static int32_t subplanToJson(const void* pObj, SJson* pJson) {
const SSubplan* pNode = (const SSubplan*)pObj;
@ -2667,6 +2670,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSubplanTagIndexCond, nodeToJson, pNode->pTagIndexCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSubplanShowRewrite, pNode->showRewrite);
}
return code;
}
@ -2705,6 +2711,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSubplanTagIndexCond, (SNode**)&pNode->pTagIndexCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSubplanShowRewrite, &pNode->showRewrite);
}
return code;
}
@ -2758,6 +2767,20 @@ static int32_t logicAggNodeToJson(const void* pObj, SJson* pJson) {
return code;
}
static int32_t jsonToLogicAggNode(const SJson* pJson, void* pObj) {
SAggLogicNode* pNode = (SAggLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkAggLogicPlanGroupKeys, &pNode->pGroupKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkAggLogicPlanAggFuncs, &pNode->pAggFuncs);
}
return code;
}
static const char* jkDataTypeType = "Type";
static const char* jkDataTypePrecision = "Precision";
static const char* jkDataTypeScale = "Scale";
@ -4735,6 +4758,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToDeleteStmt(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_SCAN:
return jsonToLogicScanNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_AGG:
return jsonToLogicAggNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return jsonToLogicProjectNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY:

View File

@ -1969,7 +1969,12 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_LAST_ROW_SCAN_CODE_SCAN = 1, PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT, PHY_LAST_ROW_SCAN_CODE_IGNULL };
enum {
PHY_LAST_ROW_SCAN_CODE_SCAN = 1,
PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS,
PHY_LAST_ROW_SCAN_CODE_GROUP_SORT,
PHY_LAST_ROW_SCAN_CODE_IGNULL
};
static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj;
@ -3433,6 +3438,9 @@ static int32_t subplanInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = queryNodeAddrInlineToMsg(&pNode->execNode, pEncoder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->showRewrite);
}
return code;
}
@ -3479,6 +3487,9 @@ static int32_t msgToSubplanInline(STlvDecoder* pDecoder, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = msgToQueryNodeAddrInline(pDecoder, &pNode->execNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->showRewrite);
}
return code;
}

View File

@ -438,210 +438,3 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit
return;
}
static EDealRes walkPhysiNode(SPhysiNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) {
EDealRes res = walkPhysiPlan((SNode*)pNode->pOutputDataBlockDesc, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlan(pNode->pConditions, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pNode->pChildren, order, walker, pContext);
}
return res;
}
static EDealRes walkScanPhysi(SScanPhysiNode* pScan, ETraversalOrder order, FNodeWalker walker, void* pContext) {
EDealRes res = walkPhysiNode((SPhysiNode*)pScan, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pScan->pScanCols, order, walker, pContext);
}
return res;
}
static EDealRes walkTableScanPhysi(STableScanPhysiNode* pScan, ETraversalOrder order, FNodeWalker walker,
void* pContext) {
EDealRes res = walkScanPhysi((SScanPhysiNode*)pScan, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pScan->pDynamicScanFuncs, order, walker, pContext);
}
return res;
}
static EDealRes walkWindowPhysi(SWinodwPhysiNode* pWindow, ETraversalOrder order, FNodeWalker walker, void* pContext) {
EDealRes res = walkPhysiNode((SPhysiNode*)pWindow, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pWindow->pExprs, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pWindow->pFuncs, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlan(pWindow->pTspk, order, walker, pContext);
}
return res;
}
static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) {
EDealRes res = DEAL_RES_CONTINUE;
switch (nodeType(pNode)) {
case QUERY_NODE_NODE_LIST:
res = walkPhysiPlans(((SNodeListNode*)pNode)->pNodeList, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
res = walkScanPhysi((SScanPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
res = walkTableScanPhysi((STableScanPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
res = walkTableScanPhysi((STableScanPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
res = walkScanPhysi((SScanPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
res = walkScanPhysi((SScanPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: {
SProjectPhysiNode* pProject = (SProjectPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pProject->pProjections, order, walker, pContext);
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: {
SSortMergeJoinPhysiNode* pJoin = (SSortMergeJoinPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlan(pJoin->pOnConditions, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pJoin->pTargets, order, walker, pContext);
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode* pAgg = (SAggPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pAgg->pExprs, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pAgg->pGroupKeys, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pAgg->pAggFuncs, order, walker, pContext);
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pExchange->pSrcEndPoints, order, walker, pContext);
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SORT:
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: {
SSortPhysiNode* pSort = (SSortPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pSort->pExprs, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pSort->pSortKeys, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pSort->pTargets, order, walker, pContext);
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: {
SStateWinodwPhysiNode* pState = (SStateWinodwPhysiNode*)pNode;
res = walkWindowPhysi((SWinodwPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlan(pState->pStateKey, order, walker, pContext);
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: {
SPartitionPhysiNode* pPart = (SPartitionPhysiNode*)pNode;
res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pPart->pExprs, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pPart->pPartitionKeys, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlans(pPart->pTargets, order, walker, pContext);
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
res = walkPhysiPlan((SNode*)(((SDataSinkNode*)pNode)->pInputDataBlockDesc), order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
res = walkPhysiPlan((SNode*)(((SDataSinkNode*)pNode)->pInputDataBlockDesc), order, walker, pContext);
break;
case QUERY_NODE_PHYSICAL_SUBPLAN: {
SSubplan* pSubplan = (SSubplan*)pNode;
res = walkPhysiPlans(pSubplan->pChildren, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlan((SNode*)pSubplan->pNode, order, walker, pContext);
}
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkPhysiPlan((SNode*)pSubplan->pDataSink, order, walker, pContext);
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN: {
SQueryPlan* pPlan = (SQueryPlan*)pNode;
if (NULL != pPlan->pSubplans) {
// only need to walk the top-level subplans, because they will recurse to all the subplans below
walkPhysiPlan(nodesListGetNode(pPlan->pSubplans, 0), order, walker, pContext);
}
break;
}
default:
res = dispatchExpr(pNode, order, walker, pContext);
break;
}
return res;
}
static EDealRes walkPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalker walker, void* pContext) {
return walkNode(pNode, order, walker, pContext, dispatchPhysiPlan);
}
static EDealRes walkPhysiPlans(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext) {
SNode* node;
FOREACH(node, pNodeList) {
EDealRes res = walkPhysiPlan(node, order, walker, pContext);
if (DEAL_RES_ERROR == res || DEAL_RES_END == res) {
return res;
}
}
return DEAL_RES_CONTINUE;
}
void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext) {
(void)walkPhysiPlan(pNode, TRAVERSAL_PREORDER, walker, pContext);
}

View File

@ -328,6 +328,7 @@ SNode* nodesMakeNode(ENodeType type) {
case QUERY_NODE_DROP_SUPER_TABLE_STMT:
return makeNode(type, sizeof(SDropSuperTableStmt));
case QUERY_NODE_ALTER_TABLE_STMT:
case QUERY_NODE_ALTER_SUPER_TABLE_STMT:
return makeNode(type, sizeof(SAlterTableStmt));
case QUERY_NODE_CREATE_USER_STMT:
return makeNode(type, sizeof(SCreateUserStmt));

View File

@ -1,19 +1,26 @@
MESSAGE(STATUS "build nodes unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(nodesTest ${SOURCE_LIST})
TARGET_INCLUDE_DIRECTORIES(
IF(NOT TD_DARWIN)
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(nodesTest ${SOURCE_LIST})
TARGET_INCLUDE_DIRECTORIES(
nodesTest
PUBLIC "${TD_SOURCE_DIR}/include/nodes/"
PRIVATE "${TD_SOURCE_DIR}/source/nodes/inc"
)
TARGET_LINK_LIBRARIES(
)
TARGET_LINK_LIBRARIES(
nodesTest
PUBLIC os util common nodes gtest
)
PUBLIC os util common nodes qcom gtest
)
add_test(
NAME nodesTest
COMMAND nodesTest
)
ENDIF()

View File

@ -0,0 +1,299 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, AND/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include "nodes.h"
#include "plannodes.h"
#include "querynodes.h"
class NodesCloneTest : public testing::Test {
public:
void registerCheckFunc(const std::function<void(const SNode*, const SNode*)>& func) { checkFunc_ = func; }
void run(const SNode* pSrc) {
std::unique_ptr<SNode, void (*)(SNode*)> pDst(nodesCloneNode(pSrc), nodesDestroyNode);
checkFunc_(pSrc, pDst.get());
}
private:
std::function<void(const SNode*, const SNode*)> checkFunc_;
};
TEST_F(NodesCloneTest, tempTable) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
STempTableNode* pSrcNode = (STempTableNode*)pSrc;
STempTableNode* pDstNode = (STempTableNode*)pDst;
ASSERT_EQ(nodeType(pSrcNode->pSubquery), nodeType(pDstNode->pSubquery));
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_TEMP_TABLE));
STempTableNode* pNode = (STempTableNode*)srcNode.get();
pNode->pSubquery = nodesMakeNode(QUERY_NODE_SELECT_STMT);
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, joinTable) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
SJoinTableNode* pSrcNode = (SJoinTableNode*)pSrc;
SJoinTableNode* pDstNode = (SJoinTableNode*)pDst;
ASSERT_EQ(pSrcNode->joinType, pDstNode->joinType);
ASSERT_EQ(nodeType(pSrcNode->pLeft), nodeType(pDstNode->pLeft));
ASSERT_EQ(nodeType(pSrcNode->pRight), nodeType(pDstNode->pRight));
if (NULL != pSrcNode->pOnCond) {
ASSERT_EQ(nodeType(pSrcNode->pOnCond), nodeType(pDstNode->pOnCond));
}
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_JOIN_TABLE));
SJoinTableNode* pNode = (SJoinTableNode*)srcNode.get();
pNode->joinType = JOIN_TYPE_INNER;
pNode->pLeft = nodesMakeNode(QUERY_NODE_REAL_TABLE);
pNode->pRight = nodesMakeNode(QUERY_NODE_REAL_TABLE);
pNode->pOnCond = nodesMakeNode(QUERY_NODE_OPERATOR);
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, stateWindow) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
SStateWindowNode* pSrcNode = (SStateWindowNode*)pSrc;
SStateWindowNode* pDstNode = (SStateWindowNode*)pDst;
ASSERT_EQ(nodeType(pSrcNode->pCol), nodeType(pDstNode->pCol));
ASSERT_EQ(nodeType(pSrcNode->pExpr), nodeType(pDstNode->pExpr));
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_STATE_WINDOW));
SStateWindowNode* pNode = (SStateWindowNode*)srcNode.get();
pNode->pCol = nodesMakeNode(QUERY_NODE_COLUMN);
pNode->pExpr = nodesMakeNode(QUERY_NODE_OPERATOR);
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, sessionWindow) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
SSessionWindowNode* pSrcNode = (SSessionWindowNode*)pSrc;
SSessionWindowNode* pDstNode = (SSessionWindowNode*)pDst;
ASSERT_EQ(nodeType(pSrcNode->pCol), nodeType(pDstNode->pCol));
ASSERT_EQ(nodeType(pSrcNode->pGap), nodeType(pDstNode->pGap));
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_SESSION_WINDOW));
SSessionWindowNode* pNode = (SSessionWindowNode*)srcNode.get();
pNode->pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
pNode->pGap = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, intervalWindow) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
SIntervalWindowNode* pSrcNode = (SIntervalWindowNode*)pSrc;
SIntervalWindowNode* pDstNode = (SIntervalWindowNode*)pDst;
ASSERT_EQ(nodeType(pSrcNode->pInterval), nodeType(pDstNode->pInterval));
if (NULL != pSrcNode->pOffset) {
ASSERT_EQ(nodeType(pSrcNode->pOffset), nodeType(pDstNode->pOffset));
}
if (NULL != pSrcNode->pSliding) {
ASSERT_EQ(nodeType(pSrcNode->pSliding), nodeType(pDstNode->pSliding));
}
if (NULL != pSrcNode->pFill) {
ASSERT_EQ(nodeType(pSrcNode->pFill), nodeType(pDstNode->pFill));
}
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW));
SIntervalWindowNode* pNode = (SIntervalWindowNode*)srcNode.get();
pNode->pInterval = nodesMakeNode(QUERY_NODE_VALUE);
pNode->pOffset = nodesMakeNode(QUERY_NODE_VALUE);
pNode->pSliding = nodesMakeNode(QUERY_NODE_VALUE);
pNode->pFill = nodesMakeNode(QUERY_NODE_FILL);
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, fill) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
SFillNode* pSrcNode = (SFillNode*)pSrc;
SFillNode* pDstNode = (SFillNode*)pDst;
ASSERT_EQ(pSrcNode->mode, pDstNode->mode);
if (NULL != pSrcNode->pValues) {
ASSERT_EQ(nodeType(pSrcNode->pValues), nodeType(pDstNode->pValues));
}
ASSERT_EQ(nodeType(pSrcNode->pWStartTs), nodeType(pDstNode->pWStartTs));
ASSERT_EQ(pSrcNode->timeRange.skey, pDstNode->timeRange.skey);
ASSERT_EQ(pSrcNode->timeRange.ekey, pDstNode->timeRange.ekey);
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_FILL));
SFillNode* pNode = (SFillNode*)srcNode.get();
pNode->mode = FILL_MODE_VALUE;
pNode->pValues = nodesMakeNode(QUERY_NODE_NODE_LIST);
pNode->pWStartTs = nodesMakeNode(QUERY_NODE_COLUMN);
pNode->timeRange.skey = 1666756692907;
pNode->timeRange.ekey = 1666756699907;
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, logicSubplan) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
SLogicSubplan* pSrcNode = (SLogicSubplan*)pSrc;
SLogicSubplan* pDstNode = (SLogicSubplan*)pDst;
ASSERT_EQ(pSrcNode->subplanType, pDstNode->subplanType);
ASSERT_EQ(pSrcNode->level, pDstNode->level);
ASSERT_EQ(pSrcNode->splitFlag, pDstNode->splitFlag);
ASSERT_EQ(pSrcNode->numOfComputeNodes, pDstNode->numOfComputeNodes);
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN));
SLogicSubplan* pNode = (SLogicSubplan*)srcNode.get();
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, physiScan) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
STagScanPhysiNode* pSrcNode = (STagScanPhysiNode*)pSrc;
STagScanPhysiNode* pDstNode = (STagScanPhysiNode*)pDst;
ASSERT_EQ(pSrcNode->uid, pDstNode->uid);
ASSERT_EQ(pSrcNode->suid, pDstNode->suid);
ASSERT_EQ(pSrcNode->tableType, pDstNode->tableType);
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN));
STagScanPhysiNode* pNode = (STagScanPhysiNode*)srcNode.get();
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, physiSystemTableScan) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
SSystemTableScanPhysiNode* pSrcNode = (SSystemTableScanPhysiNode*)pSrc;
SSystemTableScanPhysiNode* pDstNode = (SSystemTableScanPhysiNode*)pDst;
ASSERT_EQ(pSrcNode->showRewrite, pDstNode->showRewrite);
ASSERT_EQ(pSrcNode->accountId, pDstNode->accountId);
ASSERT_EQ(pSrcNode->sysInfo, pDstNode->sysInfo);
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN));
SSystemTableScanPhysiNode* pNode = (SSystemTableScanPhysiNode*)srcNode.get();
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, physiStreamSemiSessionWinodw) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
SStreamSemiSessionWinodwPhysiNode* pSrcNode = (SStreamSemiSessionWinodwPhysiNode*)pSrc;
SStreamSemiSessionWinodwPhysiNode* pDstNode = (SStreamSemiSessionWinodwPhysiNode*)pDst;
ASSERT_EQ(pSrcNode->gap, pDstNode->gap);
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION));
SStreamSemiSessionWinodwPhysiNode* pNode = (SStreamSemiSessionWinodwPhysiNode*)srcNode.get();
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, physiStreamFinalSessionWinodw) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
SStreamFinalSessionWinodwPhysiNode* pSrcNode = (SStreamFinalSessionWinodwPhysiNode*)pSrc;
SStreamFinalSessionWinodwPhysiNode* pDstNode = (SStreamFinalSessionWinodwPhysiNode*)pDst;
ASSERT_EQ(pSrcNode->gap, pDstNode->gap);
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION));
SStreamFinalSessionWinodwPhysiNode* pNode = (SStreamFinalSessionWinodwPhysiNode*)srcNode.get();
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, physiStreamPartition) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
SStreamPartitionPhysiNode* pSrcNode = (SStreamPartitionPhysiNode*)pSrc;
SStreamPartitionPhysiNode* pDstNode = (SStreamPartitionPhysiNode*)pDst;
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION));
SStreamPartitionPhysiNode* pNode = (SStreamPartitionPhysiNode*)srcNode.get();
return srcNode.get();
}());
}
TEST_F(NodesCloneTest, physiPartition) {
registerCheckFunc([](const SNode* pSrc, const SNode* pDst) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
SPartitionPhysiNode* pSrcNode = (SPartitionPhysiNode*)pSrc;
SPartitionPhysiNode* pDstNode = (SPartitionPhysiNode*)pDst;
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);
run([&]() {
srcNode.reset(nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_PARTITION));
SPartitionPhysiNode* pNode = (SPartitionPhysiNode*)srcNode.get();
return srcNode.get();
}());
}

View File

@ -518,6 +518,7 @@ stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE.
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; }
subtable_opt(A) ::= . { A = NULL; }
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }

View File

@ -1702,6 +1702,7 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
SStreamOptions* pOptions = (SStreamOptions*)nodesMakeNode(QUERY_NODE_STREAM_OPTIONS);
CHECK_OUT_OF_MEM(pOptions);
pOptions->triggerType = STREAM_TRIGGER_AT_ONCE;
pOptions->fillHistory = STREAM_DEFAULT_FILL_HISTORY;
pOptions->ignoreExpired = STREAM_DEFAULT_IGNORE_EXPIRED;
return (SNode*)pOptions;
}

View File

@ -92,6 +92,7 @@ static SKeyword keywordTable[] = {
{"EVERY", TK_EVERY},
{"FILE", TK_FILE},
{"FILL", TK_FILL},
{"FILL_HISTORY", TK_FILL_HISTORY},
{"FIRST", TK_FIRST},
{"FLOAT", TK_FLOAT},
{"FLUSH", TK_FLUSH},

View File

@ -5542,6 +5542,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
pReq->triggerType = pStmt->pOptions->triggerType;
pReq->maxDelay = (NULL != pStmt->pOptions->pDelay ? ((SValueNode*)pStmt->pOptions->pDelay)->datum.i : 0);
pReq->watermark = (NULL != pStmt->pOptions->pWatermark ? ((SValueNode*)pStmt->pOptions->pWatermark)->datum.i : 0);
pReq->fillHistory = pStmt->pOptions->fillHistory;
pReq->igExpired = pStmt->pOptions->ignoreExpired;
columnDefNodeToField(pStmt->pTags, &pReq->pTags);
pReq->numOfTags = LIST_LENGTH(pStmt->pTags);

File diff suppressed because it is too large Load Diff

View File

@ -576,10 +576,10 @@ TEST_F(ParserInitialCTest, createStream) {
memset(&expect, 0, sizeof(SCMCreateStreamReq));
};
auto setCreateStreamReqFunc = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE,
int64_t maxDelay = 0, int64_t watermark = 0,
int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED) {
auto setCreateStreamReq = [&](const char* pStream, const char* pSrcDb, const char* pSql, const char* pDstStb,
int8_t igExists = 0, int8_t triggerType = STREAM_TRIGGER_AT_ONCE, int64_t maxDelay = 0,
int64_t watermark = 0, int8_t igExpired = STREAM_DEFAULT_IGNORE_EXPIRED,
int8_t fillHistory = STREAM_DEFAULT_FILL_HISTORY) {
snprintf(expect.name, sizeof(expect.name), "0.%s", pStream);
snprintf(expect.sourceDB, sizeof(expect.sourceDB), "0.%s", pSrcDb);
snprintf(expect.targetStbFullName, sizeof(expect.targetStbFullName), "0.test.%s", pDstStb);
@ -588,6 +588,7 @@ TEST_F(ParserInitialCTest, createStream) {
expect.triggerType = triggerType;
expect.maxDelay = maxDelay;
expect.watermark = watermark;
expect.fillHistory = fillHistory;
expect.igExpired = igExpired;
};
@ -619,6 +620,7 @@ TEST_F(ParserInitialCTest, createStream) {
ASSERT_EQ(req.triggerType, expect.triggerType);
ASSERT_EQ(req.maxDelay, expect.maxDelay);
ASSERT_EQ(req.watermark, expect.watermark);
ASSERT_EQ(req.fillHistory, expect.fillHistory);
ASSERT_EQ(req.igExpired, expect.igExpired);
ASSERT_EQ(req.numOfTags, expect.numOfTags);
if (expect.numOfTags > 0) {
@ -636,24 +638,24 @@ TEST_F(ParserInitialCTest, createStream) {
tFreeSCMCreateStreamReq(&req);
});
setCreateStreamReqFunc("s1", "test", "create stream s1 into st1 as select count(*) from t1 interval(10s)", "st1");
setCreateStreamReq("s1", "test", "create stream s1 into st1 as select count(*) from t1 interval(10s)", "st1");
run("CREATE STREAM s1 INTO st1 AS SELECT COUNT(*) FROM t1 INTERVAL(10S)");
clearCreateStreamReq();
setCreateStreamReqFunc("s1", "test",
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 into st1 "
"as select count(*) from t1 interval(10s)",
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND,
0);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 INTO st1 AS SELECT COUNT(*) "
setCreateStreamReq(
"s1", "test",
"create stream if not exists s1 trigger max_delay 20s watermark 10s ignore expired 0 fill_history 1 into st1 "
"as select count(*) from t1 interval(10s)",
"st1", 1, STREAM_TRIGGER_MAX_DELAY, 20 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_SECOND, 0, 1);
run("CREATE STREAM IF NOT EXISTS s1 TRIGGER MAX_DELAY 20s WATERMARK 10s IGNORE EXPIRED 0 FILL_HISTORY 1 INTO st1 AS "
"SELECT COUNT(*) "
"FROM t1 INTERVAL(10S)");
clearCreateStreamReq();
setCreateStreamReqFunc(
"s1", "test",
"create stream s1 into st3 tags(tname varchar(10), id int) subtable(concat('new-', tname)) as "
"select _wstart wstart, count(*) cnt from st1 partition by tbname tname, tag1 id interval(10s)",
"st3");
setCreateStreamReq("s1", "test",
"create stream s1 into st3 tags(tname varchar(10), id int) subtable(concat('new-', tname)) as "
"select _wstart wstart, count(*) cnt from st1 partition by tbname tname, tag1 id interval(10s)",
"st3");
addTag("tname", TSDB_DATA_TYPE_VARCHAR, 10 + VARSTR_HEADER_SIZE);
addTag("id", TSDB_DATA_TYPE_INT);
run("CREATE STREAM s1 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) "

View File

@ -295,6 +295,10 @@ class ParserTestBaseImpl {
char* pStr = NULL;
int32_t len = 0;
DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len)
// check toObject
SNode* pCopy = NULL;
DO_WITH_THROW(nodesStringToNode, pStr, &pCopy)
nodesDestroyNode(pCopy);
string str(pStr);
taosMemoryFreeClear(pStr);
return str;

View File

@ -339,29 +339,6 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return code;
}
static int32_t createColumnByLastRow(SNodeList* pFuncs, SNodeList** pOutput) {
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pCols = NULL;
SNode* pFunc = NULL;
FOREACH(pFunc, pFuncs) {
SFunctionNode* pLastRow = (SFunctionNode*)pFunc;
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pLastRow->pParameterList, 0);
snprintf(pCol->colName, sizeof(pCol->colName), "%s", pLastRow->node.aliasName);
snprintf(pCol->node.aliasName, sizeof(pCol->colName), "%s", pLastRow->node.aliasName);
NODES_CLEAR_LIST(pLastRow->pParameterList);
code = nodesListMakeStrictAppend(&pCols, (SNode*)pCol);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = pCols;
} else {
nodesDestroyList(pCols);
}
return code;
}
static int32_t createSubqueryLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, STempTableNode* pTable,
SLogicNode** pLogicNode) {
return createQueryLogicNode(pCxt, pTable->pSubquery, pLogicNode);
@ -491,20 +468,6 @@ static SNode* createGroupingSetNode(SNode* pExpr) {
return (SNode*)pGroupingSet;
}
static int32_t createGroupKeysFromPartKeys(SNodeList* pPartKeys, SNodeList** pOutput) {
SNodeList* pGroupKeys = NULL;
SNode* pPartKey = NULL;
FOREACH(pPartKey, pPartKeys) {
int32_t code = nodesListMakeStrictAppend(&pGroupKeys, createGroupingSetNode(pPartKey));
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pGroupKeys);
return code;
}
}
*pOutput = pGroupKeys;
return TSDB_CODE_SUCCESS;
}
static EGroupAction getGroupAction(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
return (pCxt->pPlanCxt->streamQuery || NULL != pSelect->pLimit || NULL != pSelect->pSlimit) ? GROUP_ACTION_KEEP
: GROUP_ACTION_NONE;

View File

@ -596,18 +596,6 @@ static int32_t pushDownCondOptPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogi
return pushDownCondOptAppendCond(&pJoin->pOnConditions, pCond);
}
static int32_t pushDownCondOptPushCondToScan(SOptimizeContext* pCxt, SScanLogicNode* pScan, SNode** pCond) {
return pushDownCondOptAppendCond(&pScan->node.pConditions, pCond);
}
static int32_t pushDownCondOptPushCondToProject(SOptimizeContext* pCxt, SProjectLogicNode* pProject, SNode** pCond) {
return pushDownCondOptAppendCond(&pProject->node.pConditions, pCond);
}
static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) {
return pushDownCondOptAppendCond(&pJoin->node.pConditions, pCond);
}
static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) {
return pushDownCondOptAppendCond(&pChild->pConditions, pCond);
}
@ -1201,40 +1189,6 @@ static bool smaIndexOptMayBeOptimized(SLogicNode* pNode) {
return true;
}
static int32_t smaIndexOptCreateMerge(SLogicNode* pChild, SNodeList* pMergeKeys, SNodeList* pTargets,
SLogicNode** pOutput) {
SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMerge->node.precision = pChild->precision;
pMerge->numOfChannels = 2;
pMerge->pMergeKeys = pMergeKeys;
pMerge->node.pTargets = pTargets;
pMerge->pInputs = nodesCloneList(pChild->pTargets);
if (NULL == pMerge->pInputs) {
nodesDestroyNode((SNode*)pMerge);
return TSDB_CODE_OUT_OF_MEMORY;
}
*pOutput = (SLogicNode*)pMerge;
return TSDB_CODE_SUCCESS;
}
static int32_t smaIndexOptRecombinationNode(SLogicSubplan* pLogicSubplan, SLogicNode* pInterval, SLogicNode* pMerge,
SLogicNode* pSmaScan) {
int32_t code = nodesListMakeAppend(&pMerge->pChildren, (SNode*)pInterval);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&pMerge->pChildren, (SNode*)pSmaScan);
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pLogicSubplan, pInterval, pMerge);
pSmaScan->pParent = pMerge;
pInterval->pParent = pMerge;
}
return code;
}
static int32_t smaIndexOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList* pCols,
SLogicNode** pOutput) {
SScanLogicNode* pSmaScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
@ -1770,7 +1724,7 @@ static int32_t rewriteTailOptCreateLimit(SNode* pLimit, SNode* pOffset, SNode**
return TSDB_CODE_OUT_OF_MEMORY;
}
pLimitNode->limit = NULL == pLimit ? -1 : ((SValueNode*)pLimit)->datum.i;
pLimitNode->offset = NULL == pOffset ? -1 : ((SValueNode*)pOffset)->datum.i;
pLimitNode->offset = NULL == pOffset ? 0 : ((SValueNode*)pOffset)->datum.i;
*pOutput = (SNode*)pLimitNode;
return TSDB_CODE_SUCCESS;
}

View File

@ -584,6 +584,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubplan->showRewrite = pScanLogicNode->showRewrite;
pScan->showRewrite = pScanLogicNode->showRewrite;
pScan->accountId = pCxt->pPlanCxt->acctId;
pScan->sysInfo = pCxt->pPlanCxt->sysInfo;

View File

@ -1419,7 +1419,8 @@ typedef struct SQnodeSplitInfo {
static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SQnodeSplitInfo* pInfo) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent &&
((SScanLogicNode*)pNode)->scanSeq[0] < 1 && ((SScanLogicNode*)pNode)->scanSeq[1] < 1) {
QUERY_NODE_LOGIC_PLAN_INTERP_FUNC != nodeType(pNode->pParent) && ((SScanLogicNode*)pNode)->scanSeq[0] <= 1 &&
((SScanLogicNode*)pNode)->scanSeq[1] <= 1) {
pInfo->pSplitNode = pNode;
pInfo->pSubplan = pSubplan;
return true;

View File

@ -142,15 +142,6 @@ int32_t qMsgToSubplan(const char* pStr, int32_t len, SSubplan** pSubplan) {
return nodesMsgToNode(pStr, len, (SNode**)pSubplan);
}
char* qQueryPlanToString(const SQueryPlan* pPlan) {
char* pStr = NULL;
int32_t len = 0;
if (TSDB_CODE_SUCCESS != nodesNodeToString((SNode*)pPlan, false, &pStr, &len)) {
return NULL;
}
return pStr;
}
SQueryPlan* qStringToQueryPlan(const char* pStr) {
SQueryPlan* pPlan = NULL;
if (TSDB_CODE_SUCCESS != nodesStringToNode(pStr, (SNode**)&pPlan)) {

View File

@ -466,8 +466,13 @@ class PlannerTestBaseImpl {
char* pStr = NULL;
int32_t len = 0;
DO_WITH_THROW(nodesNodeToString, pRoot, false, &pStr, &len)
// check toObject
SNode* pCopy = NULL;
DO_WITH_THROW(nodesStringToNode, pStr, &pCopy)
nodesDestroyNode(pCopy);
string str(pStr);
taosMemoryFreeClear(pStr);
return str;
}