feat: add stream option 'delete_mark'
This commit is contained in:
parent
c45001744f
commit
bc413b8fff
|
@ -504,6 +504,8 @@ typedef struct {
|
||||||
char* pComment;
|
char* pComment;
|
||||||
char* pAst1;
|
char* pAst1;
|
||||||
char* pAst2;
|
char* pAst2;
|
||||||
|
int64_t deleteMark1;
|
||||||
|
int64_t deleteMark2;
|
||||||
} SMCreateStbReq;
|
} SMCreateStbReq;
|
||||||
|
|
||||||
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
|
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
|
||||||
|
@ -2017,6 +2019,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t maxdelay[2];
|
int64_t maxdelay[2];
|
||||||
int64_t watermark[2];
|
int64_t watermark[2];
|
||||||
|
int64_t deleteMark[2];
|
||||||
int32_t qmsgLen[2];
|
int32_t qmsgLen[2];
|
||||||
char* qmsg[2]; // pAst:qmsg:SRetention => trigger aggr task1/2
|
char* qmsg[2]; // pAst:qmsg:SRetention => trigger aggr task1/2
|
||||||
} SRSmaParam;
|
} SRSmaParam;
|
||||||
|
@ -2738,6 +2741,7 @@ typedef struct {
|
||||||
char* tagsFilter;
|
char* tagsFilter;
|
||||||
char* sql;
|
char* sql;
|
||||||
char* ast;
|
char* ast;
|
||||||
|
int64_t deleteMark;
|
||||||
} SMCreateSmaReq;
|
} SMCreateSmaReq;
|
||||||
|
|
||||||
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
|
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);
|
||||||
|
|
|
@ -147,195 +147,196 @@
|
||||||
#define TK_ROLLUP 129
|
#define TK_ROLLUP 129
|
||||||
#define TK_TTL 130
|
#define TK_TTL 130
|
||||||
#define TK_SMA 131
|
#define TK_SMA 131
|
||||||
#define TK_FIRST 132
|
#define TK_DELETE_MARK 132
|
||||||
#define TK_LAST 133
|
#define TK_FIRST 133
|
||||||
#define TK_SHOW 134
|
#define TK_LAST 134
|
||||||
#define TK_PRIVILEGES 135
|
#define TK_SHOW 135
|
||||||
#define TK_DATABASES 136
|
#define TK_PRIVILEGES 136
|
||||||
#define TK_TABLES 137
|
#define TK_DATABASES 137
|
||||||
#define TK_STABLES 138
|
#define TK_TABLES 138
|
||||||
#define TK_MNODES 139
|
#define TK_STABLES 139
|
||||||
#define TK_QNODES 140
|
#define TK_MNODES 140
|
||||||
#define TK_FUNCTIONS 141
|
#define TK_QNODES 141
|
||||||
#define TK_INDEXES 142
|
#define TK_FUNCTIONS 142
|
||||||
#define TK_ACCOUNTS 143
|
#define TK_INDEXES 143
|
||||||
#define TK_APPS 144
|
#define TK_ACCOUNTS 144
|
||||||
#define TK_CONNECTIONS 145
|
#define TK_APPS 145
|
||||||
#define TK_LICENCES 146
|
#define TK_CONNECTIONS 146
|
||||||
#define TK_GRANTS 147
|
#define TK_LICENCES 147
|
||||||
#define TK_QUERIES 148
|
#define TK_GRANTS 148
|
||||||
#define TK_SCORES 149
|
#define TK_QUERIES 149
|
||||||
#define TK_TOPICS 150
|
#define TK_SCORES 150
|
||||||
#define TK_VARIABLES 151
|
#define TK_TOPICS 151
|
||||||
#define TK_CLUSTER 152
|
#define TK_VARIABLES 152
|
||||||
#define TK_BNODES 153
|
#define TK_CLUSTER 153
|
||||||
#define TK_SNODES 154
|
#define TK_BNODES 154
|
||||||
#define TK_TRANSACTIONS 155
|
#define TK_SNODES 155
|
||||||
#define TK_DISTRIBUTED 156
|
#define TK_TRANSACTIONS 156
|
||||||
#define TK_CONSUMERS 157
|
#define TK_DISTRIBUTED 157
|
||||||
#define TK_SUBSCRIPTIONS 158
|
#define TK_CONSUMERS 158
|
||||||
#define TK_VNODES 159
|
#define TK_SUBSCRIPTIONS 159
|
||||||
#define TK_LIKE 160
|
#define TK_VNODES 160
|
||||||
#define TK_TBNAME 161
|
#define TK_LIKE 161
|
||||||
#define TK_QTAGS 162
|
#define TK_TBNAME 162
|
||||||
#define TK_AS 163
|
#define TK_QTAGS 163
|
||||||
#define TK_INDEX 164
|
#define TK_AS 164
|
||||||
#define TK_FUNCTION 165
|
#define TK_INDEX 165
|
||||||
#define TK_INTERVAL 166
|
#define TK_FUNCTION 166
|
||||||
#define TK_TOPIC 167
|
#define TK_INTERVAL 167
|
||||||
#define TK_WITH 168
|
#define TK_TOPIC 168
|
||||||
#define TK_META 169
|
#define TK_WITH 169
|
||||||
#define TK_CONSUMER 170
|
#define TK_META 170
|
||||||
#define TK_GROUP 171
|
#define TK_CONSUMER 171
|
||||||
#define TK_DESC 172
|
#define TK_GROUP 172
|
||||||
#define TK_DESCRIBE 173
|
#define TK_DESC 173
|
||||||
#define TK_RESET 174
|
#define TK_DESCRIBE 174
|
||||||
#define TK_QUERY 175
|
#define TK_RESET 175
|
||||||
#define TK_CACHE 176
|
#define TK_QUERY 176
|
||||||
#define TK_EXPLAIN 177
|
#define TK_CACHE 177
|
||||||
#define TK_ANALYZE 178
|
#define TK_EXPLAIN 178
|
||||||
#define TK_VERBOSE 179
|
#define TK_ANALYZE 179
|
||||||
#define TK_NK_BOOL 180
|
#define TK_VERBOSE 180
|
||||||
#define TK_RATIO 181
|
#define TK_NK_BOOL 181
|
||||||
#define TK_NK_FLOAT 182
|
#define TK_RATIO 182
|
||||||
#define TK_OUTPUTTYPE 183
|
#define TK_NK_FLOAT 183
|
||||||
#define TK_AGGREGATE 184
|
#define TK_OUTPUTTYPE 184
|
||||||
#define TK_BUFSIZE 185
|
#define TK_AGGREGATE 185
|
||||||
#define TK_STREAM 186
|
#define TK_BUFSIZE 186
|
||||||
#define TK_INTO 187
|
#define TK_STREAM 187
|
||||||
#define TK_TRIGGER 188
|
#define TK_INTO 188
|
||||||
#define TK_AT_ONCE 189
|
#define TK_TRIGGER 189
|
||||||
#define TK_WINDOW_CLOSE 190
|
#define TK_AT_ONCE 190
|
||||||
#define TK_IGNORE 191
|
#define TK_WINDOW_CLOSE 191
|
||||||
#define TK_EXPIRED 192
|
#define TK_IGNORE 192
|
||||||
#define TK_FILL_HISTORY 193
|
#define TK_EXPIRED 193
|
||||||
#define TK_SUBTABLE 194
|
#define TK_FILL_HISTORY 194
|
||||||
#define TK_KILL 195
|
#define TK_SUBTABLE 195
|
||||||
#define TK_CONNECTION 196
|
#define TK_KILL 196
|
||||||
#define TK_TRANSACTION 197
|
#define TK_CONNECTION 197
|
||||||
#define TK_BALANCE 198
|
#define TK_TRANSACTION 198
|
||||||
#define TK_VGROUP 199
|
#define TK_BALANCE 199
|
||||||
#define TK_MERGE 200
|
#define TK_VGROUP 200
|
||||||
#define TK_REDISTRIBUTE 201
|
#define TK_MERGE 201
|
||||||
#define TK_SPLIT 202
|
#define TK_REDISTRIBUTE 202
|
||||||
#define TK_DELETE 203
|
#define TK_SPLIT 203
|
||||||
#define TK_INSERT 204
|
#define TK_DELETE 204
|
||||||
#define TK_NULL 205
|
#define TK_INSERT 205
|
||||||
#define TK_NK_QUESTION 206
|
#define TK_NULL 206
|
||||||
#define TK_NK_ARROW 207
|
#define TK_NK_QUESTION 207
|
||||||
#define TK_ROWTS 208
|
#define TK_NK_ARROW 208
|
||||||
#define TK_QSTART 209
|
#define TK_ROWTS 209
|
||||||
#define TK_QEND 210
|
#define TK_QSTART 210
|
||||||
#define TK_QDURATION 211
|
#define TK_QEND 211
|
||||||
#define TK_WSTART 212
|
#define TK_QDURATION 212
|
||||||
#define TK_WEND 213
|
#define TK_WSTART 213
|
||||||
#define TK_WDURATION 214
|
#define TK_WEND 214
|
||||||
#define TK_IROWTS 215
|
#define TK_WDURATION 215
|
||||||
#define TK_CAST 216
|
#define TK_IROWTS 216
|
||||||
#define TK_NOW 217
|
#define TK_CAST 217
|
||||||
#define TK_TODAY 218
|
#define TK_NOW 218
|
||||||
#define TK_TIMEZONE 219
|
#define TK_TODAY 219
|
||||||
#define TK_CLIENT_VERSION 220
|
#define TK_TIMEZONE 220
|
||||||
#define TK_SERVER_VERSION 221
|
#define TK_CLIENT_VERSION 221
|
||||||
#define TK_SERVER_STATUS 222
|
#define TK_SERVER_VERSION 222
|
||||||
#define TK_CURRENT_USER 223
|
#define TK_SERVER_STATUS 223
|
||||||
#define TK_COUNT 224
|
#define TK_CURRENT_USER 224
|
||||||
#define TK_LAST_ROW 225
|
#define TK_COUNT 225
|
||||||
#define TK_CASE 226
|
#define TK_LAST_ROW 226
|
||||||
#define TK_END 227
|
#define TK_CASE 227
|
||||||
#define TK_WHEN 228
|
#define TK_END 228
|
||||||
#define TK_THEN 229
|
#define TK_WHEN 229
|
||||||
#define TK_ELSE 230
|
#define TK_THEN 230
|
||||||
#define TK_BETWEEN 231
|
#define TK_ELSE 231
|
||||||
#define TK_IS 232
|
#define TK_BETWEEN 232
|
||||||
#define TK_NK_LT 233
|
#define TK_IS 233
|
||||||
#define TK_NK_GT 234
|
#define TK_NK_LT 234
|
||||||
#define TK_NK_LE 235
|
#define TK_NK_GT 235
|
||||||
#define TK_NK_GE 236
|
#define TK_NK_LE 236
|
||||||
#define TK_NK_NE 237
|
#define TK_NK_GE 237
|
||||||
#define TK_MATCH 238
|
#define TK_NK_NE 238
|
||||||
#define TK_NMATCH 239
|
#define TK_MATCH 239
|
||||||
#define TK_CONTAINS 240
|
#define TK_NMATCH 240
|
||||||
#define TK_IN 241
|
#define TK_CONTAINS 241
|
||||||
#define TK_JOIN 242
|
#define TK_IN 242
|
||||||
#define TK_INNER 243
|
#define TK_JOIN 243
|
||||||
#define TK_SELECT 244
|
#define TK_INNER 244
|
||||||
#define TK_DISTINCT 245
|
#define TK_SELECT 245
|
||||||
#define TK_WHERE 246
|
#define TK_DISTINCT 246
|
||||||
#define TK_PARTITION 247
|
#define TK_WHERE 247
|
||||||
#define TK_BY 248
|
#define TK_PARTITION 248
|
||||||
#define TK_SESSION 249
|
#define TK_BY 249
|
||||||
#define TK_STATE_WINDOW 250
|
#define TK_SESSION 250
|
||||||
#define TK_SLIDING 251
|
#define TK_STATE_WINDOW 251
|
||||||
#define TK_FILL 252
|
#define TK_SLIDING 252
|
||||||
#define TK_VALUE 253
|
#define TK_FILL 253
|
||||||
#define TK_NONE 254
|
#define TK_VALUE 254
|
||||||
#define TK_PREV 255
|
#define TK_NONE 255
|
||||||
#define TK_LINEAR 256
|
#define TK_PREV 256
|
||||||
#define TK_NEXT 257
|
#define TK_LINEAR 257
|
||||||
#define TK_HAVING 258
|
#define TK_NEXT 258
|
||||||
#define TK_RANGE 259
|
#define TK_HAVING 259
|
||||||
#define TK_EVERY 260
|
#define TK_RANGE 260
|
||||||
#define TK_ORDER 261
|
#define TK_EVERY 261
|
||||||
#define TK_SLIMIT 262
|
#define TK_ORDER 262
|
||||||
#define TK_SOFFSET 263
|
#define TK_SLIMIT 263
|
||||||
#define TK_LIMIT 264
|
#define TK_SOFFSET 264
|
||||||
#define TK_OFFSET 265
|
#define TK_LIMIT 265
|
||||||
#define TK_ASC 266
|
#define TK_OFFSET 266
|
||||||
#define TK_NULLS 267
|
#define TK_ASC 267
|
||||||
#define TK_ABORT 268
|
#define TK_NULLS 268
|
||||||
#define TK_AFTER 269
|
#define TK_ABORT 269
|
||||||
#define TK_ATTACH 270
|
#define TK_AFTER 270
|
||||||
#define TK_BEFORE 271
|
#define TK_ATTACH 271
|
||||||
#define TK_BEGIN 272
|
#define TK_BEFORE 272
|
||||||
#define TK_BITAND 273
|
#define TK_BEGIN 273
|
||||||
#define TK_BITNOT 274
|
#define TK_BITAND 274
|
||||||
#define TK_BITOR 275
|
#define TK_BITNOT 275
|
||||||
#define TK_BLOCKS 276
|
#define TK_BITOR 276
|
||||||
#define TK_CHANGE 277
|
#define TK_BLOCKS 277
|
||||||
#define TK_COMMA 278
|
#define TK_CHANGE 278
|
||||||
#define TK_COMPACT 279
|
#define TK_COMMA 279
|
||||||
#define TK_CONCAT 280
|
#define TK_COMPACT 280
|
||||||
#define TK_CONFLICT 281
|
#define TK_CONCAT 281
|
||||||
#define TK_COPY 282
|
#define TK_CONFLICT 282
|
||||||
#define TK_DEFERRED 283
|
#define TK_COPY 283
|
||||||
#define TK_DELIMITERS 284
|
#define TK_DEFERRED 284
|
||||||
#define TK_DETACH 285
|
#define TK_DELIMITERS 285
|
||||||
#define TK_DIVIDE 286
|
#define TK_DETACH 286
|
||||||
#define TK_DOT 287
|
#define TK_DIVIDE 287
|
||||||
#define TK_EACH 288
|
#define TK_DOT 288
|
||||||
#define TK_FAIL 289
|
#define TK_EACH 289
|
||||||
#define TK_FILE 290
|
#define TK_FAIL 290
|
||||||
#define TK_FOR 291
|
#define TK_FILE 291
|
||||||
#define TK_GLOB 292
|
#define TK_FOR 292
|
||||||
#define TK_ID 293
|
#define TK_GLOB 293
|
||||||
#define TK_IMMEDIATE 294
|
#define TK_ID 294
|
||||||
#define TK_IMPORT 295
|
#define TK_IMMEDIATE 295
|
||||||
#define TK_INITIALLY 296
|
#define TK_IMPORT 296
|
||||||
#define TK_INSTEAD 297
|
#define TK_INITIALLY 297
|
||||||
#define TK_ISNULL 298
|
#define TK_INSTEAD 298
|
||||||
#define TK_KEY 299
|
#define TK_ISNULL 299
|
||||||
#define TK_MODULES 300
|
#define TK_KEY 300
|
||||||
#define TK_NK_BITNOT 301
|
#define TK_MODULES 301
|
||||||
#define TK_NK_SEMI 302
|
#define TK_NK_BITNOT 302
|
||||||
#define TK_NOTNULL 303
|
#define TK_NK_SEMI 303
|
||||||
#define TK_OF 304
|
#define TK_NOTNULL 304
|
||||||
#define TK_PLUS 305
|
#define TK_OF 305
|
||||||
#define TK_PRIVILEGE 306
|
#define TK_PLUS 306
|
||||||
#define TK_RAISE 307
|
#define TK_PRIVILEGE 307
|
||||||
#define TK_REPLACE 308
|
#define TK_RAISE 308
|
||||||
#define TK_RESTRICT 309
|
#define TK_REPLACE 309
|
||||||
#define TK_ROW 310
|
#define TK_RESTRICT 310
|
||||||
#define TK_SEMI 311
|
#define TK_ROW 311
|
||||||
#define TK_STAR 312
|
#define TK_SEMI 312
|
||||||
#define TK_STATEMENT 313
|
#define TK_STAR 313
|
||||||
#define TK_STRING 314
|
#define TK_STATEMENT 314
|
||||||
#define TK_TIMES 315
|
#define TK_STRING 315
|
||||||
#define TK_UPDATE 316
|
#define TK_TIMES 316
|
||||||
#define TK_VALUES 317
|
#define TK_UPDATE 317
|
||||||
#define TK_VARIABLE 318
|
#define TK_VALUES 318
|
||||||
#define TK_VIEW 319
|
#define TK_VARIABLE 319
|
||||||
#define TK_WAL 320
|
#define TK_VIEW 320
|
||||||
|
#define TK_WAL 321
|
||||||
|
|
||||||
#define TK_NK_SPACE 600
|
#define TK_NK_SPACE 600
|
||||||
#define TK_NK_COMMENT 601
|
#define TK_NK_COMMENT 601
|
||||||
|
|
|
@ -133,6 +133,9 @@ typedef struct STableOptions {
|
||||||
SNodeList* pWatermark;
|
SNodeList* pWatermark;
|
||||||
int64_t watermark1;
|
int64_t watermark1;
|
||||||
int64_t watermark2;
|
int64_t watermark2;
|
||||||
|
SNodeList* pDeleteMark;
|
||||||
|
int64_t deleteMark1;
|
||||||
|
int64_t deleteMark2;
|
||||||
SNodeList* pRollupFuncs;
|
SNodeList* pRollupFuncs;
|
||||||
int32_t ttl;
|
int32_t ttl;
|
||||||
SNodeList* pSma;
|
SNodeList* pSma;
|
||||||
|
@ -383,6 +386,7 @@ typedef struct SStreamOptions {
|
||||||
int8_t triggerType;
|
int8_t triggerType;
|
||||||
SNode* pDelay;
|
SNode* pDelay;
|
||||||
SNode* pWatermark;
|
SNode* pWatermark;
|
||||||
|
SNode* pDeleteMark;
|
||||||
int8_t fillHistory;
|
int8_t fillHistory;
|
||||||
int8_t ignoreExpired;
|
int8_t ignoreExpired;
|
||||||
} SStreamOptions;
|
} SStreamOptions;
|
||||||
|
|
|
@ -91,6 +91,7 @@ typedef struct SScanLogicNode {
|
||||||
SNode* pTagIndexCond;
|
SNode* pTagIndexCond;
|
||||||
int8_t triggerType;
|
int8_t triggerType;
|
||||||
int64_t watermark;
|
int64_t watermark;
|
||||||
|
int64_t deleteMark;
|
||||||
int8_t igExpired;
|
int8_t igExpired;
|
||||||
SArray* pSmaIndexes;
|
SArray* pSmaIndexes;
|
||||||
SNodeList* pGroupTags;
|
SNodeList* pGroupTags;
|
||||||
|
@ -213,6 +214,7 @@ typedef struct SWindowLogicNode {
|
||||||
SNode* pStateExpr;
|
SNode* pStateExpr;
|
||||||
int8_t triggerType;
|
int8_t triggerType;
|
||||||
int64_t watermark;
|
int64_t watermark;
|
||||||
|
int64_t deleteMark;
|
||||||
int8_t igExpired;
|
int8_t igExpired;
|
||||||
EWindowAlgorithm windowAlgo;
|
EWindowAlgorithm windowAlgo;
|
||||||
EOrder inputTsOrder;
|
EOrder inputTsOrder;
|
||||||
|
@ -440,6 +442,7 @@ typedef struct SWinodwPhysiNode {
|
||||||
SNode* pTsEnd; // window end timestamp
|
SNode* pTsEnd; // window end timestamp
|
||||||
int8_t triggerType;
|
int8_t triggerType;
|
||||||
int64_t watermark;
|
int64_t watermark;
|
||||||
|
int64_t deleteMark;
|
||||||
int8_t igExpired;
|
int8_t igExpired;
|
||||||
EOrder inputTsOrder;
|
EOrder inputTsOrder;
|
||||||
EOrder outputTsOrder;
|
EOrder outputTsOrder;
|
||||||
|
|
|
@ -34,6 +34,7 @@ typedef struct SPlanContext {
|
||||||
bool showRewrite;
|
bool showRewrite;
|
||||||
int8_t triggerType;
|
int8_t triggerType;
|
||||||
int64_t watermark;
|
int64_t watermark;
|
||||||
|
int64_t deleteMark;
|
||||||
int8_t igExpired;
|
int8_t igExpired;
|
||||||
char* pMsg;
|
char* pMsg;
|
||||||
int32_t msgLen;
|
int32_t msgLen;
|
||||||
|
|
|
@ -386,6 +386,9 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_MIN_ROLLUP_WATERMARK 0 // unit millisecond
|
#define TSDB_MIN_ROLLUP_WATERMARK 0 // unit millisecond
|
||||||
#define TSDB_MAX_ROLLUP_WATERMARK (15 * 60 * 1000)
|
#define TSDB_MAX_ROLLUP_WATERMARK (15 * 60 * 1000)
|
||||||
#define TSDB_DEFAULT_ROLLUP_WATERMARK 5000
|
#define TSDB_DEFAULT_ROLLUP_WATERMARK 5000
|
||||||
|
#define TSDB_MIN_ROLLUP_DELETE_MARK 0 // unit millisecond
|
||||||
|
#define TSDB_MAX_ROLLUP_DELETE_MARK INT64_MAX
|
||||||
|
#define TSDB_DEFAULT_ROLLUP_DELETE_MARK 900000 // 900s
|
||||||
#define TSDB_MIN_TABLE_TTL 0
|
#define TSDB_MIN_TABLE_TTL 0
|
||||||
#define TSDB_DEFAULT_TABLE_TTL 0
|
#define TSDB_DEFAULT_TABLE_TTL 0
|
||||||
|
|
||||||
|
|
|
@ -551,6 +551,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
|
||||||
if (pReq->ast2Len > 0) {
|
if (pReq->ast2Len > 0) {
|
||||||
if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1;
|
if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (tEncodeI64(&encoder, pReq->deleteMark1) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->deleteMark2) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -644,6 +646,9 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
|
||||||
if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tDecodeI64(&decoder, &pReq->deleteMark1) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->deleteMark2) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -822,6 +827,7 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq
|
||||||
if (pReq->astLen > 0) {
|
if (pReq->astLen > 0) {
|
||||||
if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1;
|
if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -870,7 +876,7 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR
|
||||||
if (pReq->ast == NULL) return -1;
|
if (pReq->ast == NULL) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -644,6 +644,7 @@ typedef struct {
|
||||||
// 3.0.20
|
// 3.0.20
|
||||||
int64_t checkpointFreq; // ms
|
int64_t checkpointFreq; // ms
|
||||||
int64_t currentTick; // do not serialize
|
int64_t currentTick; // do not serialize
|
||||||
|
int64_t deleteMark;
|
||||||
} SStreamObj;
|
} SStreamObj;
|
||||||
|
|
||||||
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
|
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
|
||||||
|
|
|
@ -28,7 +28,7 @@ void mndCleanupScheduler(SMnode* pMnode);
|
||||||
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
|
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
|
||||||
|
|
||||||
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
||||||
int64_t watermark);
|
int64_t watermark, int64_t deleteMark);
|
||||||
|
|
||||||
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream);
|
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream);
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,7 @@ static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
|
||||||
int64_t watermark) {
|
int64_t watermark, int64_t deleteMark) {
|
||||||
SNode* pAst = NULL;
|
SNode* pAst = NULL;
|
||||||
SQueryPlan* pPlan = NULL;
|
SQueryPlan* pPlan = NULL;
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
@ -64,6 +64,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64
|
||||||
.rSmaQuery = true,
|
.rSmaQuery = true,
|
||||||
.triggerType = triggerType,
|
.triggerType = triggerType,
|
||||||
.watermark = watermark,
|
.watermark = watermark,
|
||||||
|
.deleteMark = deleteMark,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||||
|
|
|
@ -534,6 +534,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
streamObj.sql = strdup(pCreate->sql);
|
streamObj.sql = strdup(pCreate->sql);
|
||||||
streamObj.smaId = smaObj.uid;
|
streamObj.smaId = smaObj.uid;
|
||||||
streamObj.watermark = pCreate->watermark;
|
streamObj.watermark = pCreate->watermark;
|
||||||
|
streamObj.deleteMark = pCreate->deleteMark;
|
||||||
streamObj.fillHistory = STREAM_FILL_HISTORY_ON;
|
streamObj.fillHistory = STREAM_FILL_HISTORY_ON;
|
||||||
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
|
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
|
||||||
streamObj.triggerParam = pCreate->maxDelay;
|
streamObj.triggerParam = pCreate->maxDelay;
|
||||||
|
@ -574,6 +575,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
.streamQuery = true,
|
.streamQuery = true,
|
||||||
.triggerType = streamObj.trigger,
|
.triggerType = streamObj.trigger,
|
||||||
.watermark = streamObj.watermark,
|
.watermark = streamObj.watermark,
|
||||||
|
.deleteMark = streamObj.deleteMark,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
|
||||||
|
|
|
@ -450,13 +450,15 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
||||||
req.rsmaParam.watermark[1] = pStb->watermark[1];
|
req.rsmaParam.watermark[1] = pStb->watermark[1];
|
||||||
if (pStb->ast1Len > 0) {
|
if (pStb->ast1Len > 0) {
|
||||||
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
|
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
|
||||||
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0]) < 0) {
|
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0],
|
||||||
|
req.rsmaParam.deleteMark[0]) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pStb->ast2Len > 0) {
|
if (pStb->ast2Len > 0) {
|
||||||
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[1], &req.rsmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
|
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[1], &req.rsmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
|
||||||
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[1]) < 0) {
|
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[1],
|
||||||
|
req.rsmaParam.deleteMark[1]) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -378,6 +378,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
||||||
CLONE_NODE_FIELD(pTagIndexCond);
|
CLONE_NODE_FIELD(pTagIndexCond);
|
||||||
COPY_SCALAR_FIELD(triggerType);
|
COPY_SCALAR_FIELD(triggerType);
|
||||||
COPY_SCALAR_FIELD(watermark);
|
COPY_SCALAR_FIELD(watermark);
|
||||||
|
COPY_SCALAR_FIELD(deleteMark);
|
||||||
COPY_SCALAR_FIELD(igExpired);
|
COPY_SCALAR_FIELD(igExpired);
|
||||||
CLONE_NODE_LIST_FIELD(pGroupTags);
|
CLONE_NODE_LIST_FIELD(pGroupTags);
|
||||||
COPY_SCALAR_FIELD(groupSort);
|
COPY_SCALAR_FIELD(groupSort);
|
||||||
|
@ -463,6 +464,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
|
||||||
CLONE_NODE_FIELD(pStateExpr);
|
CLONE_NODE_FIELD(pStateExpr);
|
||||||
COPY_SCALAR_FIELD(triggerType);
|
COPY_SCALAR_FIELD(triggerType);
|
||||||
COPY_SCALAR_FIELD(watermark);
|
COPY_SCALAR_FIELD(watermark);
|
||||||
|
COPY_SCALAR_FIELD(deleteMark);
|
||||||
COPY_SCALAR_FIELD(igExpired);
|
COPY_SCALAR_FIELD(igExpired);
|
||||||
COPY_SCALAR_FIELD(windowAlgo);
|
COPY_SCALAR_FIELD(windowAlgo);
|
||||||
COPY_SCALAR_FIELD(inputTsOrder);
|
COPY_SCALAR_FIELD(inputTsOrder);
|
||||||
|
|
|
@ -819,6 +819,7 @@ static const char* jkWindowLogicPlanTspk = "Tspk";
|
||||||
static const char* jkWindowLogicPlanStateExpr = "StateExpr";
|
static const char* jkWindowLogicPlanStateExpr = "StateExpr";
|
||||||
static const char* jkWindowLogicPlanTriggerType = "TriggerType";
|
static const char* jkWindowLogicPlanTriggerType = "TriggerType";
|
||||||
static const char* jkWindowLogicPlanWatermark = "Watermark";
|
static const char* jkWindowLogicPlanWatermark = "Watermark";
|
||||||
|
static const char* jkWindowLogicPlanDeleteMark = "DeleteMark";
|
||||||
|
|
||||||
static int32_t logicWindowNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t logicWindowNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SWindowLogicNode* pNode = (const SWindowLogicNode*)pObj;
|
const SWindowLogicNode* pNode = (const SWindowLogicNode*)pObj;
|
||||||
|
@ -860,6 +861,9 @@ static int32_t logicWindowNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWatermark, pNode->watermark);
|
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWatermark, pNode->watermark);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanDeleteMark, pNode->deleteMark);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -904,6 +908,9 @@ static int32_t jsonToLogicWindowNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanWatermark, &pNode->watermark);
|
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanWatermark, &pNode->watermark);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanDeleteMark, &pNode->deleteMark);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2004,6 +2011,7 @@ static const char* jkWindowPhysiPlanTsPk = "TsPk";
|
||||||
static const char* jkWindowPhysiPlanTsEnd = "TsEnd";
|
static const char* jkWindowPhysiPlanTsEnd = "TsEnd";
|
||||||
static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
|
static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
|
||||||
static const char* jkWindowPhysiPlanWatermark = "Watermark";
|
static const char* jkWindowPhysiPlanWatermark = "Watermark";
|
||||||
|
static const char* jkWindowPhysiPlanDeleteMark = "DeleteMark";
|
||||||
static const char* jkWindowPhysiPlanIgnoreExpired = "IgnoreExpired";
|
static const char* jkWindowPhysiPlanIgnoreExpired = "IgnoreExpired";
|
||||||
static const char* jkWindowPhysiPlanInputTsOrder = "InputTsOrder";
|
static const char* jkWindowPhysiPlanInputTsOrder = "InputTsOrder";
|
||||||
static const char* jkWindowPhysiPlanOutputTsOrder = "outputTsOrder";
|
static const char* jkWindowPhysiPlanOutputTsOrder = "outputTsOrder";
|
||||||
|
@ -2031,6 +2039,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark);
|
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanDeleteMark, pNode->deleteMark);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanIgnoreExpired, pNode->igExpired);
|
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanIgnoreExpired, pNode->igExpired);
|
||||||
}
|
}
|
||||||
|
@ -2069,6 +2080,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBigIntValue(pJson, jkWindowPhysiPlanWatermark, &pNode->watermark);
|
code = tjsonGetBigIntValue(pJson, jkWindowPhysiPlanWatermark, &pNode->watermark);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBigIntValue(pJson, jkWindowPhysiPlanDeleteMark, &pNode->deleteMark);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetTinyIntValue(pJson, jkWindowPhysiPlanIgnoreExpired, &pNode->igExpired);
|
code = tjsonGetTinyIntValue(pJson, jkWindowPhysiPlanIgnoreExpired, &pNode->igExpired);
|
||||||
}
|
}
|
||||||
|
@ -3532,6 +3546,18 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t jsonToGroupingSetNode(const SJson* pJson, void* pObj) {
|
||||||
|
SGroupingSetNode* pNode = (SGroupingSetNode*)pObj;
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
tjsonGetNumberValue(pJson, jkGroupingSetType, pNode->groupingSetType, code);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeList(pJson, jkGroupingSetParameter, &pNode->pParameterList);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static const char* jkOrderByExprExpr = "Expr";
|
static const char* jkOrderByExprExpr = "Expr";
|
||||||
static const char* jkOrderByExprOrder = "Order";
|
static const char* jkOrderByExprOrder = "Order";
|
||||||
static const char* jkOrderByExprNullOrder = "NullOrder";
|
static const char* jkOrderByExprNullOrder = "NullOrder";
|
||||||
|
@ -4729,6 +4755,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
||||||
return jsonToRealTableNode(pJson, pObj);
|
return jsonToRealTableNode(pJson, pObj);
|
||||||
case QUERY_NODE_TEMP_TABLE:
|
case QUERY_NODE_TEMP_TABLE:
|
||||||
return jsonToTempTableNode(pJson, pObj);
|
return jsonToTempTableNode(pJson, pObj);
|
||||||
|
case QUERY_NODE_GROUPING_SET:
|
||||||
|
return jsonToGroupingSetNode(pJson, pObj);
|
||||||
case QUERY_NODE_ORDER_BY_EXPR:
|
case QUERY_NODE_ORDER_BY_EXPR:
|
||||||
return jsonToOrderByExprNode(pJson, pObj);
|
return jsonToOrderByExprNode(pJson, pObj);
|
||||||
case QUERY_NODE_LIMIT:
|
case QUERY_NODE_LIMIT:
|
||||||
|
|
|
@ -2607,6 +2607,7 @@ enum {
|
||||||
PHY_WINDOW_CODE_TS_END,
|
PHY_WINDOW_CODE_TS_END,
|
||||||
PHY_WINDOW_CODE_TRIGGER_TYPE,
|
PHY_WINDOW_CODE_TRIGGER_TYPE,
|
||||||
PHY_WINDOW_CODE_WATERMARK,
|
PHY_WINDOW_CODE_WATERMARK,
|
||||||
|
PHY_WINDOW_CODE_DELETE_MARK,
|
||||||
PHY_WINDOW_CODE_IG_EXPIRED,
|
PHY_WINDOW_CODE_IG_EXPIRED,
|
||||||
PHY_WINDOW_CODE_INPUT_TS_ORDER,
|
PHY_WINDOW_CODE_INPUT_TS_ORDER,
|
||||||
PHY_WINDOW_CODE_OUTPUT_TS_ORDER,
|
PHY_WINDOW_CODE_OUTPUT_TS_ORDER,
|
||||||
|
@ -2635,6 +2636,9 @@ static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeI64(pEncoder, PHY_WINDOW_CODE_WATERMARK, pNode->watermark);
|
code = tlvEncodeI64(pEncoder, PHY_WINDOW_CODE_WATERMARK, pNode->watermark);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeI64(pEncoder, PHY_WINDOW_CODE_DELETE_MARK, pNode->deleteMark);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeI8(pEncoder, PHY_WINDOW_CODE_IG_EXPIRED, pNode->igExpired);
|
code = tlvEncodeI8(pEncoder, PHY_WINDOW_CODE_IG_EXPIRED, pNode->igExpired);
|
||||||
}
|
}
|
||||||
|
@ -2679,6 +2683,9 @@ static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
case PHY_WINDOW_CODE_WATERMARK:
|
case PHY_WINDOW_CODE_WATERMARK:
|
||||||
code = tlvDecodeI64(pTlv, &pNode->watermark);
|
code = tlvDecodeI64(pTlv, &pNode->watermark);
|
||||||
break;
|
break;
|
||||||
|
case PHY_WINDOW_CODE_DELETE_MARK:
|
||||||
|
code = tlvDecodeI64(pTlv, &pNode->deleteMark);
|
||||||
|
break;
|
||||||
case PHY_WINDOW_CODE_IG_EXPIRED:
|
case PHY_WINDOW_CODE_IG_EXPIRED:
|
||||||
code = tlvDecodeI8(pTlv, &pNode->igExpired);
|
code = tlvDecodeI8(pTlv, &pNode->igExpired);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -596,6 +596,13 @@ static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) {
|
||||||
nodesDestroyNode(pNode->pTsEnd);
|
nodesDestroyNode(pNode->pTsEnd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void destroyPartitionPhysiNode(SPartitionPhysiNode* pNode) {
|
||||||
|
destroyPhysiNode((SPhysiNode*)pNode);
|
||||||
|
nodesDestroyList(pNode->pExprs);
|
||||||
|
nodesDestroyList(pNode->pPartitionKeys);
|
||||||
|
nodesDestroyList(pNode->pTargets);
|
||||||
|
}
|
||||||
|
|
||||||
static void destroyScanPhysiNode(SScanPhysiNode* pNode) {
|
static void destroyScanPhysiNode(SScanPhysiNode* pNode) {
|
||||||
destroyPhysiNode((SPhysiNode*)pNode);
|
destroyPhysiNode((SPhysiNode*)pNode);
|
||||||
nodesDestroyList(pNode->pScanCols);
|
nodesDestroyList(pNode->pScanCols);
|
||||||
|
@ -733,6 +740,7 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
nodesDestroyList(pOptions->pWatermark);
|
nodesDestroyList(pOptions->pWatermark);
|
||||||
nodesDestroyList(pOptions->pRollupFuncs);
|
nodesDestroyList(pOptions->pRollupFuncs);
|
||||||
nodesDestroyList(pOptions->pSma);
|
nodesDestroyList(pOptions->pSma);
|
||||||
|
nodesDestroyList(pOptions->pDeleteMark);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_INDEX_OPTIONS: {
|
case QUERY_NODE_INDEX_OPTIONS: {
|
||||||
|
@ -750,6 +758,7 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
SStreamOptions* pOptions = (SStreamOptions*)pNode;
|
SStreamOptions* pOptions = (SStreamOptions*)pNode;
|
||||||
nodesDestroyNode(pOptions->pDelay);
|
nodesDestroyNode(pOptions->pDelay);
|
||||||
nodesDestroyNode(pOptions->pWatermark);
|
nodesDestroyNode(pOptions->pWatermark);
|
||||||
|
nodesDestroyNode(pOptions->pDeleteMark);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_LEFT_VALUE: // no pointer field
|
case QUERY_NODE_LEFT_VALUE: // no pointer field
|
||||||
|
@ -905,6 +914,8 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pNode;
|
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pNode;
|
||||||
nodesDestroyNode((SNode*)pStmt->pOptions);
|
nodesDestroyNode((SNode*)pStmt->pOptions);
|
||||||
nodesDestroyNode(pStmt->pQuery);
|
nodesDestroyNode(pStmt->pQuery);
|
||||||
|
nodesDestroyList(pStmt->pTags);
|
||||||
|
nodesDestroyNode(pStmt->pSubtable);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_DROP_STREAM_STMT: // no pointer field
|
case QUERY_NODE_DROP_STREAM_STMT: // no pointer field
|
||||||
|
@ -1020,6 +1031,8 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
nodesDestroyNode(pLogicNode->pTagIndexCond);
|
nodesDestroyNode(pLogicNode->pTagIndexCond);
|
||||||
taosArrayDestroyEx(pLogicNode->pSmaIndexes, destroySmaIndex);
|
taosArrayDestroyEx(pLogicNode->pSmaIndexes, destroySmaIndex);
|
||||||
nodesDestroyList(pLogicNode->pGroupTags);
|
nodesDestroyList(pLogicNode->pGroupTags);
|
||||||
|
nodesDestroyList(pLogicNode->pTags);
|
||||||
|
nodesDestroyNode(pLogicNode->pSubtable);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_LOGIC_PLAN_JOIN: {
|
case QUERY_NODE_LOGIC_PLAN_JOIN: {
|
||||||
|
@ -1092,6 +1105,8 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
SPartitionLogicNode* pLogicNode = (SPartitionLogicNode*)pNode;
|
SPartitionLogicNode* pLogicNode = (SPartitionLogicNode*)pNode;
|
||||||
destroyLogicNode((SLogicNode*)pLogicNode);
|
destroyLogicNode((SLogicNode*)pLogicNode);
|
||||||
nodesDestroyList(pLogicNode->pPartitionKeys);
|
nodesDestroyList(pLogicNode->pPartitionKeys);
|
||||||
|
nodesDestroyList(pLogicNode->pTags);
|
||||||
|
nodesDestroyNode(pLogicNode->pSubtable);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: {
|
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: {
|
||||||
|
@ -1122,10 +1137,10 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
|
|
||||||
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: {
|
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
|
||||||
|
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: {
|
||||||
SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode;
|
SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode;
|
||||||
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
||||||
nodesDestroyList(pPhyNode->pGroupTags);
|
nodesDestroyList(pPhyNode->pGroupTags);
|
||||||
|
@ -1139,6 +1154,8 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
||||||
nodesDestroyList(pPhyNode->pDynamicScanFuncs);
|
nodesDestroyList(pPhyNode->pDynamicScanFuncs);
|
||||||
nodesDestroyList(pPhyNode->pGroupTags);
|
nodesDestroyList(pPhyNode->pGroupTags);
|
||||||
|
nodesDestroyList(pPhyNode->pTags);
|
||||||
|
nodesDestroyNode(pPhyNode->pSubtable);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: {
|
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: {
|
||||||
|
@ -1215,13 +1232,15 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
nodesDestroyNode(pPhyNode->pStateKey);
|
nodesDestroyNode(pPhyNode->pStateKey);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
|
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
|
||||||
|
destroyPartitionPhysiNode((SPartitionPhysiNode*)pNode);
|
||||||
|
break;
|
||||||
|
}
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: {
|
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: {
|
||||||
SPartitionPhysiNode* pPhyNode = (SPartitionPhysiNode*)pNode;
|
SStreamPartitionPhysiNode* pPhyNode = (SStreamPartitionPhysiNode*)pNode;
|
||||||
destroyPhysiNode((SPhysiNode*)pPhyNode);
|
destroyPartitionPhysiNode((SPartitionPhysiNode*)pPhyNode);
|
||||||
nodesDestroyList(pPhyNode->pExprs);
|
nodesDestroyList(pPhyNode->pTags);
|
||||||
nodesDestroyList(pPhyNode->pPartitionKeys);
|
nodesDestroyNode(pPhyNode->pSubtable);
|
||||||
nodesDestroyList(pPhyNode->pTargets);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: {
|
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: {
|
||||||
|
|
|
@ -72,7 +72,8 @@ typedef enum ETableOptionType {
|
||||||
TABLE_OPTION_WATERMARK,
|
TABLE_OPTION_WATERMARK,
|
||||||
TABLE_OPTION_ROLLUP,
|
TABLE_OPTION_ROLLUP,
|
||||||
TABLE_OPTION_TTL,
|
TABLE_OPTION_TTL,
|
||||||
TABLE_OPTION_SMA
|
TABLE_OPTION_SMA,
|
||||||
|
TABLE_OPTION_DELETE_MARK
|
||||||
} ETableOptionType;
|
} ETableOptionType;
|
||||||
|
|
||||||
typedef struct SAlterOption {
|
typedef struct SAlterOption {
|
||||||
|
|
|
@ -362,6 +362,7 @@ table_options(A) ::= table_options(B) WATERMARK duration_list(C).
|
||||||
table_options(A) ::= table_options(B) ROLLUP NK_LP rollup_func_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_ROLLUP, C); }
|
table_options(A) ::= table_options(B) ROLLUP NK_LP rollup_func_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_ROLLUP, C); }
|
||||||
table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); }
|
table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); }
|
||||||
table_options(A) ::= table_options(B) SMA NK_LP col_name_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_SMA, C); }
|
table_options(A) ::= table_options(B) SMA NK_LP col_name_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_SMA, C); }
|
||||||
|
table_options(A) ::= table_options(B) DELETE_MARK duration_list(C). { A = setTableOption(pCxt, B, TABLE_OPTION_DELETE_MARK, C); }
|
||||||
|
|
||||||
alter_table_options(A) ::= alter_table_option(B). { A = createAlterTableOptions(pCxt); A = setTableOption(pCxt, A, B.type, &B.val); }
|
alter_table_options(A) ::= alter_table_option(B). { A = createAlterTableOptions(pCxt); A = setTableOption(pCxt, A, B.type, &B.val); }
|
||||||
alter_table_options(A) ::= alter_table_options(B) alter_table_option(C). { A = setTableOption(pCxt, B, C.type, &C.val); }
|
alter_table_options(A) ::= alter_table_options(B) alter_table_option(C). { A = setTableOption(pCxt, B, C.type, &C.val); }
|
||||||
|
@ -475,8 +476,9 @@ func_list(A) ::= func_list(B) NK_COMMA func(C).
|
||||||
func(A) ::= function_name(B) NK_LP expression_list(C) NK_RP. { A = createFunctionNode(pCxt, &B, C); }
|
func(A) ::= function_name(B) NK_LP expression_list(C) NK_RP. { A = createFunctionNode(pCxt, &B, C); }
|
||||||
|
|
||||||
sma_stream_opt(A) ::= . { A = createStreamOptions(pCxt); }
|
sma_stream_opt(A) ::= . { A = createStreamOptions(pCxt); }
|
||||||
sma_stream_opt(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
|
sma_stream_opt(A) ::= sma_stream_opt(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
|
||||||
sma_stream_opt(A) ::= stream_options(B) MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
|
sma_stream_opt(A) ::= sma_stream_opt(B) MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
|
||||||
|
sma_stream_opt(A) ::= sma_stream_opt(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; }
|
||||||
|
|
||||||
/************************************************ create/drop topic ***************************************************/
|
/************************************************ create/drop topic ***************************************************/
|
||||||
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_or_subquery(C). { pCxt->pRootNode = createCreateTopicStmtUseQuery(pCxt, A, &B, C); }
|
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_or_subquery(C). { pCxt->pRootNode = createCreateTopicStmtUseQuery(pCxt, A, &B, C); }
|
||||||
|
|
|
@ -1124,6 +1124,9 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType
|
||||||
case TABLE_OPTION_SMA:
|
case TABLE_OPTION_SMA:
|
||||||
((STableOptions*)pOptions)->pSma = pVal;
|
((STableOptions*)pOptions)->pSma = pVal;
|
||||||
break;
|
break;
|
||||||
|
case TABLE_OPTION_DELETE_MARK:
|
||||||
|
((STableOptions*)pOptions)->pDeleteMark = pVal;
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,6 +74,7 @@ static SKeyword keywordTable[] = {
|
||||||
{"DATABASES", TK_DATABASES},
|
{"DATABASES", TK_DATABASES},
|
||||||
{"DBS", TK_DBS},
|
{"DBS", TK_DBS},
|
||||||
{"DELETE", TK_DELETE},
|
{"DELETE", TK_DELETE},
|
||||||
|
{"DELETE_MARK", TK_DELETE_MARK},
|
||||||
{"DESC", TK_DESC},
|
{"DESC", TK_DESC},
|
||||||
{"DESCRIBE", TK_DESCRIBE},
|
{"DESCRIBE", TK_DESCRIBE},
|
||||||
{"DISTINCT", TK_DISTINCT},
|
{"DISTINCT", TK_DISTINCT},
|
||||||
|
|
|
@ -3804,10 +3804,11 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
|
||||||
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
|
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkRangeOption(STranslateContext* pCxt, int32_t code, const char* pName, int32_t val, int32_t minVal,
|
static int32_t checkRangeOption(STranslateContext* pCxt, int32_t code, const char* pName, int64_t val, int64_t minVal,
|
||||||
int32_t maxVal) {
|
int64_t maxVal) {
|
||||||
if (val >= 0 && (val < minVal || val > maxVal)) {
|
if (val >= 0 && (val < minVal || val > maxVal)) {
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, code, "Invalid option %s: %d valid range: [%d, %d]", pName, val,
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, code,
|
||||||
|
"Invalid option %s: %" PRId64 " valid range: [%" PRId64 ", %" PRId64 "]", pName, val,
|
||||||
minVal, maxVal);
|
minVal, maxVal);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3818,8 +3819,8 @@ static int32_t checkDbRangeOption(STranslateContext* pCxt, const char* pName, in
|
||||||
return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_DB_OPTION, pName, val, minVal, maxVal);
|
return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_DB_OPTION, pName, val, minVal, maxVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkTableRangeOption(STranslateContext* pCxt, const char* pName, int32_t val, int32_t minVal,
|
static int32_t checkTableRangeOption(STranslateContext* pCxt, const char* pName, int64_t val, int64_t minVal,
|
||||||
int32_t maxVal) {
|
int64_t maxVal) {
|
||||||
return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_TABLE_OPTION, pName, val, minVal, maxVal);
|
return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_TABLE_OPTION, pName, val, minVal, maxVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4463,6 +4464,37 @@ static int32_t checkTableWatermarkOption(STranslateContext* pCxt, STableOptions*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t getTableDeleteMarkOption(STranslateContext* pCxt, SValueNode* pVal, int64_t* pMaxDelay) {
|
||||||
|
return getTableDelayOrWatermarkOption(pCxt, "delete_mark", TSDB_MIN_ROLLUP_DELETE_MARK, TSDB_MAX_ROLLUP_DELETE_MARK,
|
||||||
|
pVal, pMaxDelay);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t checkTableDeleteMarkOption(STranslateContext* pCxt, STableOptions* pOptions, bool createStable,
|
||||||
|
SDbCfgInfo* pDbCfg) {
|
||||||
|
if (NULL == pOptions->pDeleteMark) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!createStable || NULL == pDbCfg->pRetensions) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION,
|
||||||
|
"Invalid option delete_mark: Only supported for create super table in databases "
|
||||||
|
"configured with the 'RETENTIONS' option");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LIST_LENGTH(pOptions->pDeleteMark) > 2) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION, "Invalid option delete_mark");
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code =
|
||||||
|
getTableDeleteMarkOption(pCxt, (SValueNode*)nodesListGetNode(pOptions->pDeleteMark, 0), &pOptions->deleteMark1);
|
||||||
|
if (TSDB_CODE_SUCCESS == code && 2 == LIST_LENGTH(pOptions->pDeleteMark)) {
|
||||||
|
code =
|
||||||
|
getTableDeleteMarkOption(pCxt, (SValueNode*)nodesListGetNode(pOptions->pDeleteMark, 1), &pOptions->deleteMark2);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, bool createStable) {
|
static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, bool createStable) {
|
||||||
if (NULL != strchr(pStmt->tableName, '.')) {
|
if (NULL != strchr(pStmt->tableName, '.')) {
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME,
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME,
|
||||||
|
@ -4482,6 +4514,9 @@ static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = checkTableWatermarkOption(pCxt, pStmt->pOptions, createStable, &dbCfg);
|
code = checkTableWatermarkOption(pCxt, pStmt->pOptions, createStable, &dbCfg);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = checkTableDeleteMarkOption(pCxt, pStmt->pOptions, createStable, &dbCfg);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = checkTableRollupOption(pCxt, pStmt->pOptions->pRollupFuncs, createStable, &dbCfg);
|
code = checkTableRollupOption(pCxt, pStmt->pOptions->pRollupFuncs, createStable, &dbCfg);
|
||||||
}
|
}
|
||||||
|
@ -4749,6 +4784,8 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
|
||||||
pReq->delay2 = pStmt->pOptions->maxDelay2;
|
pReq->delay2 = pStmt->pOptions->maxDelay2;
|
||||||
pReq->watermark1 = pStmt->pOptions->watermark1;
|
pReq->watermark1 = pStmt->pOptions->watermark1;
|
||||||
pReq->watermark2 = pStmt->pOptions->watermark2;
|
pReq->watermark2 = pStmt->pOptions->watermark2;
|
||||||
|
pReq->deleteMark1 = pStmt->pOptions->deleteMark1;
|
||||||
|
pReq->deleteMark2 = pStmt->pOptions->deleteMark2;
|
||||||
pReq->colVer = 1;
|
pReq->colVer = 1;
|
||||||
pReq->tagVer = 1;
|
pReq->tagVer = 1;
|
||||||
pReq->source = TD_REQ_FROM_APP;
|
pReq->source = TD_REQ_FROM_APP;
|
||||||
|
@ -5144,20 +5181,34 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
|
||||||
(NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : pReq->interval);
|
(NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : pReq->interval);
|
||||||
pReq->slidingUnit =
|
pReq->slidingUnit =
|
||||||
(NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pReq->intervalUnit);
|
(NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pReq->intervalUnit);
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (NULL != pStmt->pOptions->pStreamOptions) {
|
if (NULL != pStmt->pOptions->pStreamOptions) {
|
||||||
SStreamOptions* pStreamOpt = (SStreamOptions*)pStmt->pOptions->pStreamOptions;
|
SStreamOptions* pStreamOpt = (SStreamOptions*)pStmt->pOptions->pStreamOptions;
|
||||||
pReq->maxDelay = (NULL != pStreamOpt->pDelay ? ((SValueNode*)pStreamOpt->pDelay)->datum.i : -1);
|
if (NULL != pStreamOpt->pDelay) {
|
||||||
pReq->watermark = (NULL != pStreamOpt->pWatermark ? ((SValueNode*)pStreamOpt->pWatermark)->datum.i
|
code = getTableMaxDelayOption(pCxt, (SValueNode*)pStreamOpt->pDelay, &pReq->maxDelay);
|
||||||
: TSDB_DEFAULT_ROLLUP_WATERMARK);
|
} else {
|
||||||
if (pReq->watermark < TSDB_MIN_ROLLUP_WATERMARK) {
|
pReq->maxDelay = -1;
|
||||||
pReq->watermark = TSDB_MIN_ROLLUP_WATERMARK;
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
if (NULL != pStreamOpt->pWatermark) {
|
||||||
|
code = getTableWatermarkOption(pCxt, (SValueNode*)pStreamOpt->pWatermark, &pReq->watermark);
|
||||||
|
} else {
|
||||||
|
pReq->watermark = TSDB_DEFAULT_ROLLUP_WATERMARK;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
if (NULL != pStreamOpt->pDeleteMark) {
|
||||||
|
code = getTableDeleteMarkOption(pCxt, (SValueNode*)pStreamOpt->pDeleteMark, &pReq->deleteMark);
|
||||||
|
} else {
|
||||||
|
pReq->deleteMark = TSDB_DEFAULT_ROLLUP_DELETE_MARK;
|
||||||
}
|
}
|
||||||
if (pReq->watermark > TSDB_MAX_ROLLUP_WATERMARK) {
|
|
||||||
pReq->watermark = TSDB_MAX_ROLLUP_WATERMARK;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = getSmaIndexDstVgId(pCxt, pStmt->dbName, pStmt->tableName, &pReq->dstVgId);
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = getSmaIndexDstVgId(pCxt, pStmt->dbName, pStmt->tableName, &pReq->dstVgId);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen);
|
code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen);
|
||||||
}
|
}
|
||||||
|
@ -5185,16 +5236,6 @@ static int32_t checkCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pS
|
||||||
code = doTranslateValue(pCxt, (SValueNode*)pStmt->pOptions->pSliding);
|
code = doTranslateValue(pCxt, (SValueNode*)pStmt->pOptions->pSliding);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pOptions->pStreamOptions) {
|
|
||||||
SStreamOptions* pStreamOpt = (SStreamOptions*)pStmt->pOptions->pStreamOptions;
|
|
||||||
if (NULL != pStreamOpt->pWatermark) {
|
|
||||||
code = doTranslateValue(pCxt, (SValueNode*)pStreamOpt->pWatermark);
|
|
||||||
}
|
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pStreamOpt->pDelay) {
|
|
||||||
code = doTranslateValue(pCxt, (SValueNode*)pStreamOpt->pDelay);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -375,9 +375,68 @@ TEST_F(ParserInitialCTest, createFunction) {
|
||||||
TEST_F(ParserInitialCTest, createSmaIndex) {
|
TEST_F(ParserInitialCTest, createSmaIndex) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
|
SMCreateSmaReq expect = {0};
|
||||||
|
|
||||||
|
auto setCreateSmacReq = [&](const char* pIndexName, const char* pStbName, int64_t interval, int8_t intervalUnit,
|
||||||
|
int64_t offset = 0, int64_t sliding = -1, int8_t slidingUnit = -1, int8_t igExists = 0) {
|
||||||
|
memset(&expect, 0, sizeof(SMCreateSmaReq));
|
||||||
|
strcpy(expect.name, pIndexName);
|
||||||
|
strcpy(expect.stb, pStbName);
|
||||||
|
expect.igExists = igExists;
|
||||||
|
expect.intervalUnit = intervalUnit;
|
||||||
|
expect.slidingUnit = slidingUnit < 0 ? intervalUnit : slidingUnit;
|
||||||
|
expect.timezone = 0;
|
||||||
|
expect.dstVgId = 1;
|
||||||
|
expect.interval = interval;
|
||||||
|
expect.offset = offset;
|
||||||
|
expect.sliding = sliding < 0 ? interval : sliding;
|
||||||
|
expect.maxDelay = -1;
|
||||||
|
expect.watermark = TSDB_DEFAULT_ROLLUP_WATERMARK;
|
||||||
|
expect.deleteMark = TSDB_DEFAULT_ROLLUP_DELETE_MARK;
|
||||||
|
};
|
||||||
|
|
||||||
|
auto setOptionsForCreateSmacReq = [&](int64_t maxDelay, int64_t watermark, int64_t deleteMark) {
|
||||||
|
expect.maxDelay = maxDelay;
|
||||||
|
expect.watermark = watermark;
|
||||||
|
expect.deleteMark = deleteMark;
|
||||||
|
};
|
||||||
|
|
||||||
|
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
|
||||||
|
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_INDEX_STMT);
|
||||||
|
SMCreateSmaReq req = {0};
|
||||||
|
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
|
||||||
|
|
||||||
|
ASSERT_EQ(std::string(req.name), std::string(expect.name));
|
||||||
|
ASSERT_EQ(std::string(req.stb), std::string(expect.stb));
|
||||||
|
ASSERT_EQ(req.igExists, expect.igExists);
|
||||||
|
ASSERT_EQ(req.intervalUnit, expect.intervalUnit);
|
||||||
|
ASSERT_EQ(req.slidingUnit, expect.slidingUnit);
|
||||||
|
ASSERT_EQ(req.timezone, expect.timezone);
|
||||||
|
ASSERT_EQ(req.dstVgId, expect.dstVgId);
|
||||||
|
ASSERT_EQ(req.interval, expect.interval);
|
||||||
|
ASSERT_EQ(req.offset, expect.offset);
|
||||||
|
ASSERT_EQ(req.sliding, expect.sliding);
|
||||||
|
ASSERT_EQ(req.maxDelay, expect.maxDelay);
|
||||||
|
ASSERT_EQ(req.watermark, expect.watermark);
|
||||||
|
ASSERT_EQ(req.deleteMark, expect.deleteMark);
|
||||||
|
ASSERT_GT(req.exprLen, 0);
|
||||||
|
ASSERT_EQ(req.tagsFilterLen, 0);
|
||||||
|
ASSERT_GT(req.sqlLen, 0);
|
||||||
|
ASSERT_GT(req.astLen, 0);
|
||||||
|
ASSERT_NE(req.expr, nullptr);
|
||||||
|
ASSERT_EQ(req.tagsFilter, nullptr);
|
||||||
|
ASSERT_NE(req.sql, nullptr);
|
||||||
|
ASSERT_NE(req.ast, nullptr);
|
||||||
|
tFreeSMCreateSmaReq(&req);
|
||||||
|
});
|
||||||
|
|
||||||
|
setCreateSmacReq("0.test.index1", "0.test.t1", 10 * MILLISECOND_PER_SECOND, 's');
|
||||||
run("CREATE SMA INDEX index1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)");
|
run("CREATE SMA INDEX index1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)");
|
||||||
|
|
||||||
run("CREATE SMA INDEX index2 ON st1 FUNCTION(MAX(c1), MIN(tag1)) INTERVAL(10s)");
|
setCreateSmacReq("0.test.index2", "0.test.st1", 5 * MILLISECOND_PER_SECOND, 's');
|
||||||
|
setOptionsForCreateSmacReq(10 * MILLISECOND_PER_SECOND, 20 * MILLISECOND_PER_SECOND, 1000 * MILLISECOND_PER_SECOND);
|
||||||
|
run("CREATE SMA INDEX index2 ON st1 FUNCTION(MAX(c1), MIN(tag1)) INTERVAL(5s) WATERMARK 20s MAX_DELAY 10s "
|
||||||
|
"DELETE_MARK 1000s");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(ParserInitialCTest, createMnode) {
|
TEST_F(ParserInitialCTest, createMnode) {
|
||||||
|
@ -408,9 +467,10 @@ TEST_F(ParserInitialCTest, createStable) {
|
||||||
memset(&expect, 0, sizeof(SMCreateStbReq));
|
memset(&expect, 0, sizeof(SMCreateStbReq));
|
||||||
};
|
};
|
||||||
|
|
||||||
auto setCreateStbReqFunc = [&](const char* pDbName, const char* pTbName, int8_t igExists = 0, int64_t delay1 = -1,
|
auto setCreateStbReqFunc =
|
||||||
int64_t delay2 = -1, int64_t watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK,
|
[&](const char* pDbName, const char* pTbName, int8_t igExists = 0, int64_t delay1 = -1, int64_t delay2 = -1,
|
||||||
int64_t watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK,
|
int64_t watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK, int64_t watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK,
|
||||||
|
int64_t deleteMark1 = TSDB_DEFAULT_ROLLUP_DELETE_MARK, int64_t deleteMark2 = TSDB_DEFAULT_ROLLUP_DELETE_MARK,
|
||||||
int32_t ttl = TSDB_DEFAULT_TABLE_TTL, const char* pComment = nullptr) {
|
int32_t ttl = TSDB_DEFAULT_TABLE_TTL, const char* pComment = nullptr) {
|
||||||
int32_t len = snprintf(expect.name, sizeof(expect.name), "0.%s.%s", pDbName, pTbName);
|
int32_t len = snprintf(expect.name, sizeof(expect.name), "0.%s.%s", pDbName, pTbName);
|
||||||
expect.name[len] = '\0';
|
expect.name[len] = '\0';
|
||||||
|
@ -419,6 +479,8 @@ TEST_F(ParserInitialCTest, createStable) {
|
||||||
expect.delay2 = delay2;
|
expect.delay2 = delay2;
|
||||||
expect.watermark1 = watermark1;
|
expect.watermark1 = watermark1;
|
||||||
expect.watermark2 = watermark2;
|
expect.watermark2 = watermark2;
|
||||||
|
expect.deleteMark1 = deleteMark1;
|
||||||
|
expect.deleteMark2 = deleteMark2;
|
||||||
// expect.ttl = ttl;
|
// expect.ttl = ttl;
|
||||||
if (nullptr != pComment) {
|
if (nullptr != pComment) {
|
||||||
expect.pComment = strdup(pComment);
|
expect.pComment = strdup(pComment);
|
||||||
|
@ -511,7 +573,8 @@ TEST_F(ParserInitialCTest, createStable) {
|
||||||
clearCreateStbReq();
|
clearCreateStbReq();
|
||||||
|
|
||||||
setCreateStbReqFunc("rollup_db", "t1", 1, 100 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_MINUTE, 10,
|
setCreateStbReqFunc("rollup_db", "t1", 1, 100 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_MINUTE, 10,
|
||||||
1 * MILLISECOND_PER_MINUTE, 100, "test create table");
|
1 * MILLISECOND_PER_MINUTE, 1000 * MILLISECOND_PER_SECOND, 200 * MILLISECOND_PER_MINUTE, 100,
|
||||||
|
"test create table");
|
||||||
addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
|
addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
|
||||||
addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT);
|
addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT);
|
||||||
addFieldToCreateStbReqFunc(true, "c2", TSDB_DATA_TYPE_UINT);
|
addFieldToCreateStbReqFunc(true, "c2", TSDB_DATA_TYPE_UINT);
|
||||||
|
@ -549,7 +612,8 @@ TEST_F(ParserInitialCTest, createStable) {
|
||||||
"TAGS (a1 TIMESTAMP, a2 INT, a3 INT UNSIGNED, a4 BIGINT, a5 BIGINT UNSIGNED, a6 FLOAT, a7 DOUBLE, "
|
"TAGS (a1 TIMESTAMP, a2 INT, a3 INT UNSIGNED, a4 BIGINT, a5 BIGINT UNSIGNED, a6 FLOAT, a7 DOUBLE, "
|
||||||
"a8 BINARY(20), a9 SMALLINT, a10 SMALLINT UNSIGNED COMMENT 'test column comment', a11 TINYINT, "
|
"a8 BINARY(20), a9 SMALLINT, a10 SMALLINT UNSIGNED COMMENT 'test column comment', a11 TINYINT, "
|
||||||
"a12 TINYINT UNSIGNED, a13 BOOL, a14 NCHAR(30), a15 VARCHAR(50)) "
|
"a12 TINYINT UNSIGNED, a13 BOOL, a14 NCHAR(30), a15 VARCHAR(50)) "
|
||||||
"TTL 100 COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (MIN) MAX_DELAY 100s,10m WATERMARK 10a,1m");
|
"TTL 100 COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (MIN) MAX_DELAY 100s,10m WATERMARK 10a,1m "
|
||||||
|
"DELETE_MARK 1000s,200m");
|
||||||
clearCreateStbReq();
|
clearCreateStbReq();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -702,6 +702,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
|
||||||
if (pCxt->pPlanCxt->streamQuery) {
|
if (pCxt->pPlanCxt->streamQuery) {
|
||||||
pWindow->triggerType = pCxt->pPlanCxt->triggerType;
|
pWindow->triggerType = pCxt->pPlanCxt->triggerType;
|
||||||
pWindow->watermark = pCxt->pPlanCxt->watermark;
|
pWindow->watermark = pCxt->pPlanCxt->watermark;
|
||||||
|
pWindow->deleteMark = pCxt->pPlanCxt->deleteMark;
|
||||||
pWindow->igExpired = pCxt->pPlanCxt->igExpired;
|
pWindow->igExpired = pCxt->pPlanCxt->igExpired;
|
||||||
}
|
}
|
||||||
pWindow->inputTsOrder = ORDER_ASC;
|
pWindow->inputTsOrder = ORDER_ASC;
|
||||||
|
|
|
@ -330,6 +330,7 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) {
|
||||||
pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit;
|
pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit;
|
||||||
pScan->triggerType = ((SWindowLogicNode*)pParent)->triggerType;
|
pScan->triggerType = ((SWindowLogicNode*)pParent)->triggerType;
|
||||||
pScan->watermark = ((SWindowLogicNode*)pParent)->watermark;
|
pScan->watermark = ((SWindowLogicNode*)pParent)->watermark;
|
||||||
|
pScan->deleteMark = ((SWindowLogicNode*)pParent)->deleteMark;
|
||||||
pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired;
|
pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1139,6 +1139,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
|
||||||
SWindowLogicNode* pWindowLogicNode) {
|
SWindowLogicNode* pWindowLogicNode) {
|
||||||
pWindow->triggerType = pWindowLogicNode->triggerType;
|
pWindow->triggerType = pWindowLogicNode->triggerType;
|
||||||
pWindow->watermark = pWindowLogicNode->watermark;
|
pWindow->watermark = pWindowLogicNode->watermark;
|
||||||
|
pWindow->deleteMark = pWindowLogicNode->deleteMark;
|
||||||
pWindow->igExpired = pWindowLogicNode->igExpired;
|
pWindow->igExpired = pWindowLogicNode->igExpired;
|
||||||
pWindow->inputTsOrder = pWindowLogicNode->inputTsOrder;
|
pWindow->inputTsOrder = pWindowLogicNode->inputTsOrder;
|
||||||
pWindow->outputTsOrder = pWindowLogicNode->outputTsOrder;
|
pWindow->outputTsOrder = pWindowLogicNode->outputTsOrder;
|
||||||
|
|
|
@ -51,7 +51,7 @@ TEST_F(PlanOtherTest, createStreamUseSTable) {
|
||||||
TEST_F(PlanOtherTest, createSmaIndex) {
|
TEST_F(PlanOtherTest, createSmaIndex) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)");
|
run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s) DELETE_MARK 1000s");
|
||||||
|
|
||||||
run("SELECT SUM(c4) FROM t1 INTERVAL(10s)");
|
run("SELECT SUM(c4) FROM t1 INTERVAL(10s)");
|
||||||
|
|
||||||
|
|
|
@ -444,6 +444,7 @@ class PlannerTestBaseImpl {
|
||||||
tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req);
|
tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req);
|
||||||
g_mockCatalogService->createSmaIndex(&req);
|
g_mockCatalogService->createSmaIndex(&req);
|
||||||
nodesStringToNode(req.ast, &pCxt->pAstRoot);
|
nodesStringToNode(req.ast, &pCxt->pAstRoot);
|
||||||
|
pCxt->deleteMark = req.deleteMark;
|
||||||
tFreeSMCreateSmaReq(&req);
|
tFreeSMCreateSmaReq(&req);
|
||||||
nodesDestroyNode(pQuery->pRoot);
|
nodesDestroyNode(pQuery->pRoot);
|
||||||
pQuery->pRoot = pCxt->pAstRoot;
|
pQuery->pRoot = pCxt->pAstRoot;
|
||||||
|
|
Loading…
Reference in New Issue