Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/ly_stream

This commit is contained in:
54liuyao 2024-10-11 08:52:43 +08:00
commit 998eeb2fe9
17 changed files with 6924 additions and 5926 deletions

View File

@ -69,335 +69,341 @@
#define TK_NK_DOT 51
#define TK_WITH 52
#define TK_ENCRYPT_KEY 53
#define TK_DNODE 54
#define TK_PORT 55
#define TK_DNODES 56
#define TK_RESTORE 57
#define TK_NK_IPTOKEN 58
#define TK_FORCE 59
#define TK_UNSAFE 60
#define TK_CLUSTER 61
#define TK_LOCAL 62
#define TK_QNODE 63
#define TK_BNODE 64
#define TK_SNODE 65
#define TK_MNODE 66
#define TK_VNODE 67
#define TK_DATABASE 68
#define TK_USE 69
#define TK_FLUSH 70
#define TK_TRIM 71
#define TK_S3MIGRATE 72
#define TK_COMPACT 73
#define TK_IF 74
#define TK_NOT 75
#define TK_EXISTS 76
#define TK_BUFFER 77
#define TK_CACHEMODEL 78
#define TK_CACHESIZE 79
#define TK_COMP 80
#define TK_DURATION 81
#define TK_NK_VARIABLE 82
#define TK_MAXROWS 83
#define TK_MINROWS 84
#define TK_KEEP 85
#define TK_PAGES 86
#define TK_PAGESIZE 87
#define TK_TSDB_PAGESIZE 88
#define TK_PRECISION 89
#define TK_REPLICA 90
#define TK_VGROUPS 91
#define TK_SINGLE_STABLE 92
#define TK_RETENTIONS 93
#define TK_SCHEMALESS 94
#define TK_WAL_LEVEL 95
#define TK_WAL_FSYNC_PERIOD 96
#define TK_WAL_RETENTION_PERIOD 97
#define TK_WAL_RETENTION_SIZE 98
#define TK_WAL_ROLL_PERIOD 99
#define TK_WAL_SEGMENT_SIZE 100
#define TK_STT_TRIGGER 101
#define TK_TABLE_PREFIX 102
#define TK_TABLE_SUFFIX 103
#define TK_S3_CHUNKSIZE 104
#define TK_S3_KEEPLOCAL 105
#define TK_S3_COMPACT 106
#define TK_KEEP_TIME_OFFSET 107
#define TK_ENCRYPT_ALGORITHM 108
#define TK_NK_COLON 109
#define TK_BWLIMIT 110
#define TK_START 111
#define TK_TIMESTAMP 112
#define TK_END 113
#define TK_TABLE 114
#define TK_NK_LP 115
#define TK_NK_RP 116
#define TK_USING 117
#define TK_FILE 118
#define TK_STABLE 119
#define TK_COLUMN 120
#define TK_MODIFY 121
#define TK_RENAME 122
#define TK_TAG 123
#define TK_SET 124
#define TK_NK_EQ 125
#define TK_TAGS 126
#define TK_BOOL 127
#define TK_TINYINT 128
#define TK_SMALLINT 129
#define TK_INT 130
#define TK_INTEGER 131
#define TK_BIGINT 132
#define TK_FLOAT 133
#define TK_DOUBLE 134
#define TK_BINARY 135
#define TK_NCHAR 136
#define TK_UNSIGNED 137
#define TK_JSON 138
#define TK_VARCHAR 139
#define TK_MEDIUMBLOB 140
#define TK_BLOB 141
#define TK_VARBINARY 142
#define TK_GEOMETRY 143
#define TK_DECIMAL 144
#define TK_COMMENT 145
#define TK_MAX_DELAY 146
#define TK_WATERMARK 147
#define TK_ROLLUP 148
#define TK_TTL 149
#define TK_SMA 150
#define TK_DELETE_MARK 151
#define TK_FIRST 152
#define TK_LAST 153
#define TK_SHOW 154
#define TK_FULL 155
#define TK_PRIVILEGES 156
#define TK_DATABASES 157
#define TK_TABLES 158
#define TK_STABLES 159
#define TK_MNODES 160
#define TK_QNODES 161
#define TK_ARBGROUPS 162
#define TK_FUNCTIONS 163
#define TK_INDEXES 164
#define TK_ACCOUNTS 165
#define TK_APPS 166
#define TK_CONNECTIONS 167
#define TK_LICENCES 168
#define TK_GRANTS 169
#define TK_LOGS 170
#define TK_MACHINES 171
#define TK_ENCRYPTIONS 172
#define TK_QUERIES 173
#define TK_SCORES 174
#define TK_TOPICS 175
#define TK_VARIABLES 176
#define TK_BNODES 177
#define TK_SNODES 178
#define TK_TRANSACTIONS 179
#define TK_DISTRIBUTED 180
#define TK_CONSUMERS 181
#define TK_SUBSCRIPTIONS 182
#define TK_VNODES 183
#define TK_ALIVE 184
#define TK_VIEWS 185
#define TK_VIEW 186
#define TK_COMPACTS 187
#define TK_NORMAL 188
#define TK_CHILD 189
#define TK_LIKE 190
#define TK_TBNAME 191
#define TK_QTAGS 192
#define TK_AS 193
#define TK_SYSTEM 194
#define TK_TSMA 195
#define TK_INTERVAL 196
#define TK_RECURSIVE 197
#define TK_TSMAS 198
#define TK_FUNCTION 199
#define TK_INDEX 200
#define TK_COUNT 201
#define TK_LAST_ROW 202
#define TK_META 203
#define TK_ONLY 204
#define TK_TOPIC 205
#define TK_CONSUMER 206
#define TK_GROUP 207
#define TK_DESC 208
#define TK_DESCRIBE 209
#define TK_RESET 210
#define TK_QUERY 211
#define TK_CACHE 212
#define TK_EXPLAIN 213
#define TK_ANALYZE 214
#define TK_VERBOSE 215
#define TK_NK_BOOL 216
#define TK_RATIO 217
#define TK_NK_FLOAT 218
#define TK_OUTPUTTYPE 219
#define TK_AGGREGATE 220
#define TK_BUFSIZE 221
#define TK_LANGUAGE 222
#define TK_REPLACE 223
#define TK_STREAM 224
#define TK_INTO 225
#define TK_PAUSE 226
#define TK_RESUME 227
#define TK_PRIMARY 228
#define TK_KEY 229
#define TK_TRIGGER 230
#define TK_AT_ONCE 231
#define TK_WINDOW_CLOSE 232
#define TK_IGNORE 233
#define TK_EXPIRED 234
#define TK_FILL_HISTORY 235
#define TK_UPDATE 236
#define TK_SUBTABLE 237
#define TK_UNTREATED 238
#define TK_KILL 239
#define TK_CONNECTION 240
#define TK_TRANSACTION 241
#define TK_BALANCE 242
#define TK_VGROUP 243
#define TK_LEADER 244
#define TK_MERGE 245
#define TK_REDISTRIBUTE 246
#define TK_SPLIT 247
#define TK_DELETE 248
#define TK_INSERT 249
#define TK_NK_BIN 250
#define TK_NK_HEX 251
#define TK_NULL 252
#define TK_NK_QUESTION 253
#define TK_NK_ALIAS 254
#define TK_NK_ARROW 255
#define TK_ROWTS 256
#define TK_QSTART 257
#define TK_QEND 258
#define TK_QDURATION 259
#define TK_WSTART 260
#define TK_WEND 261
#define TK_WDURATION 262
#define TK_IROWTS 263
#define TK_ISFILLED 264
#define TK_CAST 265
#define TK_POSITION 266
#define TK_IN 267
#define TK_FOR 268
#define TK_NOW 269
#define TK_TODAY 270
#define TK_RAND 271
#define TK_SUBSTR 272
#define TK_SUBSTRING 273
#define TK_BOTH 274
#define TK_TRAILING 275
#define TK_LEADING 276
#define TK_TIMEZONE 277
#define TK_CLIENT_VERSION 278
#define TK_SERVER_VERSION 279
#define TK_SERVER_STATUS 280
#define TK_CURRENT_USER 281
#define TK_PI 282
#define TK_CASE 283
#define TK_WHEN 284
#define TK_THEN 285
#define TK_ELSE 286
#define TK_BETWEEN 287
#define TK_IS 288
#define TK_NK_LT 289
#define TK_NK_GT 290
#define TK_NK_LE 291
#define TK_NK_GE 292
#define TK_NK_NE 293
#define TK_MATCH 294
#define TK_NMATCH 295
#define TK_CONTAINS 296
#define TK_JOIN 297
#define TK_INNER 298
#define TK_LEFT 299
#define TK_RIGHT 300
#define TK_OUTER 301
#define TK_SEMI 302
#define TK_ANTI 303
#define TK_ASOF 304
#define TK_WINDOW 305
#define TK_WINDOW_OFFSET 306
#define TK_JLIMIT 307
#define TK_SELECT 308
#define TK_NK_HINT 309
#define TK_DISTINCT 310
#define TK_WHERE 311
#define TK_PARTITION 312
#define TK_BY 313
#define TK_SESSION 314
#define TK_STATE_WINDOW 315
#define TK_EVENT_WINDOW 316
#define TK_COUNT_WINDOW 317
#define TK_SLIDING 318
#define TK_FILL 319
#define TK_VALUE 320
#define TK_VALUE_F 321
#define TK_NONE 322
#define TK_PREV 323
#define TK_NULL_F 324
#define TK_LINEAR 325
#define TK_NEXT 326
#define TK_HAVING 327
#define TK_RANGE 328
#define TK_EVERY 329
#define TK_ORDER 330
#define TK_SLIMIT 331
#define TK_SOFFSET 332
#define TK_LIMIT 333
#define TK_OFFSET 334
#define TK_ASC 335
#define TK_NULLS 336
#define TK_ABORT 337
#define TK_AFTER 338
#define TK_ATTACH 339
#define TK_BEFORE 340
#define TK_BEGIN 341
#define TK_BITAND 342
#define TK_BITNOT 343
#define TK_BITOR 344
#define TK_BLOCKS 345
#define TK_CHANGE 346
#define TK_COMMA 347
#define TK_CONCAT 348
#define TK_CONFLICT 349
#define TK_COPY 350
#define TK_DEFERRED 351
#define TK_DELIMITERS 352
#define TK_DETACH 353
#define TK_DIVIDE 354
#define TK_DOT 355
#define TK_EACH 356
#define TK_FAIL 357
#define TK_GLOB 358
#define TK_ID 359
#define TK_IMMEDIATE 360
#define TK_IMPORT 361
#define TK_INITIALLY 362
#define TK_INSTEAD 363
#define TK_ISNULL 364
#define TK_MODULES 365
#define TK_NK_BITNOT 366
#define TK_NK_SEMI 367
#define TK_NOTNULL 368
#define TK_OF 369
#define TK_PLUS 370
#define TK_PRIVILEGE 371
#define TK_RAISE 372
#define TK_RESTRICT 373
#define TK_ROW 374
#define TK_STAR 375
#define TK_STATEMENT 376
#define TK_STRICT 377
#define TK_STRING 378
#define TK_TIMES 379
#define TK_VALUES 380
#define TK_VARIABLE 381
#define TK_WAL 382
#define TK_ANODE 54
#define TK_UPDATE 55
#define TK_ANODES 56
#define TK_DNODE 57
#define TK_PORT 58
#define TK_DNODES 59
#define TK_RESTORE 60
#define TK_NK_IPTOKEN 61
#define TK_FORCE 62
#define TK_UNSAFE 63
#define TK_CLUSTER 64
#define TK_LOCAL 65
#define TK_QNODE 66
#define TK_BNODE 67
#define TK_SNODE 68
#define TK_MNODE 69
#define TK_VNODE 70
#define TK_DATABASE 71
#define TK_USE 72
#define TK_FLUSH 73
#define TK_TRIM 74
#define TK_S3MIGRATE 75
#define TK_COMPACT 76
#define TK_IF 77
#define TK_NOT 78
#define TK_EXISTS 79
#define TK_BUFFER 80
#define TK_CACHEMODEL 81
#define TK_CACHESIZE 82
#define TK_COMP 83
#define TK_DURATION 84
#define TK_NK_VARIABLE 85
#define TK_MAXROWS 86
#define TK_MINROWS 87
#define TK_KEEP 88
#define TK_PAGES 89
#define TK_PAGESIZE 90
#define TK_TSDB_PAGESIZE 91
#define TK_PRECISION 92
#define TK_REPLICA 93
#define TK_VGROUPS 94
#define TK_SINGLE_STABLE 95
#define TK_RETENTIONS 96
#define TK_SCHEMALESS 97
#define TK_WAL_LEVEL 98
#define TK_WAL_FSYNC_PERIOD 99
#define TK_WAL_RETENTION_PERIOD 100
#define TK_WAL_RETENTION_SIZE 101
#define TK_WAL_ROLL_PERIOD 102
#define TK_WAL_SEGMENT_SIZE 103
#define TK_STT_TRIGGER 104
#define TK_TABLE_PREFIX 105
#define TK_TABLE_SUFFIX 106
#define TK_S3_CHUNKSIZE 107
#define TK_S3_KEEPLOCAL 108
#define TK_S3_COMPACT 109
#define TK_KEEP_TIME_OFFSET 110
#define TK_ENCRYPT_ALGORITHM 111
#define TK_NK_COLON 112
#define TK_BWLIMIT 113
#define TK_START 114
#define TK_TIMESTAMP 115
#define TK_END 116
#define TK_TABLE 117
#define TK_NK_LP 118
#define TK_NK_RP 119
#define TK_USING 120
#define TK_FILE 121
#define TK_STABLE 122
#define TK_COLUMN 123
#define TK_MODIFY 124
#define TK_RENAME 125
#define TK_TAG 126
#define TK_SET 127
#define TK_NK_EQ 128
#define TK_TAGS 129
#define TK_BOOL 130
#define TK_TINYINT 131
#define TK_SMALLINT 132
#define TK_INT 133
#define TK_INTEGER 134
#define TK_BIGINT 135
#define TK_FLOAT 136
#define TK_DOUBLE 137
#define TK_BINARY 138
#define TK_NCHAR 139
#define TK_UNSIGNED 140
#define TK_JSON 141
#define TK_VARCHAR 142
#define TK_MEDIUMBLOB 143
#define TK_BLOB 144
#define TK_VARBINARY 145
#define TK_GEOMETRY 146
#define TK_DECIMAL 147
#define TK_COMMENT 148
#define TK_MAX_DELAY 149
#define TK_WATERMARK 150
#define TK_ROLLUP 151
#define TK_TTL 152
#define TK_SMA 153
#define TK_DELETE_MARK 154
#define TK_FIRST 155
#define TK_LAST 156
#define TK_SHOW 157
#define TK_FULL 158
#define TK_PRIVILEGES 159
#define TK_DATABASES 160
#define TK_TABLES 161
#define TK_STABLES 162
#define TK_MNODES 163
#define TK_QNODES 164
#define TK_ARBGROUPS 165
#define TK_FUNCTIONS 166
#define TK_INDEXES 167
#define TK_ACCOUNTS 168
#define TK_APPS 169
#define TK_CONNECTIONS 170
#define TK_LICENCES 171
#define TK_GRANTS 172
#define TK_LOGS 173
#define TK_MACHINES 174
#define TK_ENCRYPTIONS 175
#define TK_QUERIES 176
#define TK_SCORES 177
#define TK_TOPICS 178
#define TK_VARIABLES 179
#define TK_BNODES 180
#define TK_SNODES 181
#define TK_TRANSACTIONS 182
#define TK_DISTRIBUTED 183
#define TK_CONSUMERS 184
#define TK_SUBSCRIPTIONS 185
#define TK_VNODES 186
#define TK_ALIVE 187
#define TK_VIEWS 188
#define TK_VIEW 189
#define TK_COMPACTS 190
#define TK_NORMAL 191
#define TK_CHILD 192
#define TK_LIKE 193
#define TK_TBNAME 194
#define TK_QTAGS 195
#define TK_AS 196
#define TK_SYSTEM 197
#define TK_TSMA 198
#define TK_INTERVAL 199
#define TK_RECURSIVE 200
#define TK_TSMAS 201
#define TK_FUNCTION 202
#define TK_INDEX 203
#define TK_COUNT 204
#define TK_LAST_ROW 205
#define TK_META 206
#define TK_ONLY 207
#define TK_TOPIC 208
#define TK_CONSUMER 209
#define TK_GROUP 210
#define TK_DESC 211
#define TK_DESCRIBE 212
#define TK_RESET 213
#define TK_QUERY 214
#define TK_CACHE 215
#define TK_EXPLAIN 216
#define TK_ANALYZE 217
#define TK_VERBOSE 218
#define TK_NK_BOOL 219
#define TK_RATIO 220
#define TK_NK_FLOAT 221
#define TK_OUTPUTTYPE 222
#define TK_AGGREGATE 223
#define TK_BUFSIZE 224
#define TK_LANGUAGE 225
#define TK_REPLACE 226
#define TK_STREAM 227
#define TK_INTO 228
#define TK_PAUSE 229
#define TK_RESUME 230
#define TK_PRIMARY 231
#define TK_KEY 232
#define TK_TRIGGER 233
#define TK_AT_ONCE 234
#define TK_WINDOW_CLOSE 235
#define TK_IGNORE 236
#define TK_EXPIRED 237
#define TK_FILL_HISTORY 238
#define TK_SUBTABLE 239
#define TK_UNTREATED 240
#define TK_KILL 241
#define TK_CONNECTION 242
#define TK_TRANSACTION 243
#define TK_BALANCE 244
#define TK_VGROUP 245
#define TK_LEADER 246
#define TK_MERGE 247
#define TK_REDISTRIBUTE 248
#define TK_SPLIT 249
#define TK_DELETE 250
#define TK_INSERT 251
#define TK_NK_BIN 252
#define TK_NK_HEX 253
#define TK_NULL 254
#define TK_NK_QUESTION 255
#define TK_NK_ALIAS 256
#define TK_NK_ARROW 257
#define TK_ROWTS 258
#define TK_QSTART 259
#define TK_QEND 260
#define TK_QDURATION 261
#define TK_WSTART 262
#define TK_WEND 263
#define TK_WDURATION 264
#define TK_IROWTS 265
#define TK_ISFILLED 266
#define TK_FLOW 267
#define TK_FHIGH 268
#define TK_FROWTS 269
#define TK_CAST 270
#define TK_POSITION 271
#define TK_IN 272
#define TK_FOR 273
#define TK_NOW 274
#define TK_TODAY 275
#define TK_RAND 276
#define TK_SUBSTR 277
#define TK_SUBSTRING 278
#define TK_BOTH 279
#define TK_TRAILING 280
#define TK_LEADING 281
#define TK_TIMEZONE 282
#define TK_CLIENT_VERSION 283
#define TK_SERVER_VERSION 284
#define TK_SERVER_STATUS 285
#define TK_CURRENT_USER 286
#define TK_PI 287
#define TK_CASE 288
#define TK_WHEN 289
#define TK_THEN 290
#define TK_ELSE 291
#define TK_BETWEEN 292
#define TK_IS 293
#define TK_NK_LT 294
#define TK_NK_GT 295
#define TK_NK_LE 296
#define TK_NK_GE 297
#define TK_NK_NE 298
#define TK_MATCH 299
#define TK_NMATCH 300
#define TK_CONTAINS 301
#define TK_JOIN 302
#define TK_INNER 303
#define TK_LEFT 304
#define TK_RIGHT 305
#define TK_OUTER 306
#define TK_SEMI 307
#define TK_ANTI 308
#define TK_ASOF 309
#define TK_WINDOW 310
#define TK_WINDOW_OFFSET 311
#define TK_JLIMIT 312
#define TK_SELECT 313
#define TK_NK_HINT 314
#define TK_DISTINCT 315
#define TK_WHERE 316
#define TK_PARTITION 317
#define TK_BY 318
#define TK_SESSION 319
#define TK_STATE_WINDOW 320
#define TK_EVENT_WINDOW 321
#define TK_COUNT_WINDOW 322
#define TK_ANOMALY_WINDOW 323
#define TK_SLIDING 324
#define TK_FILL 325
#define TK_VALUE 326
#define TK_VALUE_F 327
#define TK_NONE 328
#define TK_PREV 329
#define TK_NULL_F 330
#define TK_LINEAR 331
#define TK_NEXT 332
#define TK_HAVING 333
#define TK_RANGE 334
#define TK_EVERY 335
#define TK_ORDER 336
#define TK_SLIMIT 337
#define TK_SOFFSET 338
#define TK_LIMIT 339
#define TK_OFFSET 340
#define TK_ASC 341
#define TK_NULLS 342
#define TK_ABORT 343
#define TK_AFTER 344
#define TK_ATTACH 345
#define TK_BEFORE 346
#define TK_BEGIN 347
#define TK_BITAND 348
#define TK_BITNOT 349
#define TK_BITOR 350
#define TK_BLOCKS 351
#define TK_CHANGE 352
#define TK_COMMA 353
#define TK_CONCAT 354
#define TK_CONFLICT 355
#define TK_COPY 356
#define TK_DEFERRED 357
#define TK_DELIMITERS 358
#define TK_DETACH 359
#define TK_DIVIDE 360
#define TK_DOT 361
#define TK_EACH 362
#define TK_FAIL 363
#define TK_GLOB 364
#define TK_ID 365
#define TK_IMMEDIATE 366
#define TK_IMPORT 367
#define TK_INITIALLY 368
#define TK_INSTEAD 369
#define TK_ISNULL 370
#define TK_MODULES 371
#define TK_NK_BITNOT 372
#define TK_NK_SEMI 373
#define TK_NOTNULL 374
#define TK_OF 375
#define TK_PLUS 376
#define TK_PRIVILEGE 377
#define TK_RAISE 378
#define TK_RESTRICT 379
#define TK_ROW 380
#define TK_STAR 381
#define TK_STATEMENT 382
#define TK_STRICT 383
#define TK_STRING 384
#define TK_TIMES 385
#define TK_VALUES 386
#define TK_VARIABLE 387
#define TK_WAL 388
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601

View File

@ -4839,6 +4839,48 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = randFunction,
.finalizeFunc = NULL
},
{
.name = "forecast",
.type = FUNCTION_TYPE_FORECAST,
.classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
.translateFunc = translateForecast,
.getEnvFunc = getSelectivityFuncEnv,
.initFunc = functionSetup,
.processFunc = NULL,
.finalizeFunc = NULL,
.estimateReturnRowsFunc = forecastEstReturnRows,
},
{
.name = "_frowts",
.type = FUNCTION_TYPE_FORECAST_ROWTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_FORECAST_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
{
.name = "_flow",
.type = FUNCTION_TYPE_FORECAST_LOW,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_FORECAST_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateForecastConf,
.getEnvFunc = getForecastConfEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
{
.name = "_fhigh",
.type = FUNCTION_TYPE_FORECAST_HIGH,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_FORECAST_PC_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateForecastConf,
.getEnvFunc = getForecastConfEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.finalizeFunc = NULL
},
};
// clang-format on

View File

@ -368,6 +368,13 @@ static int32_t countWindowNodeCopy(const SCountWindowNode* pSrc, SCountWindowNod
return TSDB_CODE_SUCCESS;
}
static int32_t anomalyWindowNodeCopy(const SAnomalyWindowNode* pSrc, SAnomalyWindowNode* pDst) {
CLONE_NODE_FIELD(pCol);
CLONE_NODE_FIELD(pExpr);
COPY_CHAR_ARRAY_FIELD(anomalyOpt);
return TSDB_CODE_SUCCESS;
}
static int32_t sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) {
CLONE_NODE_FIELD_EX(pCol, SColumnNode*);
CLONE_NODE_FIELD_EX(pGap, SValueNode*);
@ -622,6 +629,8 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
COPY_SCALAR_FIELD(windowAlgo);
COPY_SCALAR_FIELD(windowCount);
COPY_SCALAR_FIELD(windowSliding);
CLONE_NODE_FIELD(pAnomalyExpr);
COPY_CHAR_ARRAY_FIELD(anomalyOpt);
return TSDB_CODE_SUCCESS;
}
@ -674,6 +683,12 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc
return TSDB_CODE_SUCCESS;
}
static int32_t logicForecastFuncCopy(const SForecastFuncLogicNode* pSrc, SForecastFuncLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pFuncs);
return TSDB_CODE_SUCCESS;
}
static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCacheLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(grpColsMayBeNull);
@ -937,6 +952,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) {
case QUERY_NODE_COUNT_WINDOW:
code = countWindowNodeCopy((const SCountWindowNode*)pNode, (SCountWindowNode*)pDst);
break;
case QUERY_NODE_ANOMALY_WINDOW:
code = anomalyWindowNodeCopy((const SAnomalyWindowNode*)pNode, (SAnomalyWindowNode*)pDst);
break;
case QUERY_NODE_SESSION_WINDOW:
code = sessionWindowNodeCopy((const SSessionWindowNode*)pNode, (SSessionWindowNode*)pDst);
break;
@ -1021,6 +1039,9 @@ int32_t nodesCloneNode(const SNode* pNode, SNode** ppNode) {
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
code = logicInterpFuncCopy((const SInterpFuncLogicNode*)pNode, (SInterpFuncLogicNode*)pDst);
break;
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
code = logicForecastFuncCopy((const SForecastFuncLogicNode*)pNode, (SForecastFuncLogicNode*)pDst);
break;
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
code = logicGroupCacheCopy((const SGroupCacheLogicNode*)pNode, (SGroupCacheLogicNode*)pDst);
break;

View File

@ -97,6 +97,8 @@ const char* nodesNodeName(ENodeType type) {
return "WindowOffset";
case QUERY_NODE_COUNT_WINDOW:
return "CountWindow";
case QUERY_NODE_ANOMALY_WINDOW:
return "AnomalyWindow";
case QUERY_NODE_SET_OPERATOR:
return "SetOperator";
case QUERY_NODE_SELECT_STMT:
@ -153,6 +155,12 @@ const char* nodesNodeName(ENodeType type) {
return "CreateQnodeStmt";
case QUERY_NODE_DROP_QNODE_STMT:
return "DropQnodeStmt";
case QUERY_NODE_CREATE_ANODE_STMT:
return "CreateAnodeStmt";
case QUERY_NODE_DROP_ANODE_STMT:
return "DropAnodeStmt";
case QUERY_NODE_UPDATE_ANODE_STMT:
return "UpdateAnodeStmt";
case QUERY_NODE_CREATE_SNODE_STMT:
return "CreateSnodeStmt";
case QUERY_NODE_DROP_SNODE_STMT:
@ -213,6 +221,10 @@ const char* nodesNodeName(ENodeType type) {
return "ShowModulesStmt";
case QUERY_NODE_SHOW_QNODES_STMT:
return "ShowQnodesStmt";
case QUERY_NODE_SHOW_ANODES_STMT:
return "ShowAnodesStmt";
case QUERY_NODE_SHOW_ANODES_FULL_STMT:
return "ShowAnodesFullStmt";
case QUERY_NODE_SHOW_SNODES_STMT:
return "ShowSnodesStmt";
case QUERY_NODE_SHOW_BNODES_STMT:
@ -328,6 +340,8 @@ const char* nodesNodeName(ENodeType type) {
return "LogicIndefRowsFunc";
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
return "LogicInterpFunc";
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
return "LogicForecastFunc";
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
return "LogicGroupCache";
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
@ -362,6 +376,10 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiMergeCountWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
return "PhysiStreamCountWindow";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY:
return "PhysiMergeAnomalyWindow";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY:
return "PhysiStreamAnomalyWindow";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "PhysiProject";
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
@ -413,6 +431,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiIndefRowsFunc";
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
return "PhysiInterpFunc";
case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC:
return "PhysiForecastFunc";
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return "PhysiDispatch";
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
@ -1260,6 +1280,30 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkForecastFuncLogicPlanFuncs = "Funcs";
static int32_t logicForecastFuncNodeToJson(const void* pObj, SJson* pJson) {
const SForecastFuncLogicNode* pNode = (const SForecastFuncLogicNode*)pObj;
int32_t code = logicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkForecastFuncLogicPlanFuncs, pNode->pFuncs);
}
return code;
}
static int32_t jsonToLogicForecastFuncNode(const SJson* pJson, void* pObj) {
SForecastFuncLogicNode* pNode = (SForecastFuncLogicNode*)pObj;
int32_t code = jsonToLogicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkForecastFuncLogicPlanFuncs, &pNode->pFuncs);
}
return code;
}
static const char* jkGroupCacheLogicPlanGrpColsMayBeNull = "GroupColsMayBeNull";
static const char* jkGroupCacheLogicPlanGroupByUid = "GroupByUid";
static const char* jkGroupCacheLogicPlanGlobalGroup = "GlobalGroup";
@ -3011,6 +3055,36 @@ static int32_t jsonToPhysiCountWindowNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkAnomalyWindowPhysiPlanAnomalyKey = "AnomalyKey";
static const char* jkAnomalyWindowPhysiPlanAnomalyOption = "AnomalyOpt";
static int32_t physiAnomalyWindowNodeToJson(const void* pObj, SJson* pJson) {
const SAnomalyWindowPhysiNode* pNode = (const SAnomalyWindowPhysiNode*)pObj;
int32_t code = physiWindowNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkAnomalyWindowPhysiPlanAnomalyKey, nodeToJson, pNode->pAnomalyKey);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkAnomalyWindowPhysiPlanAnomalyOption, pNode->anomalyOpt);
}
return code;
}
static int32_t jsonToPhysiAnomalyWindowNode(const SJson* pJson, void* pObj) {
SAnomalyWindowPhysiNode* pNode = (SAnomalyWindowPhysiNode*)pObj;
int32_t code = jsonToPhysiWindowNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkAnomalyWindowPhysiPlanAnomalyKey, &pNode->pAnomalyKey);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkAnomalyWindowPhysiPlanAnomalyOption, pNode->anomalyOpt);
}
return code;
}
static const char* jkPartitionPhysiPlanExprs = "Exprs";
static const char* jkPartitionPhysiPlanPartitionKeys = "PartitionKeys";
static const char* jkPartitionPhysiPlanTargets = "Targets";
@ -3198,6 +3272,37 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkForecastFuncPhysiPlanExprs = "Exprs";
static const char* jkForecastFuncPhysiPlanFuncs = "Funcs";
static int32_t physiForecastFuncNodeToJson(const void* pObj, SJson* pJson) {
const SForecastFuncPhysiNode* pNode = (const SForecastFuncPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkForecastFuncPhysiPlanExprs, pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkForecastFuncPhysiPlanFuncs, pNode->pFuncs);
}
return code;
}
static int32_t jsonToPhysiForecastFuncNode(const SJson* pJson, void* pObj) {
SForecastFuncPhysiNode* pNode = (SForecastFuncPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkForecastFuncPhysiPlanExprs, &pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkForecastFuncPhysiPlanFuncs, &pNode->pFuncs);
}
return code;
}
static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc";
static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) {
@ -4763,6 +4868,36 @@ static int32_t jsonToCountWindowNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkAnomalyWindowTsPrimaryKey = "AnomalyTsPrimaryKey";
static const char* jkAnomalyWindowExpr = "AnomalyWindowExpr";
static const char* jkAnomalyWindowOption = "AnomalyWindowOpt";
static int32_t anomalyWindowNodeToJson(const void* pObj, SJson* pJson) {
const SAnomalyWindowNode* pNode = (const SAnomalyWindowNode*)pObj;
int32_t code = tjsonAddObject(pJson, jkAnomalyWindowTsPrimaryKey, nodeToJson, pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkAnomalyWindowExpr, nodeToJson, pNode->pExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkAnomalyWindowOption, pNode->anomalyOpt);
}
return code;
}
static int32_t jsonToAnomalyWindowNode(const SJson* pJson, void* pObj) {
SAnomalyWindowNode* pNode = (SAnomalyWindowNode*)pObj;
int32_t code = jsonToNodeObject(pJson, jkAnomalyWindowTsPrimaryKey, &pNode->pCol);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkAnomalyWindowExpr, (SNode**)&pNode->pExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetStringValue(pJson, jkAnomalyWindowOption, pNode->anomalyOpt);
}
return code;
}
static const char* jkIntervalWindowInterval = "Interval";
static const char* jkIntervalWindowOffset = "Offset";
static const char* jkIntervalWindowSliding = "Sliding";
@ -6449,6 +6584,39 @@ static int32_t dropQnodeStmtToJson(const void* pObj, SJson* pJson) { return drop
static int32_t jsonToDropQnodeStmt(const SJson* pJson, void* pObj) { return jsonToDropComponentNodeStmt(pJson, pObj); }
static const char* jkCreateAnodeStmtUrl = "Url";
static const char* jkUpdateDropANodeStmtId = "AnodeId";
static int32_t createAnodeStmtToJson(const void* pObj, SJson* pJson) {
const SCreateAnodeStmt* pNode = (const SCreateAnodeStmt*)pObj;
return tjsonAddStringToObject(pJson, jkCreateAnodeStmtUrl, pNode->url);
}
static int32_t jsonToCreateAnodeStmt(const SJson* pJson, void* pObj) {
SCreateAnodeStmt* pNode = (SCreateAnodeStmt*)pObj;
return tjsonGetStringValue(pJson, jkCreateAnodeStmtUrl, pNode->url);
}
static int32_t updateAnodeStmtToJson(const void* pObj, SJson* pJson) {
const SUpdateAnodeStmt* pNode = (const SUpdateAnodeStmt*)pObj;
return tjsonAddIntegerToObject(pJson, jkUpdateDropANodeStmtId, pNode->anodeId);
}
static int32_t jsonToUpdateAnodeStmt(const SJson* pJson, void* pObj) {
SUpdateAnodeStmt* pNode = (SUpdateAnodeStmt*)pObj;
return tjsonGetIntValue(pJson, jkUpdateDropANodeStmtId, &pNode->anodeId);
}
static int32_t dropAnodeStmtToJson(const void* pObj, SJson* pJson) {
const SDropAnodeStmt* pNode = (const SDropAnodeStmt*)pObj;
return tjsonAddIntegerToObject(pJson, jkUpdateDropANodeStmtId, pNode->anodeId);
}
static int32_t jsonToDropAnodeStmt(const SJson* pJson, void* pObj) {
SDropAnodeStmt* pNode = (SDropAnodeStmt*)pObj;
return tjsonGetIntValue(pJson, jkUpdateDropANodeStmtId, &pNode->anodeId);
}
static int32_t createSnodeStmtToJson(const void* pObj, SJson* pJson) {
return createComponentNodeStmtToJson(pObj, pJson);
}
@ -7014,6 +7182,14 @@ static int32_t showQnodesStmtToJson(const void* pObj, SJson* pJson) { return sho
static int32_t jsonToShowQnodesStmt(const SJson* pJson, void* pObj) { return jsonToShowStmt(pJson, pObj); }
static int32_t showAnodesStmtToJson(const void* pObj, SJson* pJson) { return showStmtToJson(pObj, pJson); }
static int32_t jsonToShowAnodesStmt(const SJson* pJson, void* pObj) { return jsonToShowStmt(pJson, pObj); }
static int32_t showAnodesFullStmtToJson(const void* pObj, SJson* pJson) { return showStmtToJson(pObj, pJson); }
static int32_t jsonToShowAnodesFullStmt(const SJson* pJson, void* pObj) { return jsonToShowStmt(pJson, pObj); }
static int32_t showArbGroupsStmtToJson(const void* pObj, SJson* pJson) { return showStmtToJson(pObj, pJson); }
static int32_t jsonToShowArbGroupsStmt(const SJson* pJson, void* pObj) { return jsonToShowStmt(pJson, pObj); }
@ -7550,6 +7726,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return windowOffsetNodeToJson(pObj, pJson);
case QUERY_NODE_COUNT_WINDOW:
return countWindowNodeToJson(pObj, pJson);
case QUERY_NODE_ANOMALY_WINDOW:
return anomalyWindowNodeToJson(pObj, pJson);
case QUERY_NODE_SET_OPERATOR:
return setOperatorToJson(pObj, pJson);
case QUERY_NODE_SELECT_STMT:
@ -7602,6 +7780,12 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return createQnodeStmtToJson(pObj, pJson);
case QUERY_NODE_DROP_QNODE_STMT:
return dropQnodeStmtToJson(pObj, pJson);
case QUERY_NODE_CREATE_ANODE_STMT:
return createAnodeStmtToJson(pObj, pJson);
case QUERY_NODE_DROP_ANODE_STMT:
return dropAnodeStmtToJson(pObj, pJson);
case QUERY_NODE_UPDATE_ANODE_STMT:
return updateAnodeStmtToJson(pObj, pJson);
case QUERY_NODE_CREATE_SNODE_STMT:
return createSnodeStmtToJson(pObj, pJson);
case QUERY_NODE_DROP_SNODE_STMT:
@ -7652,6 +7836,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return showMnodesStmtToJson(pObj, pJson);
case QUERY_NODE_SHOW_QNODES_STMT:
return showQnodesStmtToJson(pObj, pJson);
case QUERY_NODE_SHOW_ANODES_STMT:
return showAnodesStmtToJson(pObj, pJson);
case QUERY_NODE_SHOW_ANODES_FULL_STMT:
return showAnodesFullStmtToJson(pObj, pJson);
case QUERY_NODE_SHOW_ARBGROUPS_STMT:
return showArbGroupsStmtToJson(pObj, pJson);
case QUERY_NODE_SHOW_CLUSTER_STMT:
@ -7741,6 +7929,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return logicIndefRowsFuncNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
return logicInterpFuncNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
return logicForecastFuncNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
return logicGroupCacheNodeToJson(pObj, pJson);
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
@ -7801,6 +7991,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
return physiCountWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY:
return physiAnomalyWindowNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return physiPartitionNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
@ -7809,6 +8001,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiIndefRowsFuncNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
return physiInterpFuncNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC:
return physiForecastFuncNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return physiDispatchNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
@ -7900,6 +8094,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToWindowOffsetNode(pJson, pObj);
case QUERY_NODE_COUNT_WINDOW:
return jsonToCountWindowNode(pJson, pObj);
case QUERY_NODE_ANOMALY_WINDOW:
return jsonToAnomalyWindowNode(pJson, pObj);
case QUERY_NODE_SET_OPERATOR:
return jsonToSetOperator(pJson, pObj);
case QUERY_NODE_SELECT_STMT:
@ -8002,6 +8198,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToShowMnodesStmt(pJson, pObj);
case QUERY_NODE_SHOW_QNODES_STMT:
return jsonToShowQnodesStmt(pJson, pObj);
case QUERY_NODE_SHOW_ANODES_STMT:
return jsonToShowAnodesStmt(pJson, pObj);
case QUERY_NODE_SHOW_ANODES_FULL_STMT:
return jsonToShowAnodesFullStmt(pJson, pObj);
case QUERY_NODE_SHOW_ARBGROUPS_STMT:
return jsonToShowArbGroupsStmt(pJson, pObj);
case QUERY_NODE_SHOW_CLUSTER_STMT:
@ -8099,6 +8299,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToLogicIndefRowsFuncNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
return jsonToLogicInterpFuncNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
return jsonToLogicForecastFuncNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
return jsonToLogicGroupCacheNode(pJson, pObj);
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
@ -8159,6 +8361,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
return jsonToPhysiCountWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY:
return jsonToPhysiAnomalyWindowNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return jsonToPhysiPartitionNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
@ -8167,6 +8371,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiIndefRowsFuncNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
return jsonToPhysiInterpFuncNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC:
return jsonToPhysiForecastFuncNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return jsonToPhysiDispatchNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:

View File

@ -3539,6 +3539,46 @@ static int32_t msgToPhysiCountWindowNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_ANOMALY_CODE_WINDOW = 1, PHY_ANOMALY_CODE_KEY, PHY_ANOMALY_CODE_WINDOW_OPTION };
static int32_t physiAnomalyWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SAnomalyWindowPhysiNode* pNode = (const SAnomalyWindowPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_ANOMALY_CODE_WINDOW, physiWindowNodeToMsg, &pNode->window);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_ANOMALY_CODE_KEY, nodeToMsg, pNode->pAnomalyKey);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeCStr(pEncoder, PHY_ANOMALY_CODE_WINDOW_OPTION, pNode->anomalyOpt);
}
return code;
}
static int32_t msgToPhysiAnomalyWindowNode(STlvDecoder* pDecoder, void* pObj) {
SAnomalyWindowPhysiNode* pNode = (SAnomalyWindowPhysiNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case PHY_ANOMALY_CODE_WINDOW:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiWindowNode, &pNode->window);
break;
case PHY_ANOMALY_CODE_KEY:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pAnomalyKey);
break;
case PHY_ANOMALY_CODE_WINDOW_OPTION:
code = tlvDecodeCStr(pTlv, pNode->anomalyOpt, sizeof(pNode->anomalyOpt));
break;
default:
break;
}
}
return code;
}
enum {
PHY_PARTITION_CODE_BASE_NODE = 1,
PHY_PARTITION_CODE_EXPR,
@ -3770,6 +3810,50 @@ static int32_t msgToPhysiInterpFuncNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum {
PHY_FORECAST_FUNC_CODE_BASE_NODE = 1,
PHY_FORECAST_FUNC_CODE_EXPR,
PHY_FORECAST_FUNC_CODE_FUNCS,
};
static int32_t physiForecastFuncNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SForecastFuncPhysiNode* pNode = (const SForecastFuncPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_BASE_NODE, physiNodeToMsg, &pNode->node);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_EXPR, nodeListToMsg, pNode->pExprs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_FORECAST_FUNC_CODE_FUNCS, nodeListToMsg, pNode->pFuncs);
}
return code;
}
static int32_t msgToPhysiForecastFuncNode(STlvDecoder* pDecoder, void* pObj) {
SForecastFuncPhysiNode* pNode = (SForecastFuncPhysiNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case PHY_FORECAST_FUNC_CODE_BASE_NODE:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node);
break;
case PHY_FORECAST_FUNC_CODE_EXPR:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pExprs);
break;
case PHY_FORECAST_FUNC_CODE_FUNCS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pFuncs);
break;
default:
break;
}
}
return code;
}
enum { PHY_DATA_SINK_CODE_INPUT_DESC = 1 };
static int32_t physicDataSinkNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -4536,6 +4620,9 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
code = physiCountWindowNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY:
code = physiAnomalyWindowNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
code = physiPartitionNodeToMsg(pObj, pEncoder);
break;
@ -4548,6 +4635,9 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
code = physiInterpFuncNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC:
code = physiForecastFuncNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
code = physiDispatchNodeToMsg(pObj, pEncoder);
break;
@ -4698,6 +4788,9 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
code = msgToPhysiCountWindowNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY:
code = msgToPhysiAnomalyWindowNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
code = msgToPhysiPartitionNode(pDecoder, pObj);
break;
@ -4710,6 +4803,9 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
code = msgToPhysiInterpFuncNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC:
code = msgToPhysiForecastFuncNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
code = msgToPhysiDispatchNode(pDecoder, pObj);
break;

View File

@ -181,6 +181,14 @@ static EDealRes dispatchExpr(SNode* pNode, ETraversalOrder order, FNodeWalker wa
res = walkExpr(pEvent->pCol, order, walker, pContext);
break;
}
case QUERY_NODE_ANOMALY_WINDOW: {
SAnomalyWindowNode* pAnomaly = (SAnomalyWindowNode*)pNode;
res = walkExpr(pAnomaly->pExpr, order, walker, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = walkExpr(pAnomaly->pCol, order, walker, pContext);
}
break;
}
default:
break;
}
@ -392,6 +400,14 @@ static EDealRes rewriteExpr(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
res = rewriteExpr(&pEvent->pCol, order, rewriter, pContext);
break;
}
case QUERY_NODE_ANOMALY_WINDOW: {
SAnomalyWindowNode* pAnomaly = (SAnomalyWindowNode*)pNode;
res = rewriteExpr(&pAnomaly->pExpr, order, rewriter, pContext);
if (DEAL_RES_ERROR != res && DEAL_RES_END != res) {
res = rewriteExpr(&pAnomaly->pCol, order, rewriter, pContext);
}
break;
}
default:
break;
}

View File

@ -419,6 +419,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
code = makeNode(type, sizeof(SEventWindowNode), &pNode); break;
case QUERY_NODE_COUNT_WINDOW:
code = makeNode(type, sizeof(SCountWindowNode), &pNode); break;
case QUERY_NODE_ANOMALY_WINDOW:
code = makeNode(type, sizeof(SAnomalyWindowNode), &pNode); break;
case QUERY_NODE_HINT:
code = makeNode(type, sizeof(SHintNode), &pNode); break;
case QUERY_NODE_VIEW:
@ -474,6 +476,12 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
code = makeNode(type, sizeof(SDropDnodeStmt), &pNode); break;
case QUERY_NODE_ALTER_DNODE_STMT:
code = makeNode(type, sizeof(SAlterDnodeStmt), &pNode); break;
case QUERY_NODE_CREATE_ANODE_STMT:
code = makeNode(type, sizeof(SCreateAnodeStmt), &pNode); break;
case QUERY_NODE_DROP_ANODE_STMT:
code = makeNode(type, sizeof(SDropAnodeStmt), &pNode); break;
case QUERY_NODE_UPDATE_ANODE_STMT:
code = makeNode(type, sizeof(SUpdateAnodeStmt), &pNode); break;
case QUERY_NODE_CREATE_INDEX_STMT:
code = makeNode(type, sizeof(SCreateIndexStmt), &pNode); break;
case QUERY_NODE_DROP_INDEX_STMT:
@ -540,6 +548,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
case QUERY_NODE_SHOW_MNODES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_ANODES_STMT:
case QUERY_NODE_SHOW_ANODES_FULL_STMT:
case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_BNODES_STMT:
case QUERY_NODE_SHOW_ARBGROUPS_STMT:
@ -647,6 +657,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
code = makeNode(type, sizeof(SIndefRowsFuncLogicNode), &pNode); break;
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
code = makeNode(type, sizeof(SInterpFuncLogicNode), &pNode); break;
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
code = makeNode(type, sizeof(SForecastFuncLogicNode), &pNode); break;
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
code = makeNode(type, sizeof(SGroupCacheLogicNode), &pNode); break;
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:
@ -722,6 +734,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
code = makeNode(type, sizeof(SStreamEventWinodwPhysiNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT:
code = makeNode(type, sizeof(SCountWinodwPhysiNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY:
code = makeNode(type, sizeof(SAnomalyWindowPhysiNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
code = makeNode(type, sizeof(SStreamCountWinodwPhysiNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
@ -732,6 +746,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
code = makeNode(type, sizeof(SIndefRowsFuncPhysiNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
code = makeNode(type, sizeof(SInterpFuncLogicNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC:
code = makeNode(type, sizeof(SForecastFuncLogicNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
code = makeNode(type, sizeof(SDataDispatcherNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
@ -1019,6 +1035,11 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pEvent->pCol);
break;
}
case QUERY_NODE_ANOMALY_WINDOW: {
SAnomalyWindowNode* pAnomaly = (SAnomalyWindowNode*)pNode;
nodesDestroyNode(pAnomaly->pCol);
break;
}
case QUERY_NODE_HINT: {
SHintNode* pHint = (SHintNode*)pNode;
destroyHintValue(pHint->option, pHint->value);
@ -1167,6 +1188,9 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_CREATE_DNODE_STMT: // no pointer field
case QUERY_NODE_DROP_DNODE_STMT: // no pointer field
case QUERY_NODE_ALTER_DNODE_STMT: // no pointer field
case QUERY_NODE_CREATE_ANODE_STMT: // no pointer field
case QUERY_NODE_UPDATE_ANODE_STMT: // no pointer field
case QUERY_NODE_DROP_ANODE_STMT: // no pointer field
break;
case QUERY_NODE_CREATE_INDEX_STMT: {
SCreateIndexStmt* pStmt = (SCreateIndexStmt*)pNode;
@ -1252,6 +1276,8 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_SHOW_MNODES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_ANODES_STMT:
case QUERY_NODE_SHOW_ANODES_FULL_STMT:
case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_BNODES_STMT:
case QUERY_NODE_SHOW_ARBGROUPS_STMT:
@ -1500,6 +1526,12 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pLogicNode->pTimeSeries);
break;
}
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC: {
SForecastFuncLogicNode* pLogicNode = (SForecastFuncLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyList(pLogicNode->pFuncs);
break;
}
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: {
SGroupCacheLogicNode* pLogicNode = (SGroupCacheLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode);
@ -1663,6 +1695,11 @@ void nodesDestroyNode(SNode* pNode) {
destroyWinodwPhysiNode((SWindowPhysiNode*)pPhyNode);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY: {
SAnomalyWindowPhysiNode* pPhyNode = (SAnomalyWindowPhysiNode*)pNode;
destroyWinodwPhysiNode((SWindowPhysiNode*)pPhyNode);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
destroyPartitionPhysiNode((SPartitionPhysiNode*)pNode);
break;
@ -1690,6 +1727,13 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pPhyNode->pTimeSeries);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC: {
SForecastFuncPhysiNode* pPhyNode = (SForecastFuncPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyList(pPhyNode->pExprs);
nodesDestroyList(pPhyNode->pFuncs);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
destroyDataSinkNode((SDataSinkNode*)pNode);
break;

View File

@ -157,6 +157,12 @@ with_clause_opt(A) ::= WITH search_condition(B).
/************************************************ create encrypt_key *********************************************/
cmd ::= CREATE ENCRYPT_KEY NK_STRING(A). { pCxt->pRootNode = createEncryptKeyStmt(pCxt, &A); }
/************************************************ create drop update anode ***************************************/
cmd ::= CREATE ANODE NK_STRING(A). { pCxt->pRootNode = createCreateAnodeStmt(pCxt, &A); }
cmd ::= UPDATE ANODE NK_INTEGER(A). { pCxt->pRootNode = createUpdateAnodeStmt(pCxt, &A, false); }
cmd ::= UPDATE ALL ANODES. { pCxt->pRootNode = createUpdateAnodeStmt(pCxt, NULL, true); }
cmd ::= DROP ANODE NK_INTEGER(A). { pCxt->pRootNode = createDropAnodeStmt(pCxt, &A); }
/************************************************ create/drop/alter/restore dnode *********************************************/
cmd ::= CREATE DNODE dnode_endpoint(A). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, NULL); }
cmd ::= CREATE DNODE dnode_endpoint(A) PORT NK_INTEGER(B). { pCxt->pRootNode = createCreateDnodeStmt(pCxt, &A, &B); }
@ -524,6 +530,8 @@ cmd ::= SHOW db_name_cond_opt(A) VGROUPS.
cmd ::= SHOW MNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MNODES_STMT); }
//cmd ::= SHOW MODULES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MODULES_STMT); }
cmd ::= SHOW QNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_QNODES_STMT); }
cmd ::= SHOW ANODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_ANODES_STMT); }
cmd ::= SHOW ANODES FULL. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_ANODES_FULL_STMT); }
cmd ::= SHOW ARBGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_ARBGROUPS_STMT); }
cmd ::= SHOW FUNCTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_FUNCTIONS_STMT); }
cmd ::= SHOW INDEXES FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_INDEXES_STMT, B, A, OP_TYPE_EQUAL); }
@ -1187,6 +1195,9 @@ pseudo_column(A) ::= WDURATION(B).
pseudo_column(A) ::= IROWTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= ISFILLED(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= QTAGS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= FLOW(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= FHIGH(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= FROWTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
@ -1505,6 +1516,10 @@ twindow_clause_opt(A) ::=
COUNT_WINDOW NK_LP NK_INTEGER(B) NK_RP. { A = createCountWindowNode(pCxt, &B, &B); }
twindow_clause_opt(A) ::=
COUNT_WINDOW NK_LP NK_INTEGER(B) NK_COMMA NK_INTEGER(C) NK_RP. { A = createCountWindowNode(pCxt, &B, &C); }
twindow_clause_opt(A) ::=
ANOMALY_WINDOW NK_LP expr_or_subquery(B) NK_RP. { A = createAnomalyWindowNode(pCxt, releaseRawExprNode(pCxt, B), NULL); }
twindow_clause_opt(A) ::=
ANOMALY_WINDOW NK_LP expr_or_subquery(B) NK_COMMA NK_STRING(C) NK_RP. { A = createAnomalyWindowNode(pCxt, releaseRawExprNode(pCxt, B), &C); }
sliding_opt(A) ::= . { A = NULL; }
sliding_opt(A) ::= SLIDING NK_LP interval_sliding_duration_literal(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }

View File

@ -555,6 +555,22 @@ static int32_t collectMetaKeyFromShowSnodes(SCollectMetaKeyCxt* pCxt, SShowStmt*
return TSDB_CODE_SUCCESS;
}
static int32_t collectMetaKeyFromShowAnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
if (pCxt->pParseCxt->enableSysInfo) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_ANODES,
pCxt->pMetaCache);
}
return TSDB_CODE_SUCCESS;
}
static int32_t collectMetaKeyFromShowAnodesFull(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
if (pCxt->pParseCxt->enableSysInfo) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_ANODES_FULL,
pCxt->pMetaCache);
}
return TSDB_CODE_SUCCESS;
}
static int32_t collectMetaKeyFromShowBnodes(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
if (pCxt->pParseCxt->enableSysInfo) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_BNODES,
@ -983,6 +999,10 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowQnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_SNODES_STMT:
return collectMetaKeyFromShowSnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_ANODES_STMT:
return collectMetaKeyFromShowAnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_ANODES_FULL_STMT:
return collectMetaKeyFromShowAnodesFull(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_BNODES_STMT:
return collectMetaKeyFromShowBnodes(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_ARBGROUPS_STMT:

View File

@ -358,6 +358,8 @@ static int32_t authQuery(SAuthCxt* pCxt, SNode* pStmt) {
case QUERY_NODE_SHOW_MNODES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_ANODES_STMT:
case QUERY_NODE_SHOW_ANODES_FULL_STMT:
case QUERY_NODE_SHOW_SNODES_STMT:
case QUERY_NODE_SHOW_BNODES_STMT:
case QUERY_NODE_SHOW_CLUSTER_STMT:

View File

@ -38,6 +38,9 @@ static SKeyword keywordTable[] = {
{"ANALYZE", TK_ANALYZE},
{"AND", TK_AND},
{"ANTI", TK_ANTI},
{"ANODE", TK_ANODE},
{"ANODES", TK_ANODES},
{"ANOMALY_WINDOW", TK_ANOMALY_WINDOW},
// {"ANY", TK_ANY},
{"APPS", TK_APPS},
{"AS", TK_AS},
@ -332,6 +335,9 @@ static SKeyword keywordTable[] = {
{"_WDURATION", TK_WDURATION},
{"_WEND", TK_WEND},
{"_WSTART", TK_WSTART},
{"_FLOW", TK_FLOW},
{"_FHIGH", TK_FHIGH},
{"_FROWTS", TK_FROWTS},
{"ALIVE", TK_ALIVE},
{"VARBINARY", TK_VARBINARY},
{"S3_CHUNKSIZE", TK_S3_CHUNKSIZE},

View File

@ -24,6 +24,7 @@
#include "parUtil.h"
#include "scalar.h"
#include "systable.h"
#include "tanal.h"
#include "tcol.h"
#include "tglobal.h"
#include "ttime.h"
@ -348,6 +349,20 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = {
.numOfShowCols = 1,
.pShowCols = {"*"}
},
{
.showType = QUERY_NODE_SHOW_ANODES_STMT,
.pDbName = TSDB_INFORMATION_SCHEMA_DB,
.pTableName = TSDB_INS_TABLE_ANODES,
.numOfShowCols = 1,
.pShowCols = {"*"}
},
{
.showType = QUERY_NODE_SHOW_ANODES_FULL_STMT,
.pDbName = TSDB_INFORMATION_SCHEMA_DB,
.pTableName = TSDB_INS_TABLE_ANODES_FULL,
.numOfShowCols = 1,
.pShowCols = {"*"}
},
};
// clang-format on
@ -1035,6 +1050,14 @@ static bool isInterpPseudoColumnFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsInterpPseudoColumnFunc(((SFunctionNode*)pNode)->funcId));
}
static bool isForecastFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsForecastFunc(((SFunctionNode*)pNode)->funcId));
}
static bool isForecastPseudoColumnFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsForecastPseudoColumnFunc(((SFunctionNode*)pNode)->funcId));
}
#ifdef BUILD_NO_CALL
static bool isTimelineFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsTimelineFunc(((SFunctionNode*)pNode)->funcId));
@ -1237,7 +1260,7 @@ bool isPrimaryKeyImpl(SNode* pExpr) {
FUNCTION_TYPE_LAST_ROW == pFunc->funcType || FUNCTION_TYPE_TIMETRUNCATE == pFunc->funcType) {
return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0));
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType ||
FUNCTION_TYPE_IROWTS == pFunc->funcType) {
FUNCTION_TYPE_IROWTS == pFunc->funcType || FUNCTION_TYPE_FORECAST_ROWTS == pFunc->funcType) {
return true;
}
} else if (QUERY_NODE_OPERATOR == nodeType(pExpr)) {
@ -2250,7 +2273,7 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
static EDealRes haveVectorFunction(SNode* pNode, void* pContext) {
if (isAggFunc(pNode) || isIndefiniteRowsFunc(pNode) || isWindowPseudoColumnFunc(pNode) ||
isInterpPseudoColumnFunc(pNode)) {
isInterpPseudoColumnFunc(pNode) || isForecastPseudoColumnFunc(pNode)) {
*((bool*)pContext) = true;
return DEAL_RES_END;
}
@ -2553,6 +2576,72 @@ static int32_t translateInterpPseudoColumnFunc(STranslateContext* pCxt, SNode**
return TSDB_CODE_SUCCESS;
}
static int32_t translateForecastFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsForecastFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (!isSelectStmt(pCxt->pCurrStmt) || SQL_CLAUSE_SELECT != pCxt->currClause) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
}
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
SNode* pTable = pSelect->pFromTable;
if (pSelect->hasAggFuncs || pSelect->hasMultiRowsFunc || pSelect->hasIndefiniteRowsFunc) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
}
if (pSelect->hasForecastFunc &&
(FUNC_RETURN_ROWS_INDEFINITE == pSelect->returnRows || pSelect->returnRows != fmGetFuncReturnRows(pFunc))) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s ignoring null value options cannot be used when applying to multiple columns",
pFunc->functionName);
}
if (NULL != pSelect->pWindow || NULL != pSelect->pGroupByList) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s function is not supported in window query or group query", pFunc->functionName);
}
if (hasInvalidFuncNesting(pFunc->pParameterList)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_AGG_FUNC_NESTING);
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateForecastPseudoColumnFunc(STranslateContext* pCxt, SNode** ppNode, bool* pRewriteToColumn) {
SFunctionNode* pFunc = (SFunctionNode*)(*ppNode);
if (!fmIsForecastPseudoColumnFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (!isSelectStmt(pCxt->pCurrStmt)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s must be used in select statements", pFunc->functionName);
}
if (pCxt->currClause == SQL_CLAUSE_WHERE) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE,
"%s is not allowed in where clause", pFunc->functionName);
}
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
SNode* pNode = NULL;
bool bFound = false;
FOREACH(pNode, pSelect->pProjectionList) {
if (nodeType(pNode) == QUERY_NODE_FUNCTION && strcasecmp(((SFunctionNode*)pNode)->functionName, "forecast") == 0) {
bFound = true;
break;
}
}
if (!bFound) {
*pRewriteToColumn = true;
int32_t code = replacePsedudoColumnFuncWithColumn(pCxt, ppNode);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
(void)translateColumn(pCxt, (SColumnNode**)ppNode);
return pCxt->errCode;
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateTimelineFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsTimelineFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
@ -2738,6 +2827,8 @@ static void setFuncClassification(STranslateContext* pCxt, SFunctionNode* pFunc)
pSelect->returnRows = fmGetFuncReturnRows(pFunc);
} else if (fmIsInterpFunc(pFunc->funcId)) {
pSelect->returnRows = fmGetFuncReturnRows(pFunc);
} else if (fmIsForecastFunc(pFunc->funcId)) {
pSelect->returnRows = fmGetFuncReturnRows(pFunc);
}
if (fmIsProcessByRowFunc(pFunc->funcId)) {
pSelect->lastProcessByRowFuncId = pFunc->funcId;
@ -2755,6 +2846,9 @@ static void setFuncClassification(STranslateContext* pCxt, SFunctionNode* pFunc)
pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType);
pSelect->hasInterpPseudoColFunc =
pSelect->hasInterpPseudoColFunc ? true : fmIsInterpPseudoColumnFunc(pFunc->funcId);
pSelect->hasForecastFunc = pSelect->hasForecastFunc ? true : (FUNCTION_TYPE_FORECAST == pFunc->funcType);
pSelect->hasForecastPseudoColFunc =
pSelect->hasForecastPseudoColFunc ? true : fmIsForecastPseudoColumnFunc(pFunc->funcId);
pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType);
pSelect->hasLastFunc = pSelect->hasLastFunc ? true : (FUNCTION_TYPE_LAST == pFunc->funcType);
pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId);
@ -2946,6 +3040,9 @@ static int32_t translateScanPseudoColumnFunc(STranslateContext* pCxt, SNode** pp
return TSDB_CODE_SUCCESS;
}
if (0 == LIST_LENGTH(pFunc->pParameterList)) {
if (pFunc->funcType == FUNCTION_TYPE_FORECAST_LOW || pFunc->funcType == FUNCTION_TYPE_FORECAST_HIGH) {
return TSDB_CODE_SUCCESS;
}
if (!isSelectStmt(pCxt->pCurrStmt) || NULL == ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TBNAME);
}
@ -3016,6 +3113,16 @@ static int32_t translateNormalFunction(STranslateContext* pCxt, SNode** ppNode)
return code;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = translateForecastFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == code) {
bool bRewriteToColumn = false;
code = translateForecastPseudoColumnFunc(pCxt, ppNode, &bRewriteToColumn);
if (bRewriteToColumn) {
return code;
}
}
if (TSDB_CODE_SUCCESS == code) {
code = translateTimelineFunc(pCxt, pFunc);
}
@ -3759,7 +3866,8 @@ static int32_t resetSelectFuncNumWithoutDup(SSelectStmt* pSelect) {
static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow || isWindowJoinStmt(pSelect) ||
(!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && !pSelect->hasInterpFunc)) {
(!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && !pSelect->hasInterpFunc &&
!pSelect->hasForecastFunc)) {
return TSDB_CODE_SUCCESS;
}
if (!pSelect->onlyHasKeepOrderFunc) {
@ -3781,8 +3889,8 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
}
static int32_t checkWinJoinAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!isWindowJoinStmt(pSelect) ||
(!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && !pSelect->hasInterpFunc)) {
if (!isWindowJoinStmt(pSelect) || (!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc &&
!pSelect->hasInterpFunc && !pSelect->hasForecastFunc)) {
return TSDB_CODE_SUCCESS;
}
if (!pSelect->onlyHasKeepOrderFunc) {
@ -5795,6 +5903,40 @@ static int32_t translateCountWindow(STranslateContext* pCxt, SSelectStmt* pSelec
return TSDB_CODE_SUCCESS;
}
static int32_t checkAnomalyExpr(STranslateContext* pCxt, SNode* pNode) {
int32_t type = ((SExprNode*)pNode)->resType.type;
if (!IS_MATHABLE_TYPE(type)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ANOMALY_WIN_TYPE,
"ANOMALY_WINDOW only support mathable column");
}
if (QUERY_NODE_COLUMN == nodeType(pNode) && COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ANOMALY_WIN_COL,
"ANOMALY_WINDOW not support on tag column");
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateAnomalyWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) &&
!isGlobalTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_QUERY,
"ANOMALY_WINDOW requires valid time series input");
}
SAnomalyWindowNode* pAnomaly = (SAnomalyWindowNode*)pSelect->pWindow;
int32_t code = checkAnomalyExpr(pCxt, pAnomaly->pExpr);
if (TSDB_CODE_SUCCESS == code) {
if (!taosAnalGetOptStr(pAnomaly->anomalyOpt, "algo", NULL, 0) != 0) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ANOMALY_WIN_OPT,
"ANOMALY_WINDOW option should include algo field");
}
}
return code;
}
static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_STATE_WINDOW:
@ -5807,6 +5949,8 @@ static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSe
return translateEventWindow(pCxt, pSelect);
case QUERY_NODE_COUNT_WINDOW:
return translateCountWindow(pCxt, pSelect);
case QUERY_NODE_ANOMALY_WINDOW:
return translateAnomalyWindow(pCxt, pSelect);
default:
break;
}
@ -6043,6 +6187,26 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
return code;
}
static int32_t translateForecast(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pSelect->hasForecastFunc) {
if (pSelect->hasForecastPseudoColFunc) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"Has Forecast pseudo column(s) but missing forcast function");
}
return TSDB_CODE_SUCCESS;
}
if ((NULL != pSelect->pFromTable) && (QUERY_NODE_JOIN_TABLE == nodeType(pSelect->pFromTable))) {
SJoinTableNode* pJoinTable = (SJoinTableNode*)pSelect->pFromTable;
if (IS_WINDOW_JOIN(pJoinTable->subType)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE,
"Forecast not supported to be used in WINDOW join");
}
}
return 0;
}
static int32_t removeConstantValueFromList(SNodeList** pList) {
SNode* pNode = NULL;
WHERE_EACH(pNode, *pList) {
@ -6884,6 +7048,9 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect
if (TSDB_CODE_SUCCESS == code) {
code = translateInterp(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateForecast(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = appendTsForImplicitTsFunc(pCxt, pSelect);
}
@ -7895,6 +8062,19 @@ static int32_t fillCmdSql(STranslateContext* pCxt, int16_t msgType, void* pReq)
break;
}
case TDMT_MND_CREATE_ANODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMCreateAnodeReq, pReq);
break;
}
case TDMT_MND_DROP_ANODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMDropAnodeReq, pReq);
break;
}
case TDMT_MND_UPDATE_ANODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMUpdateAnodeReq, pReq);
break;
}
case TDMT_MND_CREATE_MNODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMCreateMnodeReq, pReq);
break;
@ -9398,6 +9578,39 @@ static int32_t translateDropUser(STranslateContext* pCxt, SDropUserStmt* pStmt)
return code;
}
static int32_t translateCreateAnode(STranslateContext* pCxt, SCreateAnodeStmt* pStmt) {
SMCreateAnodeReq createReq = {0};
createReq.urlLen = strlen(pStmt->url) + 1;
createReq.url = taosMemoryCalloc(createReq.urlLen, 1);
if (createReq.url == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tstrncpy(createReq.url, pStmt->url, createReq.urlLen);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_CREATE_ANODE, (FSerializeFunc)tSerializeSMCreateAnodeReq, &createReq);
tFreeSMCreateAnodeReq(&createReq);
return code;
}
static int32_t translateDropAnode(STranslateContext* pCxt, SDropAnodeStmt* pStmt) {
SMDropAnodeReq dropReq = {0};
dropReq.anodeId = pStmt->anodeId;
int32_t code = buildCmdMsg(pCxt, TDMT_MND_DROP_ANODE, (FSerializeFunc)tSerializeSMDropAnodeReq, &dropReq);
tFreeSMDropAnodeReq(&dropReq);
return code;
}
static int32_t translateUpdateAnode(STranslateContext* pCxt, SUpdateAnodeStmt* pStmt) {
SMUpdateAnodeReq updateReq = {0};
updateReq.anodeId = pStmt->anodeId;
int32_t code = buildCmdMsg(pCxt, TDMT_MND_UPDATE_ANODE, (FSerializeFunc)tSerializeSMUpdateAnodeReq, &updateReq);
tFreeSMUpdateAnodeReq(&updateReq);
return code;
}
static int32_t translateCreateDnode(STranslateContext* pCxt, SCreateDnodeStmt* pStmt) {
SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, pStmt->fqdn);
@ -9820,7 +10033,7 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen
}
static int32_t checkTopicQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (pSelect->hasAggFuncs || pSelect->hasInterpFunc || pSelect->hasIndefiniteRowsFunc) {
if (pSelect->hasAggFuncs || pSelect->hasForecastFunc || pSelect->hasInterpFunc || pSelect->hasIndefiniteRowsFunc) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TOPIC_QUERY);
}
return TSDB_CODE_SUCCESS;
@ -10186,7 +10399,8 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm
static bool crossTableWithoutAggOper(SSelectStmt* pSelect) {
return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc &&
!pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!pSelect->hasInterpFunc && !pSelect->hasForecastFunc &&
TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasTbnameFunction(pSelect->pPartitionByList);
}
@ -12389,6 +12603,15 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_ALTER_DNODE_STMT:
code = translateAlterDnode(pCxt, (SAlterDnodeStmt*)pNode);
break;
case QUERY_NODE_CREATE_ANODE_STMT:
code = translateCreateAnode(pCxt, (SCreateAnodeStmt*)pNode);
break;
case QUERY_NODE_DROP_ANODE_STMT:
code = translateDropAnode(pCxt, (SDropAnodeStmt*)pNode);
break;
case QUERY_NODE_UPDATE_ANODE_STMT:
code = translateUpdateAnode(pCxt, (SUpdateAnodeStmt*)pNode);
break;
case QUERY_NODE_CREATE_INDEX_STMT:
code = translateCreateIndex(pCxt, (SCreateIndexStmt*)pNode);
break;
@ -15749,6 +15972,8 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_MNODES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
case QUERY_NODE_SHOW_QNODES_STMT:
case QUERY_NODE_SHOW_ANODES_STMT:
case QUERY_NODE_SHOW_ANODES_FULL_STMT:
case QUERY_NODE_SHOW_FUNCTIONS_STMT:
case QUERY_NODE_SHOW_INDEXES_STMT:
case QUERY_NODE_SHOW_STREAMS_STMT:

File diff suppressed because it is too large Load Diff

View File

@ -973,6 +973,45 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
return code;
}
static bool isForecastFunc(int32_t funcId) {
return fmIsForecastFunc(funcId) || fmIsForecastPseudoColumnFunc(funcId) || fmIsGroupKeyFunc(funcId) || fmisSelectGroupConstValueFunc(funcId);
}
static int32_t createForecastFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (!pSelect->hasForecastFunc) {
return TSDB_CODE_SUCCESS;
}
SForecastFuncLogicNode* pForecastFunc = NULL;
int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC, (SNode**)&pForecastFunc);
if (NULL == pForecastFunc) {
return code;
}
pForecastFunc->node.groupAction = getGroupAction(pCxt, pSelect);
pForecastFunc->node.requireDataOrder = getRequireDataOrder(true, pSelect);
pForecastFunc->node.resultDataOrder = pForecastFunc->node.requireDataOrder;
// interp functions and _group_key functions
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, NULL, isForecastFunc, &pForecastFunc->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprsForSelect(pForecastFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT, NULL);
}
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pForecastFunc->pFuncs, &pForecastFunc->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pForecastFunc;
} else {
nodesDestroyNode((SNode*)pForecastFunc);
}
return code;
}
static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow,
SLogicNode** pLogicNode) {
if (pCxt->pPlanCxt->streamQuery) {
@ -1174,6 +1213,48 @@ static int32_t createWindowLogicNodeByCount(SLogicPlanContext* pCxt, SCountWindo
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}
static int32_t createWindowLogicNodeByAnomaly(SLogicPlanContext* pCxt, SAnomalyWindowNode* pAnomaly,
SSelectStmt* pSelect, SLogicNode** pLogicNode) {
SWindowLogicNode* pWindow = NULL;
int32_t code = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW, (SNode**)&pWindow);
if (NULL == pWindow) {
return code;
}
pWindow->winType = WINDOW_TYPE_ANOMALY;
pWindow->node.groupAction = getGroupAction(pCxt, pSelect);
pWindow->node.requireDataOrder =
pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : getRequireDataOrder(true, pSelect);
pWindow->node.resultDataOrder =
pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_GLOBAL : pWindow->node.requireDataOrder;
pWindow->pAnomalyExpr = NULL;
code = nodesCloneNode(pAnomaly->pExpr, &pWindow->pAnomalyExpr);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pWindow);
return code;
}
tstrncpy(pWindow->anomalyOpt, pAnomaly->anomalyOpt, sizeof(pWindow->anomalyOpt));
pWindow->pTspk = NULL;
code = nodesCloneNode(pAnomaly->pCol, &pWindow->pTspk);
if (NULL == pWindow->pTspk) {
nodesDestroyNode((SNode*)pWindow);
return code;
}
// rewrite the expression in subsequent clauses
code = rewriteExprForSelect(pWindow->pAnomalyExpr, pSelect, SQL_CLAUSE_WINDOW);
if (TSDB_CODE_SUCCESS == code) {
code = createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
} else {
nodesDestroyNode((SNode*)pWindow);
}
return code;
}
static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (NULL == pSelect->pWindow) {
return TSDB_CODE_SUCCESS;
@ -1189,6 +1270,8 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
return createWindowLogicNodeByEvent(pCxt, (SEventWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
case QUERY_NODE_COUNT_WINDOW:
return createWindowLogicNodeByCount(pCxt, (SCountWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
case QUERY_NODE_ANOMALY_WINDOW:
return createWindowLogicNodeByAnomaly(pCxt, (SAnomalyWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
default:
break;
}
@ -1600,6 +1683,9 @@ static int32_t createSelectFromLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
if (TSDB_CODE_SUCCESS == code) {
code = createSelectRootLogicNode(pCxt, pSelect, createInterpFuncLogicNode, &pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
code = createSelectRootLogicNode(pCxt, pSelect, createForecastFuncLogicNode, &pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
code = createSelectRootLogicNode(pCxt, pSelect, createDistinctLogicNode, &pRoot);
}

View File

@ -1990,6 +1990,50 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
return code;
}
static int32_t createForecastFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SForecastFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
SForecastFuncPhysiNode* pForecastFunc =
(SForecastFuncPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_FORECAST_FUNC);
if (NULL == pForecastFunc) {
return terrno;
}
SNodeList* pPrecalcExprs = NULL;
SNodeList* pFuncs = NULL;
int32_t code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pFuncs, &pPrecalcExprs, &pFuncs);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pForecastFunc->pExprs);
if (TSDB_CODE_SUCCESS == code) {
code = pushdownDataBlockSlots(pCxt, pForecastFunc->pExprs, pChildTupe);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs, &pForecastFunc->pFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pForecastFunc->pFuncs, pForecastFunc->node.pOutputDataBlockDesc);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pForecastFunc);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pForecastFunc;
} else {
nodesDestroyNode((SNode*)pForecastFunc);
}
nodesDestroyList(pPrecalcExprs);
nodesDestroyList(pFuncs);
return code;
}
static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) {
if (GROUP_ACTION_KEEP == pProject->node.groupAction) {
return false;
@ -2325,6 +2369,53 @@ static int32_t createCountWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
return code;
}
static int32_t createAnomalyWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) {
SAnomalyWindowPhysiNode* pAnomaly = (SAnomalyWindowPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pWindowLogicNode,
(pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_ANOMALY : QUERY_NODE_PHYSICAL_PLAN_MERGE_ANOMALY));
if (NULL == pAnomaly) {
return terrno;
}
SNodeList* pPrecalcExprs = NULL;
SNode* pAnomalyKey = NULL;
int32_t code = rewritePrecalcExpr(pCxt, pWindowLogicNode->pAnomalyExpr, &pPrecalcExprs, &pAnomalyKey);
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
// push down expression to pOutputDataBlockDesc of child node
if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) {
code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pAnomaly->window.pExprs);
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pAnomaly->window.pExprs, pChildTupe);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pAnomalyKey, &pAnomaly->pAnomalyKey);
// if (TSDB_CODE_SUCCESS == code) {
// code = addDataBlockSlot(pCxt, &pAnomaly->pAnomalyKey, pAnomaly->window.node.pOutputDataBlockDesc);
// }
}
tstrncpy(pAnomaly->anomalyOpt, pWindowLogicNode->anomalyOpt, sizeof(pAnomaly->anomalyOpt));
if (TSDB_CODE_SUCCESS == code) {
code = createWindowPhysiNodeFinalize(pCxt, pChildren, &pAnomaly->window, pWindowLogicNode);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pAnomaly;
} else {
nodesDestroyNode((SNode*)pAnomaly);
}
nodesDestroyList(pPrecalcExprs);
nodesDestroyNode(pAnomalyKey);
return code;
}
static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode,
SPhysiNode** pPhyNode) {
switch (pWindowLogicNode->winType) {
@ -2338,6 +2429,8 @@ static int32_t createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildr
return createEventWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
case WINDOW_TYPE_COUNT:
return createCountWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
case WINDOW_TYPE_ANOMALY:
return createAnomalyWindowPhysiNode(pCxt, pChildren, pWindowLogicNode, pPhyNode);
default:
break;
}
@ -2652,6 +2745,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
return createForecastFuncPhysiNode(pCxt, pChildren, (SForecastFuncLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_MERGE:
return createMergePhysiNode(pCxt, pChildren, (SMergeLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:

View File

@ -256,6 +256,15 @@ static int32_t adjustCountDataRequirement(SWindowLogicNode* pWindow, EDataOrderL
return TSDB_CODE_SUCCESS;
}
static int32_t adjustAnomalyDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
if (requirement <= pWindow->node.resultDataOrder) {
return TSDB_CODE_SUCCESS;
}
pWindow->node.resultDataOrder = requirement;
pWindow->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) {
switch (pWindow->winType) {
case WINDOW_TYPE_INTERVAL:
@ -268,6 +277,8 @@ static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrder
return adjustEventDataRequirement(pWindow, requirement);
case WINDOW_TYPE_COUNT:
return adjustCountDataRequirement(pWindow, requirement);
case WINDOW_TYPE_ANOMALY:
return adjustAnomalyDataRequirement(pWindow, requirement);
default:
break;
}
@ -318,6 +329,15 @@ static int32_t adjustInterpDataRequirement(SInterpFuncLogicNode* pInterp, EDataO
return TSDB_CODE_SUCCESS;
}
static int32_t adjustForecastDataRequirement(SForecastFuncLogicNode* pForecast, EDataOrderLevel requirement) {
if (requirement <= pForecast->node.requireDataOrder) {
return TSDB_CODE_SUCCESS;
}
pForecast->node.resultDataOrder = requirement;
pForecast->node.requireDataOrder = requirement;
return TSDB_CODE_SUCCESS;
}
int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requirement) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
@ -355,6 +375,9 @@ int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requir
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
code = adjustInterpDataRequirement((SInterpFuncLogicNode*)pNode, requirement);
break;
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
code = adjustForecastDataRequirement((SForecastFuncLogicNode*)pNode, requirement);
break;
default:
break;
}

View File

@ -6,6 +6,7 @@
,,n,unit-test,bash test.sh
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
#
# army-test
@ -1542,7 +1543,6 @@
,,n,develop-test,python3 ./test.py -f 2-query/ts-range.py
,,n,develop-test,python3 ./test.py -f 2-query/tag_scan.py
,,n,develop-test,python3 ./test.py -f 2-query/show_create_db.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/default_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/demo.py