Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/submit_req

This commit is contained in:
Hongze Cheng 2022-12-07 09:04:00 +08:00
commit 2d85c7c451
47 changed files with 3697 additions and 3282 deletions

View File

@ -502,6 +502,8 @@ typedef struct {
char* pComment; char* pComment;
char* pAst1; char* pAst1;
char* pAst2; char* pAst2;
int64_t deleteMark1;
int64_t deleteMark2;
} SMCreateStbReq; } SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq); int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
@ -2017,6 +2019,7 @@ typedef struct {
typedef struct { typedef struct {
int64_t maxdelay[2]; int64_t maxdelay[2];
int64_t watermark[2]; int64_t watermark[2];
int64_t deleteMark[2];
int32_t qmsgLen[2]; int32_t qmsgLen[2];
char* qmsg[2]; // pAst:qmsg:SRetention => trigger aggr task1/2 char* qmsg[2]; // pAst:qmsg:SRetention => trigger aggr task1/2
} SRSmaParam; } SRSmaParam;
@ -2743,6 +2746,7 @@ typedef struct {
char* tagsFilter; char* tagsFilter;
char* sql; char* sql;
char* ast; char* ast;
int64_t deleteMark;
} SMCreateSmaReq; } SMCreateSmaReq;
int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq); int32_t tSerializeSMCreateSmaReq(void* buf, int32_t bufLen, SMCreateSmaReq* pReq);

View File

@ -147,195 +147,196 @@
#define TK_ROLLUP 129 #define TK_ROLLUP 129
#define TK_TTL 130 #define TK_TTL 130
#define TK_SMA 131 #define TK_SMA 131
#define TK_FIRST 132 #define TK_DELETE_MARK 132
#define TK_LAST 133 #define TK_FIRST 133
#define TK_SHOW 134 #define TK_LAST 134
#define TK_PRIVILEGES 135 #define TK_SHOW 135
#define TK_DATABASES 136 #define TK_PRIVILEGES 136
#define TK_TABLES 137 #define TK_DATABASES 137
#define TK_STABLES 138 #define TK_TABLES 138
#define TK_MNODES 139 #define TK_STABLES 139
#define TK_QNODES 140 #define TK_MNODES 140
#define TK_FUNCTIONS 141 #define TK_QNODES 141
#define TK_INDEXES 142 #define TK_FUNCTIONS 142
#define TK_ACCOUNTS 143 #define TK_INDEXES 143
#define TK_APPS 144 #define TK_ACCOUNTS 144
#define TK_CONNECTIONS 145 #define TK_APPS 145
#define TK_LICENCES 146 #define TK_CONNECTIONS 146
#define TK_GRANTS 147 #define TK_LICENCES 147
#define TK_QUERIES 148 #define TK_GRANTS 148
#define TK_SCORES 149 #define TK_QUERIES 149
#define TK_TOPICS 150 #define TK_SCORES 150
#define TK_VARIABLES 151 #define TK_TOPICS 151
#define TK_CLUSTER 152 #define TK_VARIABLES 152
#define TK_BNODES 153 #define TK_CLUSTER 153
#define TK_SNODES 154 #define TK_BNODES 154
#define TK_TRANSACTIONS 155 #define TK_SNODES 155
#define TK_DISTRIBUTED 156 #define TK_TRANSACTIONS 156
#define TK_CONSUMERS 157 #define TK_DISTRIBUTED 157
#define TK_SUBSCRIPTIONS 158 #define TK_CONSUMERS 158
#define TK_VNODES 159 #define TK_SUBSCRIPTIONS 159
#define TK_LIKE 160 #define TK_VNODES 160
#define TK_TBNAME 161 #define TK_LIKE 161
#define TK_QTAGS 162 #define TK_TBNAME 162
#define TK_AS 163 #define TK_QTAGS 163
#define TK_INDEX 164 #define TK_AS 164
#define TK_FUNCTION 165 #define TK_INDEX 165
#define TK_INTERVAL 166 #define TK_FUNCTION 166
#define TK_TOPIC 167 #define TK_INTERVAL 167
#define TK_WITH 168 #define TK_TOPIC 168
#define TK_META 169 #define TK_WITH 169
#define TK_CONSUMER 170 #define TK_META 170
#define TK_GROUP 171 #define TK_CONSUMER 171
#define TK_DESC 172 #define TK_GROUP 172
#define TK_DESCRIBE 173 #define TK_DESC 173
#define TK_RESET 174 #define TK_DESCRIBE 174
#define TK_QUERY 175 #define TK_RESET 175
#define TK_CACHE 176 #define TK_QUERY 176
#define TK_EXPLAIN 177 #define TK_CACHE 177
#define TK_ANALYZE 178 #define TK_EXPLAIN 178
#define TK_VERBOSE 179 #define TK_ANALYZE 179
#define TK_NK_BOOL 180 #define TK_VERBOSE 180
#define TK_RATIO 181 #define TK_NK_BOOL 181
#define TK_NK_FLOAT 182 #define TK_RATIO 182
#define TK_OUTPUTTYPE 183 #define TK_NK_FLOAT 183
#define TK_AGGREGATE 184 #define TK_OUTPUTTYPE 184
#define TK_BUFSIZE 185 #define TK_AGGREGATE 185
#define TK_STREAM 186 #define TK_BUFSIZE 186
#define TK_INTO 187 #define TK_STREAM 187
#define TK_TRIGGER 188 #define TK_INTO 188
#define TK_AT_ONCE 189 #define TK_TRIGGER 189
#define TK_WINDOW_CLOSE 190 #define TK_AT_ONCE 190
#define TK_IGNORE 191 #define TK_WINDOW_CLOSE 191
#define TK_EXPIRED 192 #define TK_IGNORE 192
#define TK_FILL_HISTORY 193 #define TK_EXPIRED 193
#define TK_SUBTABLE 194 #define TK_FILL_HISTORY 194
#define TK_KILL 195 #define TK_SUBTABLE 195
#define TK_CONNECTION 196 #define TK_KILL 196
#define TK_TRANSACTION 197 #define TK_CONNECTION 197
#define TK_BALANCE 198 #define TK_TRANSACTION 198
#define TK_VGROUP 199 #define TK_BALANCE 199
#define TK_MERGE 200 #define TK_VGROUP 200
#define TK_REDISTRIBUTE 201 #define TK_MERGE 201
#define TK_SPLIT 202 #define TK_REDISTRIBUTE 202
#define TK_DELETE 203 #define TK_SPLIT 203
#define TK_INSERT 204 #define TK_DELETE 204
#define TK_NULL 205 #define TK_INSERT 205
#define TK_NK_QUESTION 206 #define TK_NULL 206
#define TK_NK_ARROW 207 #define TK_NK_QUESTION 207
#define TK_ROWTS 208 #define TK_NK_ARROW 208
#define TK_QSTART 209 #define TK_ROWTS 209
#define TK_QEND 210 #define TK_QSTART 210
#define TK_QDURATION 211 #define TK_QEND 211
#define TK_WSTART 212 #define TK_QDURATION 212
#define TK_WEND 213 #define TK_WSTART 213
#define TK_WDURATION 214 #define TK_WEND 214
#define TK_IROWTS 215 #define TK_WDURATION 215
#define TK_CAST 216 #define TK_IROWTS 216
#define TK_NOW 217 #define TK_CAST 217
#define TK_TODAY 218 #define TK_NOW 218
#define TK_TIMEZONE 219 #define TK_TODAY 219
#define TK_CLIENT_VERSION 220 #define TK_TIMEZONE 220
#define TK_SERVER_VERSION 221 #define TK_CLIENT_VERSION 221
#define TK_SERVER_STATUS 222 #define TK_SERVER_VERSION 222
#define TK_CURRENT_USER 223 #define TK_SERVER_STATUS 223
#define TK_COUNT 224 #define TK_CURRENT_USER 224
#define TK_LAST_ROW 225 #define TK_COUNT 225
#define TK_CASE 226 #define TK_LAST_ROW 226
#define TK_END 227 #define TK_CASE 227
#define TK_WHEN 228 #define TK_END 228
#define TK_THEN 229 #define TK_WHEN 229
#define TK_ELSE 230 #define TK_THEN 230
#define TK_BETWEEN 231 #define TK_ELSE 231
#define TK_IS 232 #define TK_BETWEEN 232
#define TK_NK_LT 233 #define TK_IS 233
#define TK_NK_GT 234 #define TK_NK_LT 234
#define TK_NK_LE 235 #define TK_NK_GT 235
#define TK_NK_GE 236 #define TK_NK_LE 236
#define TK_NK_NE 237 #define TK_NK_GE 237
#define TK_MATCH 238 #define TK_NK_NE 238
#define TK_NMATCH 239 #define TK_MATCH 239
#define TK_CONTAINS 240 #define TK_NMATCH 240
#define TK_IN 241 #define TK_CONTAINS 241
#define TK_JOIN 242 #define TK_IN 242
#define TK_INNER 243 #define TK_JOIN 243
#define TK_SELECT 244 #define TK_INNER 244
#define TK_DISTINCT 245 #define TK_SELECT 245
#define TK_WHERE 246 #define TK_DISTINCT 246
#define TK_PARTITION 247 #define TK_WHERE 247
#define TK_BY 248 #define TK_PARTITION 248
#define TK_SESSION 249 #define TK_BY 249
#define TK_STATE_WINDOW 250 #define TK_SESSION 250
#define TK_SLIDING 251 #define TK_STATE_WINDOW 251
#define TK_FILL 252 #define TK_SLIDING 252
#define TK_VALUE 253 #define TK_FILL 253
#define TK_NONE 254 #define TK_VALUE 254
#define TK_PREV 255 #define TK_NONE 255
#define TK_LINEAR 256 #define TK_PREV 256
#define TK_NEXT 257 #define TK_LINEAR 257
#define TK_HAVING 258 #define TK_NEXT 258
#define TK_RANGE 259 #define TK_HAVING 259
#define TK_EVERY 260 #define TK_RANGE 260
#define TK_ORDER 261 #define TK_EVERY 261
#define TK_SLIMIT 262 #define TK_ORDER 262
#define TK_SOFFSET 263 #define TK_SLIMIT 263
#define TK_LIMIT 264 #define TK_SOFFSET 264
#define TK_OFFSET 265 #define TK_LIMIT 265
#define TK_ASC 266 #define TK_OFFSET 266
#define TK_NULLS 267 #define TK_ASC 267
#define TK_ABORT 268 #define TK_NULLS 268
#define TK_AFTER 269 #define TK_ABORT 269
#define TK_ATTACH 270 #define TK_AFTER 270
#define TK_BEFORE 271 #define TK_ATTACH 271
#define TK_BEGIN 272 #define TK_BEFORE 272
#define TK_BITAND 273 #define TK_BEGIN 273
#define TK_BITNOT 274 #define TK_BITAND 274
#define TK_BITOR 275 #define TK_BITNOT 275
#define TK_BLOCKS 276 #define TK_BITOR 276
#define TK_CHANGE 277 #define TK_BLOCKS 277
#define TK_COMMA 278 #define TK_CHANGE 278
#define TK_COMPACT 279 #define TK_COMMA 279
#define TK_CONCAT 280 #define TK_COMPACT 280
#define TK_CONFLICT 281 #define TK_CONCAT 281
#define TK_COPY 282 #define TK_CONFLICT 282
#define TK_DEFERRED 283 #define TK_COPY 283
#define TK_DELIMITERS 284 #define TK_DEFERRED 284
#define TK_DETACH 285 #define TK_DELIMITERS 285
#define TK_DIVIDE 286 #define TK_DETACH 286
#define TK_DOT 287 #define TK_DIVIDE 287
#define TK_EACH 288 #define TK_DOT 288
#define TK_FAIL 289 #define TK_EACH 289
#define TK_FILE 290 #define TK_FAIL 290
#define TK_FOR 291 #define TK_FILE 291
#define TK_GLOB 292 #define TK_FOR 292
#define TK_ID 293 #define TK_GLOB 293
#define TK_IMMEDIATE 294 #define TK_ID 294
#define TK_IMPORT 295 #define TK_IMMEDIATE 295
#define TK_INITIALLY 296 #define TK_IMPORT 296
#define TK_INSTEAD 297 #define TK_INITIALLY 297
#define TK_ISNULL 298 #define TK_INSTEAD 298
#define TK_KEY 299 #define TK_ISNULL 299
#define TK_MODULES 300 #define TK_KEY 300
#define TK_NK_BITNOT 301 #define TK_MODULES 301
#define TK_NK_SEMI 302 #define TK_NK_BITNOT 302
#define TK_NOTNULL 303 #define TK_NK_SEMI 303
#define TK_OF 304 #define TK_NOTNULL 304
#define TK_PLUS 305 #define TK_OF 305
#define TK_PRIVILEGE 306 #define TK_PLUS 306
#define TK_RAISE 307 #define TK_PRIVILEGE 307
#define TK_REPLACE 308 #define TK_RAISE 308
#define TK_RESTRICT 309 #define TK_REPLACE 309
#define TK_ROW 310 #define TK_RESTRICT 310
#define TK_SEMI 311 #define TK_ROW 311
#define TK_STAR 312 #define TK_SEMI 312
#define TK_STATEMENT 313 #define TK_STAR 313
#define TK_STRING 314 #define TK_STATEMENT 314
#define TK_TIMES 315 #define TK_STRING 315
#define TK_UPDATE 316 #define TK_TIMES 316
#define TK_VALUES 317 #define TK_UPDATE 317
#define TK_VARIABLE 318 #define TK_VALUES 318
#define TK_VIEW 319 #define TK_VARIABLE 319
#define TK_WAL 320 #define TK_VIEW 320
#define TK_WAL 321
#define TK_NK_SPACE 600 #define TK_NK_SPACE 600
#define TK_NK_COMMENT 601 #define TK_NK_COMMENT 601

View File

@ -133,6 +133,9 @@ typedef struct STableOptions {
SNodeList* pWatermark; SNodeList* pWatermark;
int64_t watermark1; int64_t watermark1;
int64_t watermark2; int64_t watermark2;
SNodeList* pDeleteMark;
int64_t deleteMark1;
int64_t deleteMark2;
SNodeList* pRollupFuncs; SNodeList* pRollupFuncs;
int32_t ttl; int32_t ttl;
SNodeList* pSma; SNodeList* pSma;
@ -383,6 +386,7 @@ typedef struct SStreamOptions {
int8_t triggerType; int8_t triggerType;
SNode* pDelay; SNode* pDelay;
SNode* pWatermark; SNode* pWatermark;
SNode* pDeleteMark;
int8_t fillHistory; int8_t fillHistory;
int8_t ignoreExpired; int8_t ignoreExpired;
} SStreamOptions; } SStreamOptions;

View File

@ -91,6 +91,7 @@ typedef struct SScanLogicNode {
SNode* pTagIndexCond; SNode* pTagIndexCond;
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
SArray* pSmaIndexes; SArray* pSmaIndexes;
SNodeList* pGroupTags; SNodeList* pGroupTags;
@ -213,6 +214,7 @@ typedef struct SWindowLogicNode {
SNode* pStateExpr; SNode* pStateExpr;
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
EWindowAlgorithm windowAlgo; EWindowAlgorithm windowAlgo;
EOrder inputTsOrder; EOrder inputTsOrder;
@ -440,6 +442,7 @@ typedef struct SWinodwPhysiNode {
SNode* pTsEnd; // window end timestamp SNode* pTsEnd; // window end timestamp
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
EOrder inputTsOrder; EOrder inputTsOrder;
EOrder outputTsOrder; EOrder outputTsOrder;

View File

@ -34,6 +34,7 @@ typedef struct SPlanContext {
bool showRewrite; bool showRewrite;
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
char* pMsg; char* pMsg;
int32_t msgLen; int32_t msgLen;

View File

@ -25,7 +25,7 @@ extern "C" {
#include "tlrucache.h" #include "tlrucache.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#define SYNC_RESP_TTL_MS 10000000 #define SYNC_RESP_TTL_MS 30000
#define SYNC_SPEED_UP_HB_TIMER 400 #define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) #define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SLOW_DOWN_RANGE 100 #define SYNC_SLOW_DOWN_RANGE 100

View File

@ -72,6 +72,7 @@ typedef struct SRpcMsg {
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset);
typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType);
typedef bool (*RpcFFfp)(tmsg_t msgType);
typedef void (*RpcDfp)(void *ahandle); typedef void (*RpcDfp)(void *ahandle);
typedef struct SRpcInit { typedef struct SRpcInit {
@ -90,6 +91,9 @@ typedef struct SRpcInit {
int32_t retryMaxInterval; // retry max interval int32_t retryMaxInterval; // retry max interval
int64_t retryMaxTimouet; int64_t retryMaxTimouet;
int32_t failFastThreshold;
int32_t failFastInterval;
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not int8_t encryption; // encrypt or not
@ -107,6 +111,8 @@ typedef struct SRpcInit {
// destroy client ahandle; // destroy client ahandle;
RpcDfp dfp; RpcDfp dfp;
// fail fast fp
RpcFFfp ffp;
void *parent; void *parent;
} SRpcInit; } SRpcInit;

View File

@ -382,13 +382,16 @@ typedef enum ELogicConditionType {
#define TSDB_DB_MIN_WAL_SEGMENT_SIZE 0 #define TSDB_DB_MIN_WAL_SEGMENT_SIZE 0
#define TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE 0 #define TSDB_DEFAULT_DB_WAL_SEGMENT_SIZE 0
#define TSDB_MIN_ROLLUP_MAX_DELAY 1 // unit millisecond #define TSDB_MIN_ROLLUP_MAX_DELAY 1 // unit millisecond
#define TSDB_MAX_ROLLUP_MAX_DELAY (15 * 60 * 1000) #define TSDB_MAX_ROLLUP_MAX_DELAY (15 * 60 * 1000)
#define TSDB_MIN_ROLLUP_WATERMARK 0 // unit millisecond #define TSDB_MIN_ROLLUP_WATERMARK 0 // unit millisecond
#define TSDB_MAX_ROLLUP_WATERMARK (15 * 60 * 1000) #define TSDB_MAX_ROLLUP_WATERMARK (15 * 60 * 1000)
#define TSDB_DEFAULT_ROLLUP_WATERMARK 5000 #define TSDB_DEFAULT_ROLLUP_WATERMARK 5000
#define TSDB_MIN_TABLE_TTL 0 #define TSDB_MIN_ROLLUP_DELETE_MARK 0 // unit millisecond
#define TSDB_DEFAULT_TABLE_TTL 0 #define TSDB_MAX_ROLLUP_DELETE_MARK INT64_MAX
#define TSDB_DEFAULT_ROLLUP_DELETE_MARK 900000 // 900s
#define TSDB_MIN_TABLE_TTL 0
#define TSDB_DEFAULT_TABLE_TTL 0
#define TSDB_MIN_EXPLAIN_RATIO 0 #define TSDB_MIN_EXPLAIN_RATIO 0
#define TSDB_MAX_EXPLAIN_RATIO 1 #define TSDB_MAX_EXPLAIN_RATIO 1

View File

@ -231,10 +231,9 @@ void destroyTscObj(void *pObj) {
tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj, tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
pTscObj->pAppInfo->numOfConns); pTscObj->pAppInfo->numOfConns);
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); // In any cases, we should not free app inst here. Or an race condition rises.
if (0 == connNum) { /*int64_t connNum = */atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
destroyAppInst(pTscObj->pAppInfo);
}
taosThreadMutexDestroy(&pTscObj->mutex); taosThreadMutexDestroy(&pTscObj->mutex);
taosMemoryFree(pTscObj); taosMemoryFree(pTscObj);

View File

@ -551,6 +551,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (pReq->ast2Len > 0) { if (pReq->ast2Len > 0) {
if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1; if (tEncodeBinary(&encoder, pReq->pAst2, pReq->ast2Len) < 0) return -1;
} }
if (tEncodeI64(&encoder, pReq->deleteMark1) < 0) return -1;
if (tEncodeI64(&encoder, pReq->deleteMark2) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
@ -644,6 +646,9 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->pAst2) < 0) return -1;
} }
if (tDecodeI64(&decoder, &pReq->deleteMark1) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->deleteMark2) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
@ -822,6 +827,7 @@ int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq
if (pReq->astLen > 0) { if (pReq->astLen > 0) {
if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1; if (tEncodeBinary(&encoder, pReq->ast, pReq->astLen) < 0) return -1;
} }
if (tEncodeI64(&encoder, pReq->deleteMark) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -870,7 +876,7 @@ int32_t tDeserializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pR
if (pReq->ast == NULL) return -1; if (pReq->ast == NULL) return -1;
if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->ast) < 0) return -1;
} }
if (tDecodeI64(&decoder, &pReq->deleteMark) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;

View File

@ -48,6 +48,11 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
return (*msgFp)(pWrapper->pMgmt, pMsg); return (*msgFp)(pWrapper->pMgmt, pMsg);
} }
static bool dmFailFastFp(tmsg_t msgType) {
// add more msg type later
return msgType == TDMT_SYNC_HEARTBEAT;
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
int32_t code = -1; int32_t code = -1;
@ -260,6 +265,10 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
rpcInit.failFastInterval = 1000; // interval threshold(ms)
rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp;
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) { if (pTrans->clientRpc == NULL) {
dError("failed to init dnode rpc client"); dError("failed to init dnode rpc client");

View File

@ -645,6 +645,7 @@ typedef struct {
// 3.0.20 // 3.0.20
int64_t checkpointFreq; // ms int64_t checkpointFreq; // ms
int64_t currentTick; // do not serialize int64_t currentTick; // do not serialize
int64_t deleteMark;
} SStreamObj; } SStreamObj;
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);

View File

@ -28,7 +28,7 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark); int64_t watermark, int64_t deleteMark);
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream); int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream);

View File

@ -383,9 +383,9 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pGid->syncCanRead != pVload->syncCanRead) { pGid->syncCanRead != pVload->syncCanRead) {
mInfo( mInfo(
"vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d " "vgId:%d, state changed by status msg, old state:%s restored:%d canRead:%d new state:%s restored:%d "
"canRead:%d", "canRead:%d, dnode:%d",
pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead, pVgroup->vgId, syncStr(pGid->syncState), pGid->syncRestore, pGid->syncCanRead,
syncStr(pVload->syncState), pVload->syncRestore, pVload->syncCanRead); syncStr(pVload->syncState), pVload->syncRestore, pVload->syncCanRead, pDnode->id);
pGid->syncState = pVload->syncState; pGid->syncState = pVload->syncState;
pGid->syncRestore = pVload->syncRestore; pGid->syncRestore = pVload->syncRestore;
pGid->syncCanRead = pVload->syncCanRead; pGid->syncCanRead = pVload->syncCanRead;

View File

@ -42,7 +42,7 @@ static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
} }
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark) { int64_t watermark, int64_t deleteMark) {
SNode* pAst = NULL; SNode* pAst = NULL;
SQueryPlan* pPlan = NULL; SQueryPlan* pPlan = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
@ -64,6 +64,7 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64
.rSmaQuery = true, .rSmaQuery = true,
.triggerType = triggerType, .triggerType = triggerType,
.watermark = watermark, .watermark = watermark,
.deleteMark = deleteMark,
}; };
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {

View File

@ -534,6 +534,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.sql = strdup(pCreate->sql); streamObj.sql = strdup(pCreate->sql);
streamObj.smaId = smaObj.uid; streamObj.smaId = smaObj.uid;
streamObj.watermark = pCreate->watermark; streamObj.watermark = pCreate->watermark;
streamObj.deleteMark = pCreate->deleteMark;
streamObj.fillHistory = STREAM_FILL_HISTORY_ON; streamObj.fillHistory = STREAM_FILL_HISTORY_ON;
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE; streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
streamObj.triggerParam = pCreate->maxDelay; streamObj.triggerParam = pCreate->maxDelay;
@ -574,6 +575,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
.streamQuery = true, .streamQuery = true,
.triggerType = streamObj.trigger, .triggerType = streamObj.trigger,
.watermark = streamObj.watermark, .watermark = streamObj.watermark,
.deleteMark = streamObj.deleteMark,
}; };
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {

View File

@ -450,13 +450,15 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.rsmaParam.watermark[1] = pStb->watermark[1]; req.rsmaParam.watermark[1] = pStb->watermark[1];
if (pStb->ast1Len > 0) { if (pStb->ast1Len > 0) {
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid, if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0]) < 0) { STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0],
req.rsmaParam.deleteMark[0]) < 0) {
goto _err; goto _err;
} }
} }
if (pStb->ast2Len > 0) { if (pStb->ast2Len > 0) {
if (mndConvertRsmaTask(&req.rsmaParam.qmsg[1], &req.rsmaParam.qmsgLen[1], pStb->pAst2, pStb->uid, if (mndConvertRsmaTask(&req.rsmaParam.qmsg[1], &req.rsmaParam.qmsgLen[1], pStb->pAst2, pStb->uid,
STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[1]) < 0) { STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[1],
req.rsmaParam.deleteMark[1]) < 0) {
goto _err; goto _err;
} }
} }

View File

@ -1126,34 +1126,61 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
} }
if (!force) { if (!force) {
mInfo("vgId:%d, will add 1 vnode", pVgroup->vgId); if (newVg.replica == 1) {
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1; mInfo("vgId:%d, will add 1 vnode, replca:1", pVgroup->vgId);
for (int32_t i = 0; i < newVg.replica - 1; ++i) { if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1; for (int32_t i = 0; i < newVg.replica - 1; ++i) {
} if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
mInfo("vgId:%d, will remove 1 vnode", pVgroup->vgId);
newVg.replica--;
SVnodeGid del = newVg.vnodeGid[vnIndex];
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
} }
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1;
} if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1; mInfo("vgId:%d, will remove 1 vnode, replca:2", pVgroup->vgId);
for (int32_t i = 0; i < newVg.replica; ++i) { newVg.replica--;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1; SVnodeGid del = newVg.vnodeGid[vnIndex];
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
}
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
for (int32_t i = 0; i < newVg.replica; ++i) {
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
} else { // new replica == 3
mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
mInfo("vgId:%d, will remove 1 vnode, replca:4", pVgroup->vgId);
newVg.replica--;
SVnodeGid del = newVg.vnodeGid[vnIndex];
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
{
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
}
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
for (int32_t i = 0; i < newVg.replica; ++i) {
if (i == vnIndex) continue;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
}
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
} }
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
} else { } else {
mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId); mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1; if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;

View File

@ -214,6 +214,7 @@ static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader); static int32_t doBuildDataBlock(STsdbReader* pReader);
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
@ -2477,8 +2478,39 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); int32_t nextIndex = -1;
break; SBlockIndex bIndex = {0};
bool hasNeighbor =
getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &bIndex);
if (!hasNeighbor) { // do nothing
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break;
}
if (overlapWithNeighborBlock(pBlock, &bIndex, pReader->order)) { // load next block
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
// 1. find the next neighbor block in the scan block list
SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
// 2. remove it from the scan block list
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
// 3. load the neighbor block, and set it to be the currently accessed file data block
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
if (code != TSDB_CODE_SUCCESS) {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break;
}
// 4. check the data values
initBlockDumpInfo(pReader, pBlockIter);
} else {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break;
}
} }
} }
} }
@ -2887,7 +2919,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
} }
// set the correct start position in case of the first/last file block, according to the query time window // set the correct start position in case of the first/last file block, according to the query time window
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
SDataBlk* pBlock = getCurrentBlock(pBlockIter); SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
@ -4067,6 +4099,29 @@ void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64
} }
} }
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
// do fill all null column value SMA info
int32_t i = 0, j = 0;
int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);
taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
if (pAgg->colId == pSup->colId[j]) {
i += 1;
j += 1;
} else if (pAgg->colId < pSup->colId[j]) {
i += 1;
} else if (pSup->colId[j] < pAgg->colId) {
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
}
j += 1;
}
}
}
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockSMA, bool* allHave) { int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockSMA, bool* allHave) {
int32_t code = 0; int32_t code = 0;
*allHave = false; *allHave = false;
@ -4122,6 +4177,10 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg)); pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg));
} }
// do fill all null column value SMA info
doFillNullColSMA(pSup, pBlock->nRow, numOfCols, pTsAgg);
i = 0, j = 0;
while (j < numOfCols && i < size) { while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
if (pAgg->colId == pSup->colId[j]) { if (pAgg->colId == pSup->colId[j]) {

View File

@ -439,7 +439,9 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray*
goto end; goto end;
} }
} }
removeInvalidTable(uidList, tags); if (suid != 0) {
removeInvalidTable(uidList, tags);
}
int32_t rows = taosArrayGetSize(uidList); int32_t rows = taosArrayGetSize(uidList);
if (rows == 0) { if (rows == 0) {
@ -1604,7 +1606,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols);
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo)); pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t)*pCond->numOfCols); pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
if (pCond->colList == NULL || pCond->pSlotList == NULL) { if (pCond->colList == NULL || pCond->pSlotList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(pCond->colList); taosMemoryFreeClear(pCond->colList);

View File

@ -1580,6 +1580,7 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) {
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
if (pOperator->blocking) { if (pOperator->blocking) {
ASSERT(0); ASSERT(0);
return 0;
} else { } else {
return 0; return 0;
} }

View File

@ -1218,27 +1218,56 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
if (rows == 0) { if (rows == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
int64_t version = pSrcBlock->info.version - 1;
if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
uint64_t srcUid = srcUidData[0];
TSKEY startTs = srcStartTsCol[0];
TSKEY endTs = srcEndTsCol[0];
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, version);
printDataBlock(pPreRes, "pre res");
blockDataCleanup(pSrcBlock);
int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
rows = pPreRes->info.rows;
for (int32_t i = 0; i < rows; i++) {
uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
&groupId, NULL);
}
printDataBlock(pSrcBlock, "new delete");
}
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
srcUidData = (uint64_t*)pSrcUidCol->pData;
int32_t code = blockDataEnsureCapacity(pDestBlock, rows); int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* srcUidData = (uint64_t*)pSrcUidCol->pData;
SColumnInfoData* pSrcGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
ASSERT(pSrcStartTsCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
TSKEY* srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pStartTsCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pDeUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
int64_t version = pSrcBlock->info.version - 1;
for (int32_t i = 0; i < rows;) { for (int32_t i = 0; i < rows;) {
uint64_t srcUid = srcUidData[i]; uint64_t srcUid = srcUidData[i];
uint64_t groupId = srcGp[i]; uint64_t groupId = srcGp[i];
@ -1658,13 +1687,6 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]); uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
colDataAppend(pGpCol, i, (const char*)&groupId, false); colDataAppend(pGpCol, i, (const char*)&groupId, false);
} }
} else {
// SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uidCol[i], startTsCol, ts, maxVersion);
// if (!pPreRes || pPreRes->info.rows == 0) {
// return 0;
// }
// ASSERT(pPreRes->info.rows == 1);
// return calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, 0);
} }
} }
@ -3038,8 +3060,10 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char*
if (pSupp->dbNameSlotId != -1) { if (pSupp->dbNameSlotId != -1) {
ASSERT(strlen(dbName)); ASSERT(strlen(dbName));
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId); SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->dbNameSlotId);
char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
strncpy(varDataVal(varDbName), dbName, strlen(dbName)); char varDbName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(varDbName), dbName, TSDB_DB_NAME_LEN);
varDataSetLen(varDbName, strlen(dbName)); varDataSetLen(varDbName, strlen(dbName));
colDataAppend(colInfoData, 0, varDbName, false); colDataAppend(colInfoData, 0, varDbName, false);
} }
@ -3048,7 +3072,7 @@ void fillTableCountScanDataBlock(STableCountScanSupp* pSupp, char* dbName, char*
SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId); SColumnInfoData* colInfoData = taosArrayGet(pRes->pDataBlock, pSupp->stbNameSlotId);
if (strlen(stbName) != 0) { if (strlen(stbName) != 0) {
char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char varStbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
strncpy(varDataVal(varStbName), stbName, strlen(stbName)); strncpy(varDataVal(varStbName), stbName, TSDB_TABLE_NAME_LEN);
varDataSetLen(varStbName, strlen(stbName)); varDataSetLen(varStbName, strlen(stbName));
colDataAppend(colInfoData, 0, varStbName, false); colDataAppend(colInfoData, 0, varStbName, false);
} else { } else {

View File

@ -504,7 +504,7 @@ static int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
*/ */
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
if (pInput->colDataSMAIsSet && pInput->totalRows == pInput->numOfRows) { if (pInput->colDataSMAIsSet && pInput->totalRows == pInput->numOfRows && !IS_VAR_DATA_TYPE(pInputCol->info.type)) {
numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull; numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull;
ASSERT(numOfElem >= 0); ASSERT(numOfElem >= 0);
} else { } else {

View File

@ -378,6 +378,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
CLONE_NODE_FIELD(pTagIndexCond); CLONE_NODE_FIELD(pTagIndexCond);
COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(deleteMark);
COPY_SCALAR_FIELD(igExpired); COPY_SCALAR_FIELD(igExpired);
CLONE_NODE_LIST_FIELD(pGroupTags); CLONE_NODE_LIST_FIELD(pGroupTags);
COPY_SCALAR_FIELD(groupSort); COPY_SCALAR_FIELD(groupSort);
@ -463,6 +464,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
CLONE_NODE_FIELD(pStateExpr); CLONE_NODE_FIELD(pStateExpr);
COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(deleteMark);
COPY_SCALAR_FIELD(igExpired); COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(windowAlgo); COPY_SCALAR_FIELD(windowAlgo);
COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(inputTsOrder);

View File

@ -819,6 +819,7 @@ static const char* jkWindowLogicPlanTspk = "Tspk";
static const char* jkWindowLogicPlanStateExpr = "StateExpr"; static const char* jkWindowLogicPlanStateExpr = "StateExpr";
static const char* jkWindowLogicPlanTriggerType = "TriggerType"; static const char* jkWindowLogicPlanTriggerType = "TriggerType";
static const char* jkWindowLogicPlanWatermark = "Watermark"; static const char* jkWindowLogicPlanWatermark = "Watermark";
static const char* jkWindowLogicPlanDeleteMark = "DeleteMark";
static int32_t logicWindowNodeToJson(const void* pObj, SJson* pJson) { static int32_t logicWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWindowLogicNode* pNode = (const SWindowLogicNode*)pObj; const SWindowLogicNode* pNode = (const SWindowLogicNode*)pObj;
@ -860,6 +861,9 @@ static int32_t logicWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWatermark, pNode->watermark); code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanWatermark, pNode->watermark);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowLogicPlanDeleteMark, pNode->deleteMark);
}
return code; return code;
} }
@ -904,6 +908,9 @@ static int32_t jsonToLogicWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanWatermark, &pNode->watermark); code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanWatermark, &pNode->watermark);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowLogicPlanDeleteMark, &pNode->deleteMark);
}
return code; return code;
} }
@ -2004,6 +2011,7 @@ static const char* jkWindowPhysiPlanTsPk = "TsPk";
static const char* jkWindowPhysiPlanTsEnd = "TsEnd"; static const char* jkWindowPhysiPlanTsEnd = "TsEnd";
static const char* jkWindowPhysiPlanTriggerType = "TriggerType"; static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
static const char* jkWindowPhysiPlanWatermark = "Watermark"; static const char* jkWindowPhysiPlanWatermark = "Watermark";
static const char* jkWindowPhysiPlanDeleteMark = "DeleteMark";
static const char* jkWindowPhysiPlanIgnoreExpired = "IgnoreExpired"; static const char* jkWindowPhysiPlanIgnoreExpired = "IgnoreExpired";
static const char* jkWindowPhysiPlanInputTsOrder = "InputTsOrder"; static const char* jkWindowPhysiPlanInputTsOrder = "InputTsOrder";
static const char* jkWindowPhysiPlanOutputTsOrder = "outputTsOrder"; static const char* jkWindowPhysiPlanOutputTsOrder = "outputTsOrder";
@ -2031,6 +2039,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark); code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanDeleteMark, pNode->deleteMark);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanIgnoreExpired, pNode->igExpired); code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanIgnoreExpired, pNode->igExpired);
} }
@ -2069,6 +2080,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowPhysiPlanWatermark, &pNode->watermark); code = tjsonGetBigIntValue(pJson, jkWindowPhysiPlanWatermark, &pNode->watermark);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkWindowPhysiPlanDeleteMark, &pNode->deleteMark);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkWindowPhysiPlanIgnoreExpired, &pNode->igExpired); code = tjsonGetTinyIntValue(pJson, jkWindowPhysiPlanIgnoreExpired, &pNode->igExpired);
} }
@ -3532,6 +3546,18 @@ static int32_t groupingSetNodeToJson(const void* pObj, SJson* pJson) {
return code; return code;
} }
static int32_t jsonToGroupingSetNode(const SJson* pJson, void* pObj) {
SGroupingSetNode* pNode = (SGroupingSetNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
tjsonGetNumberValue(pJson, jkGroupingSetType, pNode->groupingSetType, code);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkGroupingSetParameter, &pNode->pParameterList);
}
return code;
}
static const char* jkOrderByExprExpr = "Expr"; static const char* jkOrderByExprExpr = "Expr";
static const char* jkOrderByExprOrder = "Order"; static const char* jkOrderByExprOrder = "Order";
static const char* jkOrderByExprNullOrder = "NullOrder"; static const char* jkOrderByExprNullOrder = "NullOrder";
@ -4729,6 +4755,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToRealTableNode(pJson, pObj); return jsonToRealTableNode(pJson, pObj);
case QUERY_NODE_TEMP_TABLE: case QUERY_NODE_TEMP_TABLE:
return jsonToTempTableNode(pJson, pObj); return jsonToTempTableNode(pJson, pObj);
case QUERY_NODE_GROUPING_SET:
return jsonToGroupingSetNode(pJson, pObj);
case QUERY_NODE_ORDER_BY_EXPR: case QUERY_NODE_ORDER_BY_EXPR:
return jsonToOrderByExprNode(pJson, pObj); return jsonToOrderByExprNode(pJson, pObj);
case QUERY_NODE_LIMIT: case QUERY_NODE_LIMIT:

View File

@ -2607,6 +2607,7 @@ enum {
PHY_WINDOW_CODE_TS_END, PHY_WINDOW_CODE_TS_END,
PHY_WINDOW_CODE_TRIGGER_TYPE, PHY_WINDOW_CODE_TRIGGER_TYPE,
PHY_WINDOW_CODE_WATERMARK, PHY_WINDOW_CODE_WATERMARK,
PHY_WINDOW_CODE_DELETE_MARK,
PHY_WINDOW_CODE_IG_EXPIRED, PHY_WINDOW_CODE_IG_EXPIRED,
PHY_WINDOW_CODE_INPUT_TS_ORDER, PHY_WINDOW_CODE_INPUT_TS_ORDER,
PHY_WINDOW_CODE_OUTPUT_TS_ORDER, PHY_WINDOW_CODE_OUTPUT_TS_ORDER,
@ -2635,6 +2636,9 @@ static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_WINDOW_CODE_WATERMARK, pNode->watermark); code = tlvEncodeI64(pEncoder, PHY_WINDOW_CODE_WATERMARK, pNode->watermark);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_WINDOW_CODE_DELETE_MARK, pNode->deleteMark);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_WINDOW_CODE_IG_EXPIRED, pNode->igExpired); code = tlvEncodeI8(pEncoder, PHY_WINDOW_CODE_IG_EXPIRED, pNode->igExpired);
} }
@ -2679,6 +2683,9 @@ static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_WINDOW_CODE_WATERMARK: case PHY_WINDOW_CODE_WATERMARK:
code = tlvDecodeI64(pTlv, &pNode->watermark); code = tlvDecodeI64(pTlv, &pNode->watermark);
break; break;
case PHY_WINDOW_CODE_DELETE_MARK:
code = tlvDecodeI64(pTlv, &pNode->deleteMark);
break;
case PHY_WINDOW_CODE_IG_EXPIRED: case PHY_WINDOW_CODE_IG_EXPIRED:
code = tlvDecodeI8(pTlv, &pNode->igExpired); code = tlvDecodeI8(pTlv, &pNode->igExpired);
break; break;

View File

@ -596,6 +596,13 @@ static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) {
nodesDestroyNode(pNode->pTsEnd); nodesDestroyNode(pNode->pTsEnd);
} }
static void destroyPartitionPhysiNode(SPartitionPhysiNode* pNode) {
destroyPhysiNode((SPhysiNode*)pNode);
nodesDestroyList(pNode->pExprs);
nodesDestroyList(pNode->pPartitionKeys);
nodesDestroyList(pNode->pTargets);
}
static void destroyScanPhysiNode(SScanPhysiNode* pNode) { static void destroyScanPhysiNode(SScanPhysiNode* pNode) {
destroyPhysiNode((SPhysiNode*)pNode); destroyPhysiNode((SPhysiNode*)pNode);
nodesDestroyList(pNode->pScanCols); nodesDestroyList(pNode->pScanCols);
@ -733,6 +740,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pOptions->pWatermark); nodesDestroyList(pOptions->pWatermark);
nodesDestroyList(pOptions->pRollupFuncs); nodesDestroyList(pOptions->pRollupFuncs);
nodesDestroyList(pOptions->pSma); nodesDestroyList(pOptions->pSma);
nodesDestroyList(pOptions->pDeleteMark);
break; break;
} }
case QUERY_NODE_INDEX_OPTIONS: { case QUERY_NODE_INDEX_OPTIONS: {
@ -750,6 +758,7 @@ void nodesDestroyNode(SNode* pNode) {
SStreamOptions* pOptions = (SStreamOptions*)pNode; SStreamOptions* pOptions = (SStreamOptions*)pNode;
nodesDestroyNode(pOptions->pDelay); nodesDestroyNode(pOptions->pDelay);
nodesDestroyNode(pOptions->pWatermark); nodesDestroyNode(pOptions->pWatermark);
nodesDestroyNode(pOptions->pDeleteMark);
break; break;
} }
case QUERY_NODE_LEFT_VALUE: // no pointer field case QUERY_NODE_LEFT_VALUE: // no pointer field
@ -906,6 +915,8 @@ void nodesDestroyNode(SNode* pNode) {
SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pNode; SCreateStreamStmt* pStmt = (SCreateStreamStmt*)pNode;
nodesDestroyNode((SNode*)pStmt->pOptions); nodesDestroyNode((SNode*)pStmt->pOptions);
nodesDestroyNode(pStmt->pQuery); nodesDestroyNode(pStmt->pQuery);
nodesDestroyList(pStmt->pTags);
nodesDestroyNode(pStmt->pSubtable);
break; break;
} }
case QUERY_NODE_DROP_STREAM_STMT: // no pointer field case QUERY_NODE_DROP_STREAM_STMT: // no pointer field
@ -1021,6 +1032,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pLogicNode->pTagIndexCond); nodesDestroyNode(pLogicNode->pTagIndexCond);
taosArrayDestroyEx(pLogicNode->pSmaIndexes, destroySmaIndex); taosArrayDestroyEx(pLogicNode->pSmaIndexes, destroySmaIndex);
nodesDestroyList(pLogicNode->pGroupTags); nodesDestroyList(pLogicNode->pGroupTags);
nodesDestroyList(pLogicNode->pTags);
nodesDestroyNode(pLogicNode->pSubtable);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_JOIN: { case QUERY_NODE_LOGIC_PLAN_JOIN: {
@ -1093,6 +1106,8 @@ void nodesDestroyNode(SNode* pNode) {
SPartitionLogicNode* pLogicNode = (SPartitionLogicNode*)pNode; SPartitionLogicNode* pLogicNode = (SPartitionLogicNode*)pNode;
destroyLogicNode((SLogicNode*)pLogicNode); destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyList(pLogicNode->pPartitionKeys); nodesDestroyList(pLogicNode->pPartitionKeys);
nodesDestroyList(pLogicNode->pTags);
nodesDestroyNode(pLogicNode->pSubtable);
break; break;
} }
case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: { case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: {
@ -1123,10 +1138,10 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
destroyScanPhysiNode((SScanPhysiNode*)pNode); destroyScanPhysiNode((SScanPhysiNode*)pNode);
break; break;
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: { case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: {
SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode; SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode;
destroyScanPhysiNode((SScanPhysiNode*)pNode); destroyScanPhysiNode((SScanPhysiNode*)pNode);
nodesDestroyList(pPhyNode->pGroupTags); nodesDestroyList(pPhyNode->pGroupTags);
@ -1140,6 +1155,8 @@ void nodesDestroyNode(SNode* pNode) {
destroyScanPhysiNode((SScanPhysiNode*)pNode); destroyScanPhysiNode((SScanPhysiNode*)pNode);
nodesDestroyList(pPhyNode->pDynamicScanFuncs); nodesDestroyList(pPhyNode->pDynamicScanFuncs);
nodesDestroyList(pPhyNode->pGroupTags); nodesDestroyList(pPhyNode->pGroupTags);
nodesDestroyList(pPhyNode->pTags);
nodesDestroyNode(pPhyNode->pSubtable);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: { case QUERY_NODE_PHYSICAL_PLAN_PROJECT: {
@ -1216,13 +1233,15 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pPhyNode->pStateKey); nodesDestroyNode(pPhyNode->pStateKey);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
destroyPartitionPhysiNode((SPartitionPhysiNode*)pNode);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: { case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: {
SPartitionPhysiNode* pPhyNode = (SPartitionPhysiNode*)pNode; SStreamPartitionPhysiNode* pPhyNode = (SStreamPartitionPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode); destroyPartitionPhysiNode((SPartitionPhysiNode*)pPhyNode);
nodesDestroyList(pPhyNode->pExprs); nodesDestroyList(pPhyNode->pTags);
nodesDestroyList(pPhyNode->pPartitionKeys); nodesDestroyNode(pPhyNode->pSubtable);
nodesDestroyList(pPhyNode->pTargets);
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: { case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: {

View File

@ -72,7 +72,8 @@ typedef enum ETableOptionType {
TABLE_OPTION_WATERMARK, TABLE_OPTION_WATERMARK,
TABLE_OPTION_ROLLUP, TABLE_OPTION_ROLLUP,
TABLE_OPTION_TTL, TABLE_OPTION_TTL,
TABLE_OPTION_SMA TABLE_OPTION_SMA,
TABLE_OPTION_DELETE_MARK
} ETableOptionType; } ETableOptionType;
typedef struct SAlterOption { typedef struct SAlterOption {

View File

@ -362,6 +362,7 @@ table_options(A) ::= table_options(B) WATERMARK duration_list(C).
table_options(A) ::= table_options(B) ROLLUP NK_LP rollup_func_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_ROLLUP, C); } table_options(A) ::= table_options(B) ROLLUP NK_LP rollup_func_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_ROLLUP, C); }
table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); } table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); }
table_options(A) ::= table_options(B) SMA NK_LP col_name_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_SMA, C); } table_options(A) ::= table_options(B) SMA NK_LP col_name_list(C) NK_RP. { A = setTableOption(pCxt, B, TABLE_OPTION_SMA, C); }
table_options(A) ::= table_options(B) DELETE_MARK duration_list(C). { A = setTableOption(pCxt, B, TABLE_OPTION_DELETE_MARK, C); }
alter_table_options(A) ::= alter_table_option(B). { A = createAlterTableOptions(pCxt); A = setTableOption(pCxt, A, B.type, &B.val); } alter_table_options(A) ::= alter_table_option(B). { A = createAlterTableOptions(pCxt); A = setTableOption(pCxt, A, B.type, &B.val); }
alter_table_options(A) ::= alter_table_options(B) alter_table_option(C). { A = setTableOption(pCxt, B, C.type, &C.val); } alter_table_options(A) ::= alter_table_options(B) alter_table_option(C). { A = setTableOption(pCxt, B, C.type, &C.val); }
@ -475,8 +476,9 @@ func_list(A) ::= func_list(B) NK_COMMA func(C).
func(A) ::= function_name(B) NK_LP expression_list(C) NK_RP. { A = createFunctionNode(pCxt, &B, C); } func(A) ::= function_name(B) NK_LP expression_list(C) NK_RP. { A = createFunctionNode(pCxt, &B, C); }
sma_stream_opt(A) ::= . { A = createStreamOptions(pCxt); } sma_stream_opt(A) ::= . { A = createStreamOptions(pCxt); }
sma_stream_opt(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; } sma_stream_opt(A) ::= sma_stream_opt(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
sma_stream_opt(A) ::= stream_options(B) MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; } sma_stream_opt(A) ::= sma_stream_opt(B) MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
sma_stream_opt(A) ::= sma_stream_opt(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; }
/************************************************ create/drop topic ***************************************************/ /************************************************ create/drop topic ***************************************************/
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_or_subquery(C). { pCxt->pRootNode = createCreateTopicStmtUseQuery(pCxt, A, &B, C); } cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) AS query_or_subquery(C). { pCxt->pRootNode = createCreateTopicStmtUseQuery(pCxt, A, &B, C); }

View File

@ -1124,6 +1124,9 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType
case TABLE_OPTION_SMA: case TABLE_OPTION_SMA:
((STableOptions*)pOptions)->pSma = pVal; ((STableOptions*)pOptions)->pSma = pVal;
break; break;
case TABLE_OPTION_DELETE_MARK:
((STableOptions*)pOptions)->pDeleteMark = pVal;
break;
default: default:
break; break;
} }

View File

@ -74,6 +74,7 @@ static SKeyword keywordTable[] = {
{"DATABASES", TK_DATABASES}, {"DATABASES", TK_DATABASES},
{"DBS", TK_DBS}, {"DBS", TK_DBS},
{"DELETE", TK_DELETE}, {"DELETE", TK_DELETE},
{"DELETE_MARK", TK_DELETE_MARK},
{"DESC", TK_DESC}, {"DESC", TK_DESC},
{"DESCRIBE", TK_DESCRIBE}, {"DESCRIBE", TK_DESCRIBE},
{"DISTINCT", TK_DISTINCT}, {"DISTINCT", TK_DISTINCT},

View File

@ -3804,10 +3804,11 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq); return buildCreateDbRetentions(pStmt->pOptions->pRetentions, pReq);
} }
static int32_t checkRangeOption(STranslateContext* pCxt, int32_t code, const char* pName, int32_t val, int32_t minVal, static int32_t checkRangeOption(STranslateContext* pCxt, int32_t code, const char* pName, int64_t val, int64_t minVal,
int32_t maxVal) { int64_t maxVal) {
if (val >= 0 && (val < minVal || val > maxVal)) { if (val >= 0 && (val < minVal || val > maxVal)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, code, "Invalid option %s: %d valid range: [%d, %d]", pName, val, return generateSyntaxErrMsgExt(&pCxt->msgBuf, code,
"Invalid option %s: %" PRId64 " valid range: [%" PRId64 ", %" PRId64 "]", pName, val,
minVal, maxVal); minVal, maxVal);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -3818,8 +3819,8 @@ static int32_t checkDbRangeOption(STranslateContext* pCxt, const char* pName, in
return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_DB_OPTION, pName, val, minVal, maxVal); return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_DB_OPTION, pName, val, minVal, maxVal);
} }
static int32_t checkTableRangeOption(STranslateContext* pCxt, const char* pName, int32_t val, int32_t minVal, static int32_t checkTableRangeOption(STranslateContext* pCxt, const char* pName, int64_t val, int64_t minVal,
int32_t maxVal) { int64_t maxVal) {
return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_TABLE_OPTION, pName, val, minVal, maxVal); return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_TABLE_OPTION, pName, val, minVal, maxVal);
} }
@ -4463,6 +4464,37 @@ static int32_t checkTableWatermarkOption(STranslateContext* pCxt, STableOptions*
return code; return code;
} }
static int32_t getTableDeleteMarkOption(STranslateContext* pCxt, SValueNode* pVal, int64_t* pMaxDelay) {
return getTableDelayOrWatermarkOption(pCxt, "delete_mark", TSDB_MIN_ROLLUP_DELETE_MARK, TSDB_MAX_ROLLUP_DELETE_MARK,
pVal, pMaxDelay);
}
static int32_t checkTableDeleteMarkOption(STranslateContext* pCxt, STableOptions* pOptions, bool createStable,
SDbCfgInfo* pDbCfg) {
if (NULL == pOptions->pDeleteMark) {
return TSDB_CODE_SUCCESS;
}
if (!createStable || NULL == pDbCfg->pRetensions) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION,
"Invalid option delete_mark: Only supported for create super table in databases "
"configured with the 'RETENTIONS' option");
}
if (LIST_LENGTH(pOptions->pDeleteMark) > 2) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION, "Invalid option delete_mark");
}
int32_t code =
getTableDeleteMarkOption(pCxt, (SValueNode*)nodesListGetNode(pOptions->pDeleteMark, 0), &pOptions->deleteMark1);
if (TSDB_CODE_SUCCESS == code && 2 == LIST_LENGTH(pOptions->pDeleteMark)) {
code =
getTableDeleteMarkOption(pCxt, (SValueNode*)nodesListGetNode(pOptions->pDeleteMark, 1), &pOptions->deleteMark2);
}
return code;
}
static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, bool createStable) { static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt, bool createStable) {
if (NULL != strchr(pStmt->tableName, '.')) { if (NULL != strchr(pStmt->tableName, '.')) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME,
@ -4482,6 +4514,9 @@ static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkTableWatermarkOption(pCxt, pStmt->pOptions, createStable, &dbCfg); code = checkTableWatermarkOption(pCxt, pStmt->pOptions, createStable, &dbCfg);
} }
if (TSDB_CODE_SUCCESS == code) {
code = checkTableDeleteMarkOption(pCxt, pStmt->pOptions, createStable, &dbCfg);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkTableRollupOption(pCxt, pStmt->pOptions->pRollupFuncs, createStable, &dbCfg); code = checkTableRollupOption(pCxt, pStmt->pOptions->pRollupFuncs, createStable, &dbCfg);
} }
@ -4749,6 +4784,8 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
pReq->delay2 = pStmt->pOptions->maxDelay2; pReq->delay2 = pStmt->pOptions->maxDelay2;
pReq->watermark1 = pStmt->pOptions->watermark1; pReq->watermark1 = pStmt->pOptions->watermark1;
pReq->watermark2 = pStmt->pOptions->watermark2; pReq->watermark2 = pStmt->pOptions->watermark2;
pReq->deleteMark1 = pStmt->pOptions->deleteMark1;
pReq->deleteMark2 = pStmt->pOptions->deleteMark2;
pReq->colVer = 1; pReq->colVer = 1;
pReq->tagVer = 1; pReq->tagVer = 1;
pReq->source = TD_REQ_FROM_APP; pReq->source = TD_REQ_FROM_APP;
@ -5144,20 +5181,34 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
(NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : pReq->interval); (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->datum.i : pReq->interval);
pReq->slidingUnit = pReq->slidingUnit =
(NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pReq->intervalUnit); (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pReq->intervalUnit);
int32_t code = TSDB_CODE_SUCCESS;
if (NULL != pStmt->pOptions->pStreamOptions) { if (NULL != pStmt->pOptions->pStreamOptions) {
SStreamOptions* pStreamOpt = (SStreamOptions*)pStmt->pOptions->pStreamOptions; SStreamOptions* pStreamOpt = (SStreamOptions*)pStmt->pOptions->pStreamOptions;
pReq->maxDelay = (NULL != pStreamOpt->pDelay ? ((SValueNode*)pStreamOpt->pDelay)->datum.i : -1); if (NULL != pStreamOpt->pDelay) {
pReq->watermark = (NULL != pStreamOpt->pWatermark ? ((SValueNode*)pStreamOpt->pWatermark)->datum.i code = getTableMaxDelayOption(pCxt, (SValueNode*)pStreamOpt->pDelay, &pReq->maxDelay);
: TSDB_DEFAULT_ROLLUP_WATERMARK); } else {
if (pReq->watermark < TSDB_MIN_ROLLUP_WATERMARK) { pReq->maxDelay = -1;
pReq->watermark = TSDB_MIN_ROLLUP_WATERMARK;
} }
if (pReq->watermark > TSDB_MAX_ROLLUP_WATERMARK) { if (TSDB_CODE_SUCCESS == code) {
pReq->watermark = TSDB_MAX_ROLLUP_WATERMARK; if (NULL != pStreamOpt->pWatermark) {
code = getTableWatermarkOption(pCxt, (SValueNode*)pStreamOpt->pWatermark, &pReq->watermark);
} else {
pReq->watermark = TSDB_DEFAULT_ROLLUP_WATERMARK;
}
}
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pStreamOpt->pDeleteMark) {
code = getTableDeleteMarkOption(pCxt, (SValueNode*)pStreamOpt->pDeleteMark, &pReq->deleteMark);
} else {
pReq->deleteMark = TSDB_DEFAULT_ROLLUP_DELETE_MARK;
}
} }
} }
int32_t code = getSmaIndexDstVgId(pCxt, pStmt->dbName, pStmt->tableName, &pReq->dstVgId); if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexDstVgId(pCxt, pStmt->dbName, pStmt->tableName, &pReq->dstVgId);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen); code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen);
} }
@ -5185,16 +5236,6 @@ static int32_t checkCreateSmaIndex(STranslateContext* pCxt, SCreateIndexStmt* pS
code = doTranslateValue(pCxt, (SValueNode*)pStmt->pOptions->pSliding); code = doTranslateValue(pCxt, (SValueNode*)pStmt->pOptions->pSliding);
} }
if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pOptions->pStreamOptions) {
SStreamOptions* pStreamOpt = (SStreamOptions*)pStmt->pOptions->pStreamOptions;
if (NULL != pStreamOpt->pWatermark) {
code = doTranslateValue(pCxt, (SValueNode*)pStreamOpt->pWatermark);
}
if (TSDB_CODE_SUCCESS == code && NULL != pStreamOpt->pDelay) {
code = doTranslateValue(pCxt, (SValueNode*)pStreamOpt->pDelay);
}
}
return code; return code;
} }

File diff suppressed because it is too large Load Diff

View File

@ -375,9 +375,68 @@ TEST_F(ParserInitialCTest, createFunction) {
TEST_F(ParserInitialCTest, createSmaIndex) { TEST_F(ParserInitialCTest, createSmaIndex) {
useDb("root", "test"); useDb("root", "test");
SMCreateSmaReq expect = {0};
auto setCreateSmacReq = [&](const char* pIndexName, const char* pStbName, int64_t interval, int8_t intervalUnit,
int64_t offset = 0, int64_t sliding = -1, int8_t slidingUnit = -1, int8_t igExists = 0) {
memset(&expect, 0, sizeof(SMCreateSmaReq));
strcpy(expect.name, pIndexName);
strcpy(expect.stb, pStbName);
expect.igExists = igExists;
expect.intervalUnit = intervalUnit;
expect.slidingUnit = slidingUnit < 0 ? intervalUnit : slidingUnit;
expect.timezone = 0;
expect.dstVgId = 1;
expect.interval = interval;
expect.offset = offset;
expect.sliding = sliding < 0 ? interval : sliding;
expect.maxDelay = -1;
expect.watermark = TSDB_DEFAULT_ROLLUP_WATERMARK;
expect.deleteMark = TSDB_DEFAULT_ROLLUP_DELETE_MARK;
};
auto setOptionsForCreateSmacReq = [&](int64_t maxDelay, int64_t watermark, int64_t deleteMark) {
expect.maxDelay = maxDelay;
expect.watermark = watermark;
expect.deleteMark = deleteMark;
};
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_CREATE_INDEX_STMT);
SMCreateSmaReq req = {0};
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
ASSERT_EQ(std::string(req.name), std::string(expect.name));
ASSERT_EQ(std::string(req.stb), std::string(expect.stb));
ASSERT_EQ(req.igExists, expect.igExists);
ASSERT_EQ(req.intervalUnit, expect.intervalUnit);
ASSERT_EQ(req.slidingUnit, expect.slidingUnit);
ASSERT_EQ(req.timezone, expect.timezone);
ASSERT_EQ(req.dstVgId, expect.dstVgId);
ASSERT_EQ(req.interval, expect.interval);
ASSERT_EQ(req.offset, expect.offset);
ASSERT_EQ(req.sliding, expect.sliding);
ASSERT_EQ(req.maxDelay, expect.maxDelay);
ASSERT_EQ(req.watermark, expect.watermark);
ASSERT_EQ(req.deleteMark, expect.deleteMark);
ASSERT_GT(req.exprLen, 0);
ASSERT_EQ(req.tagsFilterLen, 0);
ASSERT_GT(req.sqlLen, 0);
ASSERT_GT(req.astLen, 0);
ASSERT_NE(req.expr, nullptr);
ASSERT_EQ(req.tagsFilter, nullptr);
ASSERT_NE(req.sql, nullptr);
ASSERT_NE(req.ast, nullptr);
tFreeSMCreateSmaReq(&req);
});
setCreateSmacReq("0.test.index1", "0.test.t1", 10 * MILLISECOND_PER_SECOND, 's');
run("CREATE SMA INDEX index1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)"); run("CREATE SMA INDEX index1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)");
run("CREATE SMA INDEX index2 ON st1 FUNCTION(MAX(c1), MIN(tag1)) INTERVAL(10s)"); setCreateSmacReq("0.test.index2", "0.test.st1", 5 * MILLISECOND_PER_SECOND, 's');
setOptionsForCreateSmacReq(10 * MILLISECOND_PER_SECOND, 20 * MILLISECOND_PER_SECOND, 1000 * MILLISECOND_PER_SECOND);
run("CREATE SMA INDEX index2 ON st1 FUNCTION(MAX(c1), MIN(tag1)) INTERVAL(5s) WATERMARK 20s MAX_DELAY 10s "
"DELETE_MARK 1000s");
} }
TEST_F(ParserInitialCTest, createMnode) { TEST_F(ParserInitialCTest, createMnode) {
@ -408,23 +467,26 @@ TEST_F(ParserInitialCTest, createStable) {
memset(&expect, 0, sizeof(SMCreateStbReq)); memset(&expect, 0, sizeof(SMCreateStbReq));
}; };
auto setCreateStbReqFunc = [&](const char* pDbName, const char* pTbName, int8_t igExists = 0, int64_t delay1 = -1, auto setCreateStbReqFunc =
int64_t delay2 = -1, int64_t watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK, [&](const char* pDbName, const char* pTbName, int8_t igExists = 0, int64_t delay1 = -1, int64_t delay2 = -1,
int64_t watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK, int64_t watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK, int64_t watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK,
int32_t ttl = TSDB_DEFAULT_TABLE_TTL, const char* pComment = nullptr) { int64_t deleteMark1 = TSDB_DEFAULT_ROLLUP_DELETE_MARK, int64_t deleteMark2 = TSDB_DEFAULT_ROLLUP_DELETE_MARK,
int32_t len = snprintf(expect.name, sizeof(expect.name), "0.%s.%s", pDbName, pTbName); int32_t ttl = TSDB_DEFAULT_TABLE_TTL, const char* pComment = nullptr) {
expect.name[len] = '\0'; int32_t len = snprintf(expect.name, sizeof(expect.name), "0.%s.%s", pDbName, pTbName);
expect.igExists = igExists; expect.name[len] = '\0';
expect.delay1 = delay1; expect.igExists = igExists;
expect.delay2 = delay2; expect.delay1 = delay1;
expect.watermark1 = watermark1; expect.delay2 = delay2;
expect.watermark2 = watermark2; expect.watermark1 = watermark1;
// expect.ttl = ttl; expect.watermark2 = watermark2;
if (nullptr != pComment) { expect.deleteMark1 = deleteMark1;
expect.pComment = strdup(pComment); expect.deleteMark2 = deleteMark2;
expect.commentLen = strlen(pComment); // expect.ttl = ttl;
} if (nullptr != pComment) {
}; expect.pComment = strdup(pComment);
expect.commentLen = strlen(pComment);
}
};
auto addFieldToCreateStbReqFunc = [&](bool col, const char* pFieldName, uint8_t type, int32_t bytes = 0, auto addFieldToCreateStbReqFunc = [&](bool col, const char* pFieldName, uint8_t type, int32_t bytes = 0,
int8_t flags = COL_SMA_ON) { int8_t flags = COL_SMA_ON) {
@ -511,7 +573,8 @@ TEST_F(ParserInitialCTest, createStable) {
clearCreateStbReq(); clearCreateStbReq();
setCreateStbReqFunc("rollup_db", "t1", 1, 100 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_MINUTE, 10, setCreateStbReqFunc("rollup_db", "t1", 1, 100 * MILLISECOND_PER_SECOND, 10 * MILLISECOND_PER_MINUTE, 10,
1 * MILLISECOND_PER_MINUTE, 100, "test create table"); 1 * MILLISECOND_PER_MINUTE, 1000 * MILLISECOND_PER_SECOND, 200 * MILLISECOND_PER_MINUTE, 100,
"test create table");
addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP, 0, 0); addFieldToCreateStbReqFunc(true, "ts", TSDB_DATA_TYPE_TIMESTAMP, 0, 0);
addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT); addFieldToCreateStbReqFunc(true, "c1", TSDB_DATA_TYPE_INT);
addFieldToCreateStbReqFunc(true, "c2", TSDB_DATA_TYPE_UINT); addFieldToCreateStbReqFunc(true, "c2", TSDB_DATA_TYPE_UINT);
@ -549,7 +612,8 @@ TEST_F(ParserInitialCTest, createStable) {
"TAGS (a1 TIMESTAMP, a2 INT, a3 INT UNSIGNED, a4 BIGINT, a5 BIGINT UNSIGNED, a6 FLOAT, a7 DOUBLE, " "TAGS (a1 TIMESTAMP, a2 INT, a3 INT UNSIGNED, a4 BIGINT, a5 BIGINT UNSIGNED, a6 FLOAT, a7 DOUBLE, "
"a8 BINARY(20), a9 SMALLINT, a10 SMALLINT UNSIGNED COMMENT 'test column comment', a11 TINYINT, " "a8 BINARY(20), a9 SMALLINT, a10 SMALLINT UNSIGNED COMMENT 'test column comment', a11 TINYINT, "
"a12 TINYINT UNSIGNED, a13 BOOL, a14 NCHAR(30), a15 VARCHAR(50)) " "a12 TINYINT UNSIGNED, a13 BOOL, a14 NCHAR(30), a15 VARCHAR(50)) "
"TTL 100 COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (MIN) MAX_DELAY 100s,10m WATERMARK 10a,1m"); "TTL 100 COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (MIN) MAX_DELAY 100s,10m WATERMARK 10a,1m "
"DELETE_MARK 1000s,200m");
clearCreateStbReq(); clearCreateStbReq();
} }

View File

@ -702,6 +702,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
if (pCxt->pPlanCxt->streamQuery) { if (pCxt->pPlanCxt->streamQuery) {
pWindow->triggerType = pCxt->pPlanCxt->triggerType; pWindow->triggerType = pCxt->pPlanCxt->triggerType;
pWindow->watermark = pCxt->pPlanCxt->watermark; pWindow->watermark = pCxt->pPlanCxt->watermark;
pWindow->deleteMark = pCxt->pPlanCxt->deleteMark;
pWindow->igExpired = pCxt->pPlanCxt->igExpired; pWindow->igExpired = pCxt->pPlanCxt->igExpired;
} }
pWindow->inputTsOrder = ORDER_ASC; pWindow->inputTsOrder = ORDER_ASC;

View File

@ -330,6 +330,7 @@ static void scanPathOptSetScanWin(SScanLogicNode* pScan) {
pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit; pScan->slidingUnit = ((SWindowLogicNode*)pParent)->slidingUnit;
pScan->triggerType = ((SWindowLogicNode*)pParent)->triggerType; pScan->triggerType = ((SWindowLogicNode*)pParent)->triggerType;
pScan->watermark = ((SWindowLogicNode*)pParent)->watermark; pScan->watermark = ((SWindowLogicNode*)pParent)->watermark;
pScan->deleteMark = ((SWindowLogicNode*)pParent)->deleteMark;
pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired; pScan->igExpired = ((SWindowLogicNode*)pParent)->igExpired;
} }
} }

View File

@ -1139,6 +1139,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
SWindowLogicNode* pWindowLogicNode) { SWindowLogicNode* pWindowLogicNode) {
pWindow->triggerType = pWindowLogicNode->triggerType; pWindow->triggerType = pWindowLogicNode->triggerType;
pWindow->watermark = pWindowLogicNode->watermark; pWindow->watermark = pWindowLogicNode->watermark;
pWindow->deleteMark = pWindowLogicNode->deleteMark;
pWindow->igExpired = pWindowLogicNode->igExpired; pWindow->igExpired = pWindowLogicNode->igExpired;
pWindow->inputTsOrder = pWindowLogicNode->inputTsOrder; pWindow->inputTsOrder = pWindowLogicNode->inputTsOrder;
pWindow->outputTsOrder = pWindowLogicNode->outputTsOrder; pWindow->outputTsOrder = pWindowLogicNode->outputTsOrder;

View File

@ -51,7 +51,7 @@ TEST_F(PlanOtherTest, createStreamUseSTable) {
TEST_F(PlanOtherTest, createSmaIndex) { TEST_F(PlanOtherTest, createSmaIndex) {
useDb("root", "test"); useDb("root", "test");
run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)"); run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s) DELETE_MARK 1000s");
run("SELECT SUM(c4) FROM t1 INTERVAL(10s)"); run("SELECT SUM(c4) FROM t1 INTERVAL(10s)");

View File

@ -444,6 +444,7 @@ class PlannerTestBaseImpl {
tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req); tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req);
g_mockCatalogService->createSmaIndex(&req); g_mockCatalogService->createSmaIndex(&req);
nodesStringToNode(req.ast, &pCxt->pAstRoot); nodesStringToNode(req.ast, &pCxt->pAstRoot);
pCxt->deleteMark = req.deleteMark;
tFreeSMCreateSmaReq(&req); tFreeSMCreateSmaReq(&req);
nodesDestroyNode(pQuery->pRoot); nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = pCxt->pAstRoot; pQuery->pRoot = pCxt->pAstRoot;

View File

@ -46,6 +46,7 @@ typedef struct SyncClientRequest {
uint32_t originalRpcType; // origin RpcMsg msgType uint32_t originalRpcType; // origin RpcMsg msgType
uint64_t seqNum; uint64_t seqNum;
bool isWeak; bool isWeak;
int16_t reserved;
uint32_t dataLen; // origin RpcMsg.contLen uint32_t dataLen; // origin RpcMsg.contLen
char data[]; // origin RpcMsg.pCont char data[]; // origin RpcMsg.pCont
} SyncClientRequest; } SyncClientRequest;
@ -56,6 +57,7 @@ typedef struct SyncClientRequestReply {
uint32_t msgType; uint32_t msgType;
int32_t errCode; int32_t errCode;
SRaftId leaderHint; SRaftId leaderHint;
int16_t reserved;
} SyncClientRequestReply; } SyncClientRequestReply;
typedef struct SyncRequestVote { typedef struct SyncRequestVote {
@ -68,6 +70,7 @@ typedef struct SyncRequestVote {
SyncTerm term; SyncTerm term;
SyncIndex lastLogIndex; SyncIndex lastLogIndex;
SyncTerm lastLogTerm; SyncTerm lastLogTerm;
int16_t reserved;
} SyncRequestVote; } SyncRequestVote;
typedef struct SyncRequestVoteReply { typedef struct SyncRequestVoteReply {
@ -79,6 +82,7 @@ typedef struct SyncRequestVoteReply {
// private data // private data
SyncTerm term; SyncTerm term;
bool voteGranted; bool voteGranted;
int16_t reserved;
} SyncRequestVoteReply; } SyncRequestVoteReply;
typedef struct SyncAppendEntries { typedef struct SyncAppendEntries {
@ -94,6 +98,7 @@ typedef struct SyncAppendEntries {
SyncTerm prevLogTerm; SyncTerm prevLogTerm;
SyncIndex commitIndex; SyncIndex commitIndex;
SyncTerm privateTerm; SyncTerm privateTerm;
int16_t reserved;
uint32_t dataLen; uint32_t dataLen;
char data[]; char data[];
} SyncAppendEntries; } SyncAppendEntries;
@ -111,6 +116,7 @@ typedef struct SyncAppendEntriesReply {
SyncIndex matchIndex; SyncIndex matchIndex;
SyncIndex lastSendIndex; SyncIndex lastSendIndex;
int64_t startTime; int64_t startTime;
int16_t reserved;
} SyncAppendEntriesReply; } SyncAppendEntriesReply;
typedef struct SyncHeartbeat { typedef struct SyncHeartbeat {
@ -126,6 +132,7 @@ typedef struct SyncHeartbeat {
SyncTerm privateTerm; SyncTerm privateTerm;
SyncTerm minMatchIndex; SyncTerm minMatchIndex;
int64_t timeStamp; int64_t timeStamp;
int16_t reserved;
} SyncHeartbeat; } SyncHeartbeat;
typedef struct SyncHeartbeatReply { typedef struct SyncHeartbeatReply {
@ -140,6 +147,7 @@ typedef struct SyncHeartbeatReply {
SyncTerm privateTerm; SyncTerm privateTerm;
int64_t startTime; int64_t startTime;
int64_t timeStamp; int64_t timeStamp;
int16_t reserved;
} SyncHeartbeatReply; } SyncHeartbeatReply;
typedef struct SyncPreSnapshot { typedef struct SyncPreSnapshot {
@ -151,6 +159,7 @@ typedef struct SyncPreSnapshot {
// private data // private data
SyncTerm term; SyncTerm term;
int16_t reserved;
} SyncPreSnapshot; } SyncPreSnapshot;
typedef struct SyncPreSnapshotReply { typedef struct SyncPreSnapshotReply {
@ -163,6 +172,7 @@ typedef struct SyncPreSnapshotReply {
// private data // private data
SyncTerm term; SyncTerm term;
SyncIndex snapStart; SyncIndex snapStart;
int16_t reserved;
} SyncPreSnapshotReply; } SyncPreSnapshotReply;
typedef struct SyncApplyMsg { typedef struct SyncApplyMsg {
@ -190,6 +200,7 @@ typedef struct SyncSnapshotSend {
SSyncCfg lastConfig; SSyncCfg lastConfig;
int64_t startTime; int64_t startTime;
int32_t seq; int32_t seq;
int16_t reserved;
uint32_t dataLen; uint32_t dataLen;
char data[]; char data[];
} SyncSnapshotSend; } SyncSnapshotSend;
@ -208,6 +219,7 @@ typedef struct SyncSnapshotRsp {
int32_t ack; int32_t ack;
int32_t code; int32_t code;
SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid
int16_t reserved;
} SyncSnapshotRsp; } SyncSnapshotRsp;
typedef struct SyncLeaderTransfer { typedef struct SyncLeaderTransfer {

View File

@ -19,6 +19,7 @@
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncReplication.h" #include "syncReplication.h"
#include "syncRespMgr.h"
#include "syncUtil.h" #include "syncUtil.h"
static void syncNodeCleanConfigIndex(SSyncNode* ths) { static void syncNodeCleanConfigIndex(SSyncNode* ths) {
@ -85,11 +86,9 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
} }
} }
#if 0
if (!syncNodeIsMnode(ths)) { if (!syncNodeIsMnode(ths)) {
syncRespClean(ths->pSyncRespMgr); syncRespClean(ths->pSyncRespMgr);
} }
#endif
return 0; return 0;
} }

View File

@ -57,10 +57,14 @@ typedef struct {
int32_t retryMaxInterval; // retry max interval int32_t retryMaxInterval; // retry max interval
int32_t retryMaxTimouet; int32_t retryMaxTimouet;
int32_t failFastThreshold;
int32_t failFastInterval;
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType); bool (*retry)(int32_t code, tmsg_t msgType);
bool (*startTimer)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType);
void (*destroyFp)(void* ahandle); void (*destroyFp)(void* ahandle);
bool (*failFastFp)(tmsg_t msgType);
int index; int index;
void* parent; void* parent;

View File

@ -56,11 +56,15 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->retryMaxInterval = pInit->retryMaxInterval; pRpc->retryMaxInterval = pInit->retryMaxInterval;
pRpc->retryMaxTimouet = pInit->retryMaxTimouet; pRpc->retryMaxTimouet = pInit->retryMaxTimouet;
pRpc->failFastThreshold = pInit->failFastThreshold;
pRpc->failFastInterval = pInit->failFastInterval;
// register callback handle // register callback handle
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp; pRpc->retry = pInit->rfp;
pRpc->startTimer = pInit->tfp; pRpc->startTimer = pInit->tfp;
pRpc->destroyFp = pInit->dfp; pRpc->destroyFp = pInit->dfp;
pRpc->failFastFp = pInit->ffp;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
if (pRpc->numOfThreads <= 0) { if (pRpc->numOfThreads <= 0) {

View File

@ -84,6 +84,8 @@ typedef struct SCliThrd {
SHashObj* fqdn2ipCache; SHashObj* fqdn2ipCache;
SCvtAddr cvtAddr; SCvtAddr cvtAddr;
SHashObj* failFastCache;
SCliMsg* stopMsg; SCliMsg* stopMsg;
bool quit; bool quit;
@ -96,6 +98,13 @@ typedef struct SCliObj {
SCliThrd** pThreadObj; SCliThrd** pThreadObj;
} SCliObj; } SCliObj;
typedef struct {
int32_t reinit;
int64_t timestamp;
int32_t count;
int32_t threshold;
int64_t interval;
} SFailFastItem;
// conn pool // conn pool
// add expire timeout and capacity limit // add expire timeout and capacity limit
static void* createConnPool(int size); static void* createConnPool(int size);
@ -853,7 +862,7 @@ void cliSend(SCliConn* pConn) {
int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb);
if (status != 0) { if (status != 0) {
tGError("%s conn %p failed to sent msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType),
uv_err_name(status)); uv_err_name(status));
cliHandleExcept(pConn); cliHandleExcept(pConn);
} }
@ -863,7 +872,6 @@ _RETURN:
} }
void cliConnCb(uv_connect_t* req, int status) { void cliConnCb(uv_connect_t* req, int status) {
// impl later
SCliConn* pConn = req->data; SCliConn* pConn = req->data;
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
@ -875,7 +883,32 @@ void cliConnCb(uv_connect_t* req, int status) {
} }
if (status != 0) { if (status != 0) {
tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); tError("%s conn %p failed to connect to %s:%d, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip,
pConn->port, uv_strerror(status));
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
STrans* pTransInst = pThrd->pTransInst;
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
char* ip = pConn->ip;
uint32_t port = pConn->port;
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
int64_t cTimestamp = taosGetTimestampMs();
if (item != NULL) {
int32_t elapse = cTimestamp - item->timestamp;
if (elapse >= 0 && elapse <= pTransInst->failFastInterval) {
item->count++;
} else {
item->count = 1;
item->timestamp = cTimestamp;
}
} else {
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
taosHashPut(pThrd->failFastCache, key, strlen(key), &item, sizeof(SFailFastItem));
}
}
cliHandleExcept(pConn); cliHandleExcept(pConn);
return; return;
} }
@ -1027,6 +1060,25 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
return; return;
} }
if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
if (item != NULL) {
int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp);
if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) {
STraceId* trace = &(pMsg->msg.info.traceId);
tGTrace("%s, msg %p cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, pMsg,
ip, port, item->count, elapse);
destroyCmsg(pMsg);
return;
}
}
}
bool ignore = false; bool ignore = false;
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
if (ignore == true) { if (ignore == true) {
@ -1299,6 +1351,8 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->destroyAhandleFp = pTransInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->quit = false; pThrd->quit = false;
return pThrd; return pThrd;
} }
@ -1325,6 +1379,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }

View File

@ -572,7 +572,7 @@ int32_t taosClockGetTime(int clock_id, struct timespec *pTS) {
offsetInitFinished = true; offsetInitFinished = true;
} else { } else {
while (!offsetInitFinished) while (!offsetInitFinished)
; // Ensure initialization is completed. ; // Ensure initialization is completed.
} }
GetSystemTimeAsFileTime(&f); GetSystemTimeAsFileTime(&f);

View File

@ -496,7 +496,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons
if (!osLogSpaceAvailable()) return; if (!osLogSpaceAvailable()) return;
if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return; if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return;
char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE);
int32_t len = taosBuildLogHead(buffer, flags); int32_t len = taosBuildLogHead(buffer, flags);
va_list argpointer; va_list argpointer;

View File

@ -6,8 +6,8 @@ sql connect
print =============== create db print =============== create db
sql create database d1 vgroups 1; sql create database d1 vgroups 1;
sql use d1 sql use d1
sql create table stb (ts timestamp, i int) tags (j int) sql create table d1_stb (ts timestamp, i int) tags (j int)
# sql create topic topic_1 as select ts, i from stb sql create topic d1_topic_1 as select ts, i from d1_stb
sql create database d2 vgroups 1; sql create database d2 vgroups 1;
sql create database d3 vgroups 1; sql create database d3 vgroups 1;
@ -72,7 +72,6 @@ sql REVOKE read,write ON d1.* from user1;
sql REVOKE read,write ON d2.* from user1; sql REVOKE read,write ON d2.* from user1;
sql REVOKE read,write ON *.* from user1; sql REVOKE read,write ON *.* from user1;
print =============== create users print =============== create users
sql create user u1 PASS 'taosdata' sql create user u1 PASS 'taosdata'
sql select * from information_schema.ins_users sql select * from information_schema.ins_users
@ -92,7 +91,17 @@ sql_error drop database d1;
sql_error drop database d2; sql_error drop database d2;
sql_error create stable d1.st (ts timestamp, i int) tags (j int) sql_error create stable d1.st (ts timestamp, i int) tags (j int)
sql create stable d2.st (ts timestamp, i int) tags (j int) sql use d2
#sql create topic topic_2 as select ts, i from stb sql create table d2_stb (ts timestamp, i int) tags (j int)
# Insufficient privilege for operation
sql_error create topic d2_topic_1 as select ts, i from d2_stb
sql use d1
# Insufficient privilege for operation
sql_error drop topic d1_topic_1
sql create topic d1_topic_2 as select ts, i from d1_stb
sql drop topic d1_topic_2
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT