cos/multi-write: include headers part

This commit is contained in:
Minglei Jin 2024-04-08 10:33:02 +08:00
parent d5e9169769
commit 44b9785853
14 changed files with 844 additions and 396 deletions

View File

@ -33,10 +33,12 @@ extern int32_t tsS3UploadDelaySec;
int32_t s3Init();
void s3CleanUp();
int32_t s3CheckCfg();
int32_t s3PutObjectFromFile(const char *file, const char *object);
int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp);
int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size);
void s3DeleteObjectsByPrefix(const char *prefix);
void s3DeleteObjects(const char *object_name[], int nobject);
int32_t s3DeleteObjects(const char *object_name[], int nobject);
bool s3Exists(const char *object_name);
bool s3Get(const char *object_name, const char *path);
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock);
@ -45,6 +47,8 @@ void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name);
int32_t s3GetObjectToFile(const char *object_name, char *fileName);
#define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024)
#ifdef __cplusplus
}
#endif

View File

@ -111,9 +111,9 @@ extern int32_t tsMonitorIntervalForBasic;
extern bool tsMonitorForceV2;
// audit
extern bool tsEnableAudit;
extern bool tsEnableAuditCreateTable;
extern int32_t tsAuditInterval;
extern bool tsEnableAudit;
extern bool tsEnableAuditCreateTable;
extern int32_t tsAuditInterval;
// telem
extern bool tsEnableTelem;
@ -121,9 +121,9 @@ extern int32_t tsTelemInterval;
extern char tsTelemServer[];
extern uint16_t tsTelemPort;
extern bool tsEnableCrashReport;
extern char * tsTelemUri;
extern char * tsClientCrashReportUri;
extern char * tsSvrCrashReportUri;
extern char *tsTelemUri;
extern char *tsClientCrashReportUri;
extern char *tsSvrCrashReportUri;
// query buffer management
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
@ -209,6 +209,8 @@ extern int32_t tsTtlUnit;
extern int32_t tsTtlPushIntervalSec;
extern int32_t tsTtlBatchDropNum;
extern int32_t tsTrimVDbIntervalSec;
extern int32_t tsS3MigrateIntervalSec;
extern bool tsS3MigrateEnabled;
extern int32_t tsGrantHBInterval;
extern int32_t tsUptimeInterval;

View File

@ -300,7 +300,8 @@ typedef enum ENodeType {
QUERY_NODE_GRANT_STMT,
QUERY_NODE_REVOKE_STMT,
QUERY_NODE_ALTER_CLUSTER_STMT,
// placeholder for [153, 180]
QUERY_NODE_S3MIGRATE_DATABASE_STMT,
// placeholder for [154, 180]
QUERY_NODE_SHOW_CREATE_VIEW_STMT = 181,
QUERY_NODE_SHOW_CREATE_DATABASE_STMT,
QUERY_NODE_SHOW_CREATE_TABLE_STMT,
@ -584,7 +585,7 @@ typedef struct {
// int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp);
// int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp);
// void tFreeSSubmitBlkRsp(void* param);
void tFreeSSubmitRsp(SSubmitRsp* pRsp);
void tFreeSSubmitRsp(SSubmitRsp* pRsp);
#define COL_SMA_ON ((int8_t)0x1)
#define COL_IDX_ON ((int8_t)0x2)
@ -1149,6 +1150,9 @@ typedef struct {
int32_t sstTrigger;
int16_t hashPrefix;
int16_t hashSuffix;
int32_t s3ChunkSize;
int32_t s3KeepLocal;
int8_t s3Compact;
int32_t tsdbPageSize;
int32_t sqlLen;
char* sql;
@ -1178,6 +1182,8 @@ typedef struct {
int32_t minRows;
int32_t walRetentionPeriod;
int32_t walRetentionSize;
int32_t s3KeepLocal;
int8_t s3Compact;
int32_t sqlLen;
char* sql;
} SAlterDbReq;
@ -1257,6 +1263,20 @@ typedef struct {
int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq);
int32_t tDeserializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
} SS3MigrateDbReq;
int32_t tSerializeSS3MigrateDbReq(void* buf, int32_t bufLen, SS3MigrateDbReq* pReq);
int32_t tDeserializeSS3MigrateDbReq(void* buf, int32_t bufLen, SS3MigrateDbReq* pReq);
typedef struct {
int32_t timestamp;
} SVS3MigrateDbReq;
int32_t tSerializeSVS3MigrateDbReq(void* buf, int32_t bufLen, SVS3MigrateDbReq* pReq);
int32_t tDeserializeSVS3MigrateDbReq(void* buf, int32_t bufLen, SVS3MigrateDbReq* pReq);
typedef struct {
int32_t timestampSec;
int32_t ttlDropMaxCount;
@ -1293,6 +1313,9 @@ typedef struct {
int8_t replications;
int8_t strict;
int8_t cacheLast;
int32_t s3ChunkSize;
int32_t s3KeepLocal;
int8_t s3Compact;
int32_t tsdbPageSize;
int32_t walRetentionPeriod;
int32_t walRollPeriod;
@ -1582,13 +1605,13 @@ int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
void tFreeSStatusReq(SStatusReq* pReq);
typedef struct {
int32_t contLen;
char* pCont;
int32_t contLen;
char* pCont;
} SStatisReq;
int32_t tSerializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq);
int32_t tDeserializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq);
void tFreeSStatisReq(SStatisReq *pReq);
void tFreeSStatisReq(SStatisReq* pReq);
typedef struct {
int32_t dnodeId;
@ -1693,7 +1716,10 @@ typedef struct {
int16_t hashPrefix;
int16_t hashSuffix;
int32_t tsdbPageSize;
int64_t reserved[8];
int32_t s3ChunkSize;
int32_t s3KeepLocal;
int8_t s3Compact;
int64_t reserved[6];
int8_t learnerReplica;
int8_t learnerSelfIndex;
SReplica learnerReplicas[TSDB_MAX_LEARNER_REPLICA];
@ -1781,13 +1807,15 @@ typedef struct {
int8_t walLevel;
int8_t strict;
int8_t cacheLast;
int64_t reserved[8];
int64_t reserved[7];
// 1st modification
int16_t sttTrigger;
int32_t minRows;
// 2nd modification
int32_t walRetentionPeriod;
int32_t walRetentionSize;
int32_t s3KeepLocal;
int8_t s3Compact;
} SAlterVnodeConfigReq;
int32_t tSerializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq);
@ -1946,7 +1974,7 @@ typedef struct {
int32_t tSerializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
// int32_t tDeserializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
void tFreeSShowReq(SShowReq* pReq);
void tFreeSShowReq(SShowReq* pReq);
typedef struct {
int64_t showId;
@ -2735,7 +2763,7 @@ typedef struct {
SVCreateTbReq* pReqs;
SArray* pArray;
};
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
} SVCreateTbBatchReq;
int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq);
@ -2828,7 +2856,7 @@ typedef struct {
int32_t newCommentLen;
char* newComment;
int64_t ctimeMs; // fill by vnode
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient
} SVAlterTbReq;
int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq);
@ -3932,7 +3960,7 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
#define SUBMIT_REQ_FROM_FILE 0x4
#define TD_REQ_FROM_TAOX 0x8
#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility
#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility
typedef struct {
int32_t flags;

View File

@ -220,6 +220,8 @@
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_TIMER, "compact-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_REQ_CHKPT, "stream-req-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_CLUSTER, "config-cluster", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB, "s3migrate-db", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB_TIMER, "s3migrate-db-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
@ -272,6 +274,7 @@
TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_COMPACT_PROGRESS, "vnode-query-compact-progress", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_KILL_COMPACT, "kill-compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_S3MIGRATE, "vnode-s3migrate", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_VND_MSG)

View File

@ -16,7 +16,6 @@
#ifndef _TD_COMMON_TOKEN_H_
#define _TD_COMMON_TOKEN_H_
#define TK_OR 1
#define TK_AND 2
#define TK_UNION 3
@ -85,288 +84,292 @@
#define TK_USE 66
#define TK_FLUSH 67
#define TK_TRIM 68
#define TK_COMPACT 69
#define TK_IF 70
#define TK_NOT 71
#define TK_EXISTS 72
#define TK_BUFFER 73
#define TK_CACHEMODEL 74
#define TK_CACHESIZE 75
#define TK_COMP 76
#define TK_DURATION 77
#define TK_NK_VARIABLE 78
#define TK_MAXROWS 79
#define TK_MINROWS 80
#define TK_KEEP 81
#define TK_PAGES 82
#define TK_PAGESIZE 83
#define TK_TSDB_PAGESIZE 84
#define TK_PRECISION 85
#define TK_REPLICA 86
#define TK_VGROUPS 87
#define TK_SINGLE_STABLE 88
#define TK_RETENTIONS 89
#define TK_SCHEMALESS 90
#define TK_WAL_LEVEL 91
#define TK_WAL_FSYNC_PERIOD 92
#define TK_WAL_RETENTION_PERIOD 93
#define TK_WAL_RETENTION_SIZE 94
#define TK_WAL_ROLL_PERIOD 95
#define TK_WAL_SEGMENT_SIZE 96
#define TK_STT_TRIGGER 97
#define TK_TABLE_PREFIX 98
#define TK_TABLE_SUFFIX 99
#define TK_KEEP_TIME_OFFSET 100
#define TK_NK_COLON 101
#define TK_BWLIMIT 102
#define TK_START 103
#define TK_TIMESTAMP 104
#define TK_END 105
#define TK_TABLE 106
#define TK_NK_LP 107
#define TK_NK_RP 108
#define TK_STABLE 109
#define TK_COLUMN 110
#define TK_MODIFY 111
#define TK_RENAME 112
#define TK_TAG 113
#define TK_SET 114
#define TK_NK_EQ 115
#define TK_USING 116
#define TK_TAGS 117
#define TK_BOOL 118
#define TK_TINYINT 119
#define TK_SMALLINT 120
#define TK_INT 121
#define TK_INTEGER 122
#define TK_BIGINT 123
#define TK_FLOAT 124
#define TK_DOUBLE 125
#define TK_BINARY 126
#define TK_NCHAR 127
#define TK_UNSIGNED 128
#define TK_JSON 129
#define TK_VARCHAR 130
#define TK_MEDIUMBLOB 131
#define TK_BLOB 132
#define TK_VARBINARY 133
#define TK_GEOMETRY 134
#define TK_DECIMAL 135
#define TK_COMMENT 136
#define TK_MAX_DELAY 137
#define TK_WATERMARK 138
#define TK_ROLLUP 139
#define TK_TTL 140
#define TK_SMA 141
#define TK_DELETE_MARK 142
#define TK_FIRST 143
#define TK_LAST 144
#define TK_SHOW 145
#define TK_PRIVILEGES 146
#define TK_DATABASES 147
#define TK_TABLES 148
#define TK_STABLES 149
#define TK_MNODES 150
#define TK_QNODES 151
#define TK_FUNCTIONS 152
#define TK_INDEXES 153
#define TK_ACCOUNTS 154
#define TK_APPS 155
#define TK_CONNECTIONS 156
#define TK_LICENCES 157
#define TK_GRANTS 158
#define TK_FULL 159
#define TK_LOGS 160
#define TK_MACHINES 161
#define TK_QUERIES 162
#define TK_SCORES 163
#define TK_TOPICS 164
#define TK_VARIABLES 165
#define TK_BNODES 166
#define TK_SNODES 167
#define TK_TRANSACTIONS 168
#define TK_DISTRIBUTED 169
#define TK_CONSUMERS 170
#define TK_SUBSCRIPTIONS 171
#define TK_VNODES 172
#define TK_ALIVE 173
#define TK_VIEWS 174
#define TK_VIEW 175
#define TK_COMPACTS 176
#define TK_NORMAL 177
#define TK_CHILD 178
#define TK_LIKE 179
#define TK_TBNAME 180
#define TK_QTAGS 181
#define TK_AS 182
#define TK_SYSTEM 183
#define TK_INDEX 184
#define TK_FUNCTION 185
#define TK_INTERVAL 186
#define TK_COUNT 187
#define TK_LAST_ROW 188
#define TK_META 189
#define TK_ONLY 190
#define TK_TOPIC 191
#define TK_CONSUMER 192
#define TK_GROUP 193
#define TK_DESC 194
#define TK_DESCRIBE 195
#define TK_RESET 196
#define TK_QUERY 197
#define TK_CACHE 198
#define TK_EXPLAIN 199
#define TK_ANALYZE 200
#define TK_VERBOSE 201
#define TK_NK_BOOL 202
#define TK_RATIO 203
#define TK_NK_FLOAT 204
#define TK_OUTPUTTYPE 205
#define TK_AGGREGATE 206
#define TK_BUFSIZE 207
#define TK_LANGUAGE 208
#define TK_REPLACE 209
#define TK_STREAM 210
#define TK_INTO 211
#define TK_PAUSE 212
#define TK_RESUME 213
#define TK_TRIGGER 214
#define TK_AT_ONCE 215
#define TK_WINDOW_CLOSE 216
#define TK_IGNORE 217
#define TK_EXPIRED 218
#define TK_FILL_HISTORY 219
#define TK_UPDATE 220
#define TK_SUBTABLE 221
#define TK_UNTREATED 222
#define TK_KILL 223
#define TK_CONNECTION 224
#define TK_TRANSACTION 225
#define TK_BALANCE 226
#define TK_VGROUP 227
#define TK_LEADER 228
#define TK_MERGE 229
#define TK_REDISTRIBUTE 230
#define TK_SPLIT 231
#define TK_DELETE 232
#define TK_INSERT 233
#define TK_NULL 234
#define TK_NK_QUESTION 235
#define TK_NK_ALIAS 236
#define TK_NK_ARROW 237
#define TK_ROWTS 238
#define TK_QSTART 239
#define TK_QEND 240
#define TK_QDURATION 241
#define TK_WSTART 242
#define TK_WEND 243
#define TK_WDURATION 244
#define TK_IROWTS 245
#define TK_ISFILLED 246
#define TK_CAST 247
#define TK_NOW 248
#define TK_TODAY 249
#define TK_TIMEZONE 250
#define TK_CLIENT_VERSION 251
#define TK_SERVER_VERSION 252
#define TK_SERVER_STATUS 253
#define TK_CURRENT_USER 254
#define TK_CASE 255
#define TK_WHEN 256
#define TK_THEN 257
#define TK_ELSE 258
#define TK_BETWEEN 259
#define TK_IS 260
#define TK_NK_LT 261
#define TK_NK_GT 262
#define TK_NK_LE 263
#define TK_NK_GE 264
#define TK_NK_NE 265
#define TK_MATCH 266
#define TK_NMATCH 267
#define TK_CONTAINS 268
#define TK_IN 269
#define TK_JOIN 270
#define TK_INNER 271
#define TK_SELECT 272
#define TK_NK_HINT 273
#define TK_DISTINCT 274
#define TK_WHERE 275
#define TK_PARTITION 276
#define TK_BY 277
#define TK_SESSION 278
#define TK_STATE_WINDOW 279
#define TK_EVENT_WINDOW 280
#define TK_COUNT_WINDOW 281
#define TK_SLIDING 282
#define TK_FILL 283
#define TK_VALUE 284
#define TK_VALUE_F 285
#define TK_NONE 286
#define TK_PREV 287
#define TK_NULL_F 288
#define TK_LINEAR 289
#define TK_NEXT 290
#define TK_HAVING 291
#define TK_RANGE 292
#define TK_EVERY 293
#define TK_ORDER 294
#define TK_SLIMIT 295
#define TK_SOFFSET 296
#define TK_LIMIT 297
#define TK_OFFSET 298
#define TK_ASC 299
#define TK_NULLS 300
#define TK_ABORT 301
#define TK_AFTER 302
#define TK_ATTACH 303
#define TK_BEFORE 304
#define TK_BEGIN 305
#define TK_BITAND 306
#define TK_BITNOT 307
#define TK_BITOR 308
#define TK_BLOCKS 309
#define TK_CHANGE 310
#define TK_COMMA 311
#define TK_CONCAT 312
#define TK_CONFLICT 313
#define TK_COPY 314
#define TK_DEFERRED 315
#define TK_DELIMITERS 316
#define TK_DETACH 317
#define TK_DIVIDE 318
#define TK_DOT 319
#define TK_EACH 320
#define TK_FAIL 321
#define TK_FILE 322
#define TK_FOR 323
#define TK_GLOB 324
#define TK_ID 325
#define TK_IMMEDIATE 326
#define TK_IMPORT 327
#define TK_INITIALLY 328
#define TK_INSTEAD 329
#define TK_ISNULL 330
#define TK_KEY 331
#define TK_MODULES 332
#define TK_NK_BITNOT 333
#define TK_NK_SEMI 334
#define TK_NOTNULL 335
#define TK_OF 336
#define TK_PLUS 337
#define TK_PRIVILEGE 338
#define TK_RAISE 339
#define TK_RESTRICT 340
#define TK_ROW 341
#define TK_SEMI 342
#define TK_STAR 343
#define TK_STATEMENT 344
#define TK_STRICT 345
#define TK_STRING 346
#define TK_TIMES 347
#define TK_VALUES 348
#define TK_VARIABLE 349
#define TK_WAL 350
#define TK_S3MIGRATE 69
#define TK_COMPACT 70
#define TK_IF 71
#define TK_NOT 72
#define TK_EXISTS 73
#define TK_BUFFER 74
#define TK_CACHEMODEL 75
#define TK_CACHESIZE 76
#define TK_COMP 77
#define TK_DURATION 78
#define TK_NK_VARIABLE 79
#define TK_MAXROWS 80
#define TK_MINROWS 81
#define TK_KEEP 82
#define TK_PAGES 83
#define TK_PAGESIZE 84
#define TK_TSDB_PAGESIZE 85
#define TK_PRECISION 86
#define TK_REPLICA 87
#define TK_VGROUPS 88
#define TK_SINGLE_STABLE 89
#define TK_RETENTIONS 90
#define TK_SCHEMALESS 91
#define TK_WAL_LEVEL 92
#define TK_WAL_FSYNC_PERIOD 93
#define TK_WAL_RETENTION_PERIOD 94
#define TK_WAL_RETENTION_SIZE 95
#define TK_WAL_ROLL_PERIOD 96
#define TK_WAL_SEGMENT_SIZE 97
#define TK_STT_TRIGGER 98
#define TK_TABLE_PREFIX 99
#define TK_TABLE_SUFFIX 100
#define TK_S3_CHUNKSIZE 101
#define TK_S3_KEEPLOCAL 102
#define TK_S3_COMPACT 103
#define TK_KEEP_TIME_OFFSET 104
#define TK_NK_COLON 105
#define TK_BWLIMIT 106
#define TK_START 107
#define TK_TIMESTAMP 108
#define TK_END 109
#define TK_TABLE 110
#define TK_NK_LP 111
#define TK_NK_RP 112
#define TK_STABLE 113
#define TK_COLUMN 114
#define TK_MODIFY 115
#define TK_RENAME 116
#define TK_TAG 117
#define TK_SET 118
#define TK_NK_EQ 119
#define TK_USING 120
#define TK_TAGS 121
#define TK_BOOL 122
#define TK_TINYINT 123
#define TK_SMALLINT 124
#define TK_INT 125
#define TK_INTEGER 126
#define TK_BIGINT 127
#define TK_FLOAT 128
#define TK_DOUBLE 129
#define TK_BINARY 130
#define TK_NCHAR 131
#define TK_UNSIGNED 132
#define TK_JSON 133
#define TK_VARCHAR 134
#define TK_MEDIUMBLOB 135
#define TK_BLOB 136
#define TK_VARBINARY 137
#define TK_GEOMETRY 138
#define TK_DECIMAL 139
#define TK_COMMENT 140
#define TK_MAX_DELAY 141
#define TK_WATERMARK 142
#define TK_ROLLUP 143
#define TK_TTL 144
#define TK_SMA 145
#define TK_DELETE_MARK 146
#define TK_FIRST 147
#define TK_LAST 148
#define TK_SHOW 149
#define TK_PRIVILEGES 150
#define TK_DATABASES 151
#define TK_TABLES 152
#define TK_STABLES 153
#define TK_MNODES 154
#define TK_QNODES 155
#define TK_FUNCTIONS 156
#define TK_INDEXES 157
#define TK_ACCOUNTS 158
#define TK_APPS 159
#define TK_CONNECTIONS 160
#define TK_LICENCES 161
#define TK_GRANTS 162
#define TK_FULL 163
#define TK_LOGS 164
#define TK_MACHINES 165
#define TK_QUERIES 166
#define TK_SCORES 167
#define TK_TOPICS 168
#define TK_VARIABLES 169
#define TK_BNODES 170
#define TK_SNODES 171
#define TK_TRANSACTIONS 172
#define TK_DISTRIBUTED 173
#define TK_CONSUMERS 174
#define TK_SUBSCRIPTIONS 175
#define TK_VNODES 176
#define TK_ALIVE 177
#define TK_VIEWS 178
#define TK_VIEW 179
#define TK_COMPACTS 180
#define TK_NORMAL 181
#define TK_CHILD 182
#define TK_LIKE 183
#define TK_TBNAME 184
#define TK_QTAGS 185
#define TK_AS 186
#define TK_SYSTEM 187
#define TK_INDEX 188
#define TK_FUNCTION 189
#define TK_INTERVAL 190
#define TK_COUNT 191
#define TK_LAST_ROW 192
#define TK_META 193
#define TK_ONLY 194
#define TK_TOPIC 195
#define TK_CONSUMER 196
#define TK_GROUP 197
#define TK_DESC 198
#define TK_DESCRIBE 199
#define TK_RESET 200
#define TK_QUERY 201
#define TK_CACHE 202
#define TK_EXPLAIN 203
#define TK_ANALYZE 204
#define TK_VERBOSE 205
#define TK_NK_BOOL 206
#define TK_RATIO 207
#define TK_NK_FLOAT 208
#define TK_OUTPUTTYPE 209
#define TK_AGGREGATE 210
#define TK_BUFSIZE 211
#define TK_LANGUAGE 212
#define TK_REPLACE 213
#define TK_STREAM 214
#define TK_INTO 215
#define TK_PAUSE 216
#define TK_RESUME 217
#define TK_TRIGGER 218
#define TK_AT_ONCE 219
#define TK_WINDOW_CLOSE 220
#define TK_IGNORE 221
#define TK_EXPIRED 222
#define TK_FILL_HISTORY 223
#define TK_UPDATE 224
#define TK_SUBTABLE 225
#define TK_UNTREATED 226
#define TK_KILL 227
#define TK_CONNECTION 228
#define TK_TRANSACTION 229
#define TK_BALANCE 230
#define TK_VGROUP 231
#define TK_LEADER 232
#define TK_MERGE 233
#define TK_REDISTRIBUTE 234
#define TK_SPLIT 235
#define TK_DELETE 236
#define TK_INSERT 237
#define TK_NULL 238
#define TK_NK_QUESTION 239
#define TK_NK_ALIAS 240
#define TK_NK_ARROW 241
#define TK_ROWTS 242
#define TK_QSTART 243
#define TK_QEND 244
#define TK_QDURATION 245
#define TK_WSTART 246
#define TK_WEND 247
#define TK_WDURATION 248
#define TK_IROWTS 249
#define TK_ISFILLED 250
#define TK_CAST 251
#define TK_NOW 252
#define TK_TODAY 253
#define TK_TIMEZONE 254
#define TK_CLIENT_VERSION 255
#define TK_SERVER_VERSION 256
#define TK_SERVER_STATUS 257
#define TK_CURRENT_USER 258
#define TK_CASE 259
#define TK_WHEN 260
#define TK_THEN 261
#define TK_ELSE 262
#define TK_BETWEEN 263
#define TK_IS 264
#define TK_NK_LT 265
#define TK_NK_GT 266
#define TK_NK_LE 267
#define TK_NK_GE 268
#define TK_NK_NE 269
#define TK_MATCH 270
#define TK_NMATCH 271
#define TK_CONTAINS 272
#define TK_IN 273
#define TK_JOIN 274
#define TK_INNER 275
#define TK_SELECT 276
#define TK_NK_HINT 277
#define TK_DISTINCT 278
#define TK_WHERE 279
#define TK_PARTITION 280
#define TK_BY 281
#define TK_SESSION 282
#define TK_STATE_WINDOW 283
#define TK_EVENT_WINDOW 284
#define TK_COUNT_WINDOW 285
#define TK_SLIDING 286
#define TK_FILL 287
#define TK_VALUE 288
#define TK_VALUE_F 289
#define TK_NONE 290
#define TK_PREV 291
#define TK_NULL_F 292
#define TK_LINEAR 293
#define TK_NEXT 294
#define TK_HAVING 295
#define TK_RANGE 296
#define TK_EVERY 297
#define TK_ORDER 298
#define TK_SLIMIT 299
#define TK_SOFFSET 300
#define TK_LIMIT 301
#define TK_OFFSET 302
#define TK_ASC 303
#define TK_NULLS 304
#define TK_ABORT 305
#define TK_AFTER 306
#define TK_ATTACH 307
#define TK_BEFORE 308
#define TK_BEGIN 309
#define TK_BITAND 310
#define TK_BITNOT 311
#define TK_BITOR 312
#define TK_BLOCKS 313
#define TK_CHANGE 314
#define TK_COMMA 315
#define TK_CONCAT 316
#define TK_CONFLICT 317
#define TK_COPY 318
#define TK_DEFERRED 319
#define TK_DELIMITERS 320
#define TK_DETACH 321
#define TK_DIVIDE 322
#define TK_DOT 323
#define TK_EACH 324
#define TK_FAIL 325
#define TK_FILE 326
#define TK_FOR 327
#define TK_GLOB 328
#define TK_ID 329
#define TK_IMMEDIATE 330
#define TK_IMPORT 331
#define TK_INITIALLY 332
#define TK_INSTEAD 333
#define TK_ISNULL 334
#define TK_KEY 335
#define TK_MODULES 336
#define TK_NK_BITNOT 337
#define TK_NK_SEMI 338
#define TK_NOTNULL 339
#define TK_OF 340
#define TK_PLUS 341
#define TK_PRIVILEGE 342
#define TK_RAISE 343
#define TK_RESTRICT 344
#define TK_ROW 345
#define TK_SEMI 346
#define TK_STAR 347
#define TK_STATEMENT 348
#define TK_STRICT 349
#define TK_STRING 350
#define TK_TIMES 351
#define TK_VALUES 352
#define TK_VARIABLE 353
#define TK_WAL 354
#define TK_NK_SPACE 600
@ -379,8 +382,7 @@
#define TK_NO_BATCH_SCAN 607
#define TK_SORT_FOR_GROUP 608
#define TK_PARTITION_FIRST 609
#define TK_PARA_TABLES_SORT 610
#define TK_PARA_TABLES_SORT 610
#define TK_NK_NIL 65535

View File

@ -100,6 +100,10 @@ typedef struct SDatabaseOptions {
int32_t sstTrigger;
int32_t tablePrefix;
int32_t tableSuffix;
int32_t s3ChunkSize;
int32_t s3KeepLocal;
SValueNode* s3KeepLocalStr;
int8_t s3Compact;
} SDatabaseOptions;
typedef struct SCreateDatabaseStmt {
@ -137,6 +141,11 @@ typedef struct STrimDatabaseStmt {
int32_t maxSpeed;
} STrimDatabaseStmt;
typedef struct SS3MigrateDatabaseStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
} SS3MigrateDatabaseStmt;
typedef struct SCompactDatabaseStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];

View File

@ -51,11 +51,7 @@ typedef enum {
CFG_DTYPE_TIMEZONE
} ECfgDataType;
typedef enum {
CFG_SCOPE_SERVER,
CFG_SCOPE_CLIENT,
CFG_SCOPE_BOTH
} ECfgScopeType;
typedef enum { CFG_SCOPE_SERVER, CFG_SCOPE_CLIENT, CFG_SCOPE_BOTH } ECfgScopeType;
typedef enum {
CFG_DYN_NONE = 0,
@ -138,6 +134,7 @@ void cfgDumpItemValue(SConfigItem *pItem, char *buf, int32_t bufSize, int32_t *p
void cfgDumpItemScope(SConfigItem *pItem, char *buf, int32_t bufSize, int32_t *pLen);
void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump);
void cfgDumpCfgS3(SConfig *pCfg, bool tsc, bool dump);
int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char *apolloUrl);

View File

@ -197,8 +197,8 @@ typedef enum ELogicConditionType {
#define TSDB_POINTER_PRINT_BYTES 18 // 0x1122334455667788
// ACCOUNT is a 32 bit positive integer
// this is the length of its string representation, including the terminator zero
#define TSDB_ACCT_ID_LEN 11
#define TSDB_NODE_ID_LEN 11
#define TSDB_ACCT_ID_LEN 11
#define TSDB_NODE_ID_LEN 11
#define TSDB_VGROUP_ID_LEN 11
#define TSDB_MAX_COLUMNS 4096
@ -290,8 +290,8 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_CONFIG_LEN 128
#define TSDB_DNODE_VALUE_LEN 256
#define TSDB_CLUSTER_VALUE_LEN 1000
#define TSDB_GRANT_LOG_COL_LEN 15600
#define TSDB_CLUSTER_VALUE_LEN 1000
#define TSDB_GRANT_LOG_COL_LEN 15600
#define TSDB_ACTIVE_KEY_LEN 109
#define TSDB_CONN_ACTIVE_KEY_LEN 255
@ -413,6 +413,16 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_HASH_SUFFIX (TSDB_TABLE_NAME_LEN - 2)
#define TSDB_DEFAULT_HASH_SUFFIX 0
#define TSDB_MIN_S3_CHUNK_SIZE (32 * 1024)
#define TSDB_MAX_S3_CHUNK_SIZE (1024 * 1024)
#define TSDB_DEFAULT_S3_CHUNK_SIZE (256 * 1024)
#define TSDB_MIN_S3_KEEP_LOCAL (1 * 1440) // unit minute
#define TSDB_MAX_S3_KEEP_LOCAL (365000 * 1440)
#define TSDB_DEFAULT_S3_KEEP_LOCAL (30 * 1440)
#define TSDB_MIN_S3_COMPACT 0
#define TSDB_MAX_S3_COMPACT 1
#define TSDB_DEFAULT_S3_COMPACT 0
#define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1
#define TSDB_REP_DEF_DB_WAL_RET_PERIOD 3600
#define TSDB_REPS_DEF_DB_WAL_RET_PERIOD 3600
@ -551,10 +561,10 @@ enum {
#define VNODE_TIMEOUT_SEC 60
#define MNODE_TIMEOUT_SEC 60
#define MONITOR_TABLENAME_LEN 200
#define MONITOR_TAG_NAME_LEN 100
#define MONITOR_TAG_VALUE_LEN 300
#define MONITOR_METRIC_NAME_LEN 100
#define MONITOR_TABLENAME_LEN 200
#define MONITOR_TAG_NAME_LEN 100
#define MONITOR_TAG_VALUE_LEN 300
#define MONITOR_METRIC_NAME_LEN 100
#ifdef __cplusplus
}
#endif

View File

@ -16,14 +16,15 @@ ENDIF ()
IF (TD_STORAGE)
ADD_DEFINITIONS(-D_STORAGE)
TARGET_LINK_LIBRARIES(common PRIVATE storage)
ENDIF ()
IF (TD_ENTERPRISE)
IF(${BUILD_WITH_S3})
add_definitions(-DUSE_S3)
ELSEIF(${BUILD_WITH_COS})
add_definitions(-DUSE_COS)
ENDIF()
ENDIF ()
ENDIF()
target_include_directories(
common

View File

@ -53,6 +53,100 @@ int32_t s3Init() { return 0; /*s3Begin();*/ }
void s3CleanUp() { /*s3End();*/
}
static int32_t s3ListBucket(char const *bucketname);
int32_t s3CheckCfg() {
int32_t code = 0;
code = s3Begin();
if (code != 0) {
fprintf(stderr, "failed to initialize s3.\n");
goto _exit;
}
// test put
char testdata[17] = "0123456789abcdef";
const char *objectname[] = {"s3test.txt"};
char path[PATH_MAX] = {0};
int ds_len = strlen(TD_DIRSEP);
int tmp_len = strlen(tsTempDir);
snprintf(path, PATH_MAX, "%s", tsTempDir);
if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
snprintf(path + tmp_len, PATH_MAX, "%s", TD_DIRSEP);
snprintf(path + tmp_len + ds_len, PATH_MAX, "%s", objectname[0]);
} else {
snprintf(path + tmp_len, PATH_MAX, "%s", objectname[0]);
}
TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (!fp) {
code = TAOS_SYSTEM_ERROR(errno);
fprintf(stderr, "failed to open test file: %s.\n", path);
// uError("ERROR: %s Failed to open %s", __func__, path);
goto _exit;
}
if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
fprintf(stderr, "failed to write test file: %s.\n", path);
goto _exit;
}
if (taosFsyncFile(fp) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
fprintf(stderr, "failed to fsync test file: %s.\n", path);
goto _exit;
}
taosCloseFile(&fp);
fprintf(stderr, "\nstart to put object: %s, file: %s content: %s\n", objectname[0], path, testdata);
code = s3PutObjectFromFileOffset(path, objectname[0], 0, 16);
if (code != 0) {
fprintf(stderr, "put object %s : failed.\n", objectname[0]);
goto _exit;
}
fprintf(stderr, "put object %s: success.\n\n", objectname[0]);
// list buckets
fprintf(stderr, "start to list bucket %s by prefix s3.\n", tsS3BucketName);
code = s3ListBucket(tsS3BucketName);
if (code != 0) {
fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName);
goto _exit;
}
fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName);
// test range get
uint8_t *pBlock = NULL;
int c_offset = 10;
int c_len = 6;
fprintf(stderr, "start to range get object %s offset: %d len: %d.\n", objectname[0], c_offset, c_len);
code = s3GetObjectBlock(objectname[0], c_offset, c_len, true, &pBlock);
if (code != 0) {
fprintf(stderr, "get object %s : failed.\n", objectname[0]);
goto _exit;
}
char buf[7] = {0};
memcpy(buf, pBlock, c_len);
taosMemoryFree(pBlock);
fprintf(stderr, "object content: %s\n", buf);
fprintf(stderr, "get object %s: success.\n\n", objectname[0]);
// delete test object
fprintf(stderr, "start to delete object: %s.\n", objectname[0]);
code = s3DeleteObjects(objectname, 1);
if (code != 0) {
fprintf(stderr, "delete object %s : failed.\n", objectname[0]);
goto _exit;
}
fprintf(stderr, "delete object %s: success.\n\n", objectname[0]);
s3End();
_exit:
return code;
}
static int should_retry() {
/*
if (retriesG--) {
@ -85,7 +179,7 @@ typedef struct {
} TS3GetData;
typedef struct {
char err_msg[128];
char err_msg[512];
S3Status status;
uint64_t content_length;
char *buf;
@ -137,6 +231,30 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro
}
}
static SArray *getListByPrefix(const char *prefix);
static void s3FreeObjectKey(void *pItem);
static int32_t s3ListBucket(char const *bucketname) {
int32_t code = 0;
SArray *objectArray = getListByPrefix("s3");
if (objectArray == NULL) {
return -1;
}
const char **object_name = TARRAY_DATA(objectArray);
int size = TARRAY_SIZE(objectArray);
fprintf(stderr, "objects:\n");
for (int i = 0; i < size; ++i) {
fprintf(stderr, "%s\n", object_name[i]);
}
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
return code;
}
typedef struct growbuffer {
// The total number of bytes, and the start byte
int size;
@ -471,10 +589,11 @@ static int32_t s3PutObjectFromFileSimple(S3BucketContext *bucket_context, char c
} while (S3_status_is_retryable(data->status) && should_retry());
if (data->status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, data->status, data->err_msg);
// s3PrintError(__FILE__, __LINE__, __func__, data->status, data->err_msg);
s3PrintError(NULL, __LINE__, __func__, data->status, data->err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
} else if (data->contentLength) {
uError("%s Failed to read remaining %llu bytes from input", __func__, (unsigned long long)data->contentLength);
uError("%s Failed to put remaining %llu bytes", __func__, (unsigned long long)data->contentLength);
code = TAOS_SYSTEM_ERROR(EIO);
}
@ -830,6 +949,66 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w
return code;
}
int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) {
int32_t code = 0;
int32_t lmtime = 0;
const char *filename = 0;
uint64_t contentLength = 0;
const char *cacheControl = 0, *contentType = 0, *md5 = 0;
const char *contentDispositionFilename = 0, *contentEncoding = 0;
int64_t expires = -1;
S3CannedAcl cannedAcl = S3CannedAclPrivate;
int metaPropertiesCount = 0;
S3NameValue metaProperties[S3_MAX_METADATA_COUNT];
char useServerSideEncryption = 0;
put_object_callback_data data = {0};
if (taosStatFile(file, &contentLength, &lmtime, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
return code;
}
contentLength = size;
if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) {
code = TAOS_SYSTEM_ERROR(errno);
uError("ERROR: %s Failed to open file %s: ", __func__, file);
return code;
}
if (taosLSeekFile(data.infileFD, offset, SEEK_SET) < 0) {
taosCloseFile(&data.infileFD);
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength =
contentLength;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3PutProperties putProperties = {contentType, md5,
cacheControl, contentDispositionFilename,
contentEncoding, expires,
cannedAcl, metaPropertiesCount,
metaProperties, useServerSideEncryption};
if (contentLength <= MULTIPART_CHUNK_SIZE) {
code = s3PutObjectFromFileSimple(&bucketContext, object_name, contentLength, &putProperties, &data);
} else {
code = s3PutObjectFromFileWithoutCp(&bucketContext, object_name, contentLength, &putProperties, &data);
}
if (data.infileFD) {
taosCloseFile(&data.infileFD);
} else if (data.gb) {
growbuffer_destroy(data.gb);
}
return code;
}
typedef struct list_bucket_callback_data {
char err_msg[512];
S3Status status;
@ -888,7 +1067,7 @@ static SArray *getListByPrefix(const char *prefix) {
const char *marker = 0, *delimiter = 0;
int maxkeys = 0, allDetails = 0;
list_bucket_callback_data data;
list_bucket_callback_data data = {0};
data.objectArray = taosArrayInit(32, sizeof(void *));
if (!data.objectArray) {
uError("%s: %s", __func__, "out of memoty");
@ -918,14 +1097,17 @@ static SArray *getListByPrefix(const char *prefix) {
return data.objectArray;
}
} else {
s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg);
uError("failed to list with prefix %s: %s", prefix, S3_get_status_name(data.status));
// s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg);
}
taosArrayDestroyEx(data.objectArray, s3FreeObjectKey);
return NULL;
}
void s3DeleteObjects(const char *object_name[], int nobject) {
int32_t s3DeleteObjects(const char *object_name[], int nobject) {
int32_t code = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
S3ResponseHandler responseHandler = {0, &responseCompleteCallback};
@ -938,8 +1120,11 @@ void s3DeleteObjects(const char *object_name[], int nobject) {
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg);
code = -1;
}
}
return code;
}
void s3DeleteObjectsByPrefix(const char *prefix) {
@ -991,12 +1176,12 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
} while (S3_status_is_retryable(cbd.status) && should_retry());
if (cbd.status != S3StatusOK) {
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
return TAOS_SYSTEM_ERROR(EIO);
}
if (check && cbd.buf_pos != size) {
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
return TAOS_SYSTEM_ERROR(EIO);
}
@ -1233,7 +1418,7 @@ void s3DeleteObjectsByPrefix(const char *prefix_str) {
cos_pool_destroy(p);
}
void s3DeleteObjects(const char *object_name[], int nobject) {
int32_t s3DeleteObjects(const char *object_name[], int nobject) {
cos_pool_t *p = NULL;
int is_cname = 0;
cos_string_t bucket;
@ -1267,6 +1452,8 @@ void s3DeleteObjects(const char *object_name[], int nobject) {
} else {
cos_warn_log("delete objects failed\n");
}
return 0;
}
bool s3Exists(const char *object_name) {
@ -1536,7 +1723,7 @@ void s3CleanUp() {}
int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; }
int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; }
void s3DeleteObjectsByPrefix(const char *prefix) {}
void s3DeleteObjects(const char *object_name[], int nobject) {}
int32_t s3DeleteObjects(const char *object_name[], int nobject) { return 0; }
bool s3Exists(const char *object_name) { return false; }
bool s3Get(const char *object_name, const char *path) { return false; }
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {

View File

@ -24,7 +24,7 @@
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_VIEW_NAME_LEN ((TSDB_VIEW_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_VIEW_NAME_LEN ((TSDB_VIEW_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
// clang-format off
static const SSysDbTableSchema dnodesSchema[] = {
@ -107,6 +107,9 @@ static const SSysDbTableSchema userDBSchema[] = {
{.name = "table_suffix", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "tsdb_pagesize", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "keep_time_offset", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "s3_chunksize", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "s3_keeplocal", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "s3_compact", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = false},
};
static const SSysDbTableSchema userFuncSchema[] = {

View File

@ -259,7 +259,9 @@ float tsSinkDataRate = 2.0;
int32_t tsStreamNodeCheckInterval = 16;
int32_t tsTtlUnit = 86400;
int32_t tsTtlPushIntervalSec = 10;
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
int32_t tsS3MigrateIntervalSec = 60 * 60; // interval of s3migrate db in all vgroups
bool tsS3MigrateEnabled = 1;
int32_t tsGrantHBInterval = 60;
int32_t tsUptimeInterval = 300; // seconds
char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits
@ -285,7 +287,7 @@ char tsS3Hostname[TSDB_FQDN_LEN] = "<hostname>";
int32_t tsS3BlockSize = -1; // number of tsdb pages (4096)
int32_t tsS3BlockCacheSize = 16; // number of blocks
int32_t tsS3PageCacheSize = 4096; // number of pages
int32_t tsS3UploadDelaySec = 60 * 60 * 24;
int32_t tsS3UploadDelaySec = 60;
bool tsExperimental = true;
@ -345,7 +347,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
}
if (tsS3BucketName[0] != '<') {
#if defined(USE_COS) || defined(USE_S3)
if (tsDiskCfgNum > 1) tsS3Enabled = true;
#ifdef TD_ENTERPRISE
/*if (tsDiskCfgNum > 1) */ tsS3Enabled = true;
#endif
tsS3StreamEnabled = true;
#endif
}
@ -539,7 +543,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
return 0;
}
@ -680,8 +685,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER,
CFG_DYN_NONE) != 0)
return -1;
if (cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16,
(TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
if (cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2),
CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
@ -699,7 +704,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "monitorLogProtocol", tsMonitorLogProtocol, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER,
CFG_DYN_NONE) != 0)
return -1;
if (cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
@ -742,6 +748,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0)
return -1;
if (cfgAddInt32(pCfg, "s3MigrateIntervalSec", tsS3MigrateIntervalSec, 600, 100000, CFG_SCOPE_SERVER,
CFG_DYN_ENT_SERVER) != 0)
return -1;
if (cfgAddBool(pCfg, "s3MigrateEnabled", tsS3MigrateEnabled, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
@ -760,8 +770,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 2, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 2, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER,
CFG_DYN_ENT_SERVER) != 0)
@ -792,6 +801,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
/*
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -1, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
return -1;
if (tsS3BlockSize > -1 && tsS3BlockSize < 1024) {
@ -801,10 +811,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0)
return -1;
*/
if (cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER,
CFG_DYN_ENT_SERVER) != 0)
return -1;
if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 60 * 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER,
if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER,
CFG_DYN_ENT_SERVER) != 0)
return -1;
@ -1241,8 +1252,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsResolveFQDNRetryTime = cfgGetItem(pCfg, "resolveFQDNRetryTime")->i32;
tsMinDiskFreeSize = cfgGetItem(pCfg, "minDiskFreeSize")->i64;
tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32;
tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32;
// tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32;
// tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32;
tsS3PageCacheSize = cfgGetItem(pCfg, "s3PageCacheSize")->i32;
tsS3UploadDelaySec = cfgGetItem(pCfg, "s3UploadDelaySec")->i32;
@ -1516,6 +1527,8 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) {
{"ttlBatchDropNum", &tsTtlBatchDropNum},
{"ttlFlushThreshold", &tsTtlFlushThreshold},
{"ttlPushInterval", &tsTtlPushIntervalSec},
{"s3MigrateIntervalSec", &tsS3MigrateIntervalSec},
{"s3MigrateEnabled", &tsS3MigrateEnabled},
//{"s3BlockSize", &tsS3BlockSize},
{"s3BlockCacheSize", &tsS3BlockCacheSize},
{"s3PageCacheSize", &tsS3PageCacheSize},

View File

@ -1471,9 +1471,7 @@ int32_t tDeserializeSStatisReq(void *buf, int32_t bufLen, SStatisReq *pReq) {
return 0;
}
void tFreeSStatisReq(SStatisReq *pReq) {
taosMemoryFreeClear(pReq->pCont);
}
void tFreeSStatisReq(SStatisReq *pReq) { taosMemoryFreeClear(pReq->pCont); }
// int32_t tSerializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) {
// SEncoder encoder = {0};
@ -1872,7 +1870,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
char *tb = taosHashIterate(pRsp->readTbs, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void * key = taosHashGetKey(tb, &keyLen);
void *key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1887,7 +1885,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
tb = taosHashIterate(pRsp->writeTbs, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void * key = taosHashGetKey(tb, &keyLen);
void *key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1902,7 +1900,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
tb = taosHashIterate(pRsp->alterTbs, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void * key = taosHashGetKey(tb, &keyLen);
void *key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1917,7 +1915,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
tb = taosHashIterate(pRsp->readViews, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void * key = taosHashGetKey(tb, &keyLen);
void *key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1932,7 +1930,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
tb = taosHashIterate(pRsp->writeViews, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void * key = taosHashGetKey(tb, &keyLen);
void *key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1947,7 +1945,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
tb = taosHashIterate(pRsp->alterViews, NULL);
while (tb != NULL) {
size_t keyLen = 0;
void * key = taosHashGetKey(tb, &keyLen);
void *key = taosHashGetKey(tb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -1962,7 +1960,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
int32_t *useDb = taosHashIterate(pRsp->useDbs, NULL);
while (useDb != NULL) {
size_t keyLen = 0;
void * key = taosHashGetKey(useDb, &keyLen);
void *key = taosHashGetKey(useDb, &keyLen);
if (tEncodeI32(pEncoder, keyLen) < 0) return -1;
if (tEncodeCStr(pEncoder, key) < 0) return -1;
@ -3022,6 +3020,9 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
}
if (tEncodeI32(&encoder, pReq->tsdbPageSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1;
if (tEncodeI32(&encoder, pReq->s3ChunkSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->s3KeepLocal) < 0) return -1;
if (tEncodeI8(&encoder, pReq->s3Compact) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
@ -3091,6 +3092,15 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI32(&decoder, &pReq->keepTimeOffset) < 0) return -1;
}
pReq->s3ChunkSize = TSDB_DEFAULT_S3_CHUNK_SIZE;
pReq->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL;
pReq->s3Compact = TSDB_DEFAULT_S3_COMPACT;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->s3ChunkSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->s3KeepLocal) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->s3Compact) < 0) return -1;
}
DECODESQL();
tEndDecode(&decoder);
@ -3132,6 +3142,10 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1;
if (tEncodeI32(&encoder, pReq->s3KeepLocal) < 0) return -1;
if (tEncodeI8(&encoder, pReq->s3Compact) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
@ -3181,6 +3195,13 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
if (tDecodeI32(&decoder, &pReq->keepTimeOffset) < 0) return -1;
}
pReq->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL;
pReq->s3Compact = TSDB_DEFAULT_S3_COMPACT;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->s3KeepLocal) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->s3Compact) < 0) return -1;
}
DECODESQL();
tEndDecode(&decoder);
@ -3873,6 +3894,57 @@ int32_t tDeserializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) {
return 0;
}
int32_t tSerializeSS3MigrateDbReq(void *buf, int32_t bufLen, SS3MigrateDbReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSS3MigrateDbReq(void *buf, int32_t bufLen, SS3MigrateDbReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSVS3MigrateDbReq(void *buf, int32_t bufLen, SVS3MigrateDbReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->timestamp) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSVS3MigrateDbReq(void *buf, int32_t bufLen, SVS3MigrateDbReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->timestamp) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -3969,6 +4041,9 @@ int32_t tSerializeSDbCfgRspImpl(SEncoder *encoder, const SDbCfgRsp *pRsp) {
if (tEncodeI8(encoder, pRsp->schemaless) < 0) return -1;
if (tEncodeI16(encoder, pRsp->sstTrigger) < 0) return -1;
if (tEncodeI32(encoder, pRsp->keepTimeOffset) < 0) return -1;
if (tEncodeI32(encoder, pRsp->s3ChunkSize) < 0) return -1;
if (tEncodeI32(encoder, pRsp->s3KeepLocal) < 0) return -1;
if (tEncodeI8(encoder, pRsp->s3Compact) < 0) return -1;
return 0;
}
@ -4042,6 +4117,15 @@ int32_t tDeserializeSDbCfgRspImpl(SDecoder *decoder, SDbCfgRsp *pRsp) {
if (tDecodeI32(decoder, &pRsp->keepTimeOffset) < 0) return -1;
}
pRsp->s3ChunkSize = TSDB_DEFAULT_S3_CHUNK_SIZE;
pRsp->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL;
pRsp->s3Compact = TSDB_DEFAULT_S3_COMPACT;
if (!tDecodeIsEnd(decoder)) {
if (tDecodeI32(decoder, &pRsp->s3ChunkSize) < 0) return -1;
if (tDecodeI32(decoder, &pRsp->s3KeepLocal) < 0) return -1;
if (tDecodeI8(decoder, &pRsp->s3Compact) < 0) return -1;
}
return 0;
}
@ -5065,7 +5149,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI16(&encoder, pReq->hashPrefix) < 0) return -1;
if (tEncodeI16(&encoder, pReq->hashSuffix) < 0) return -1;
if (tEncodeI32(&encoder, pReq->tsdbPageSize) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
for (int32_t i = 0; i < 6; ++i) {
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
}
if (tEncodeI8(&encoder, pReq->learnerReplica) < 0) return -1;
@ -5076,6 +5160,9 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
}
if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1;
if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1;
if (tEncodeI32(&encoder, pReq->s3ChunkSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->s3KeepLocal) < 0) return -1;
if (tEncodeI8(&encoder, pReq->s3Compact) < 0) return -1;
tEndEncode(&encoder);
@ -5151,7 +5238,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI16(&decoder, &pReq->hashPrefix) < 0) return -1;
if (tDecodeI16(&decoder, &pReq->hashSuffix) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->tsdbPageSize) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
for (int32_t i = 0; i < 6; ++i) {
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
}
if (!tDecodeIsEnd(&decoder)) {
@ -5170,6 +5257,15 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI32(&decoder, &pReq->keepTimeOffset) < 0) return -1;
}
pReq->s3ChunkSize = TSDB_DEFAULT_S3_CHUNK_SIZE;
pReq->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL;
pReq->s3Compact = TSDB_DEFAULT_S3_COMPACT;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->s3ChunkSize) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->s3KeepLocal) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->s3Compact) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
@ -5425,7 +5521,7 @@ int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeCon
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
for (int32_t i = 0; i < 7; ++i) {
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
}
@ -5436,6 +5532,10 @@ int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeCon
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1;
if (tEncodeI32(&encoder, pReq->s3KeepLocal) < 0) return -1;
if (tEncodeI8(&encoder, pReq->s3Compact) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -5461,7 +5561,7 @@ int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeC
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
for (int32_t i = 0; i < 8; ++i) {
for (int32_t i = 0; i < 7; ++i) {
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
}
@ -5487,6 +5587,13 @@ int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeC
if (tDecodeI32(&decoder, &pReq->keepTimeOffset) < 0) return -1;
}
pReq->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL;
pReq->s3Compact = TSDB_DEFAULT_S3_COMPACT;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->s3KeepLocal) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->s3Compact) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
@ -7008,27 +7115,27 @@ void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp) { taosArrayDestroy(pRsp->taskSt
// return 0;
// }
//int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) {
// SDecoder decoder = {0};
// int32_t num = 0;
// tDecoderInit(&decoder, buf, bufLen);
// int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) {
// SDecoder decoder = {0};
// int32_t num = 0;
// tDecoderInit(&decoder, buf, bufLen);
// if (tStartDecode(&decoder) < 0) return -1;
// if (tDecodeI32(&decoder, &num) < 0) return -1;
// if (num > 0) {
// pRsp->rspList = taosArrayInit(num, sizeof(SVCreateTbRsp));
// if (NULL == pRsp->rspList) return -1;
// for (int32_t i = 0; i < num; ++i) {
// SVCreateTbRsp rsp = {0};
// if (tDecodeI32(&decoder, &rsp.code) < 0) return -1;
// if (NULL == taosArrayPush(pRsp->rspList, &rsp)) return -1;
// }
// } else {
// pRsp->rspList = NULL;
// }
// tEndDecode(&decoder);
// if (tStartDecode(&decoder) < 0) return -1;
// if (tDecodeI32(&decoder, &num) < 0) return -1;
// if (num > 0) {
// pRsp->rspList = taosArrayInit(num, sizeof(SVCreateTbRsp));
// if (NULL == pRsp->rspList) return -1;
// for (int32_t i = 0; i < num; ++i) {
// SVCreateTbRsp rsp = {0};
// if (tDecodeI32(&decoder, &rsp.code) < 0) return -1;
// if (NULL == taosArrayPush(pRsp->rspList, &rsp)) return -1;
// }
// } else {
// pRsp->rspList = NULL;
// }
// tEndDecode(&decoder);
// tDecoderClear(&decoder);
// tDecoderClear(&decoder);
// return 0;
//}
@ -7301,8 +7408,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI32(&encoder, taosArrayGetSize(pReq->pVgroupVerList)) < 0) return -1;
for(int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) {
SVgroupVer* p = taosArrayGet(pReq->pVgroupVerList, i);
for (int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) {
SVgroupVer *p = taosArrayGet(pReq->pVgroupVerList, i);
if (tEncodeI32(&encoder, p->vgId) < 0) return -1;
if (tEncodeI64(&encoder, p->ver) < 0) return -1;
}
@ -8478,7 +8585,7 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i);
void * data = taosArrayGetP(pRsp->blockData, i);
void *data = taosArrayGetP(pRsp->blockData, i);
if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1;
if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i);
@ -8516,7 +8623,7 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
}
for (int32_t i = 0; i < pRsp->blockNum; i++) {
void * data;
void *data;
uint64_t bLen;
if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1;
taosArrayPush(pRsp->blockData, &data);
@ -8570,7 +8677,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
void * createTableReq = taosArrayGetP(pRsp->createTableReq, i);
void *createTableReq = taosArrayGetP(pRsp->createTableReq, i);
int32_t createTableLen = *(int32_t *)taosArrayGet(pRsp->createTableLen, i);
if (tEncodeBinary(pEncoder, createTableReq, createTableLen) < 0) return -1;
}
@ -8579,14 +8686,14 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
}
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1;
if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp *)pRsp) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {
pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(pRsp->createTableNum, sizeof(void *));
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
void * pCreate = NULL;
void *pCreate = NULL;
uint64_t len;
if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1;
int32_t l = (int32_t)len;
@ -8889,7 +8996,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
taosArrayDestroy(pTbData->aCol);
} else {
int32_t nRow = TARRAY_SIZE(pTbData->aRowP);
SRow ** rows = (SRow **)TARRAY_DATA(pTbData->aRowP);
SRow **rows = (SRow **)TARRAY_DATA(pTbData->aRowP);
for (int32_t i = 0; i < nRow; ++i) {
tRowDestroy(rows[i]);

View File

@ -21,8 +21,8 @@
#include "tgrant.h"
#include "tjson.h"
#include "tlog.h"
#include "tutil.h"
#include "tunit.h"
#include "tutil.h"
#define CFG_NAME_PRINT_LEN 24
#define CFG_SRC_PRINT_LEN 12
@ -310,19 +310,19 @@ static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value,
static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool resetArray) {
SConfigItem *pDebugFlagItem = cfgGetItem(pCfg, "debugFlag");
if (resetArray) {
// reset
if (pDebugFlagItem == NULL) return -1;
// reset
if (pDebugFlagItem == NULL) return -1;
// logflag names that should 'not' be set by 'debugFlag'
// logflag names that should 'not' be set by 'debugFlag'
if (pDebugFlagItem->array == NULL) {
pDebugFlagItem->array = taosArrayInit(16, sizeof(SLogVar));
if (pDebugFlagItem->array == NULL) {
pDebugFlagItem->array = taosArrayInit(16, sizeof(SLogVar));
if (pDebugFlagItem->array == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
taosArrayClear(pDebugFlagItem->array);
return 0;
}
taosArrayClear(pDebugFlagItem->array);
return 0;
}
// update
@ -401,8 +401,7 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p
case CFG_DTYPE_BOOL: {
int32_t ival = (int32_t)atoi(pVal);
if (ival != 0 && ival != 1) {
uError("cfg:%s, type:%s value:%d out of range[0, 1]", pItem->name,
cfgDtypeStr(pItem->dtype), ival);
uError("cfg:%s, type:%s value:%d out of range[0, 1]", pItem->name, cfgDtypeStr(pItem->dtype), ival);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
@ -670,6 +669,89 @@ void cfgDumpItemScope(SConfigItem *pItem, char *buf, int32_t bufSize, int32_t *p
*pLen = len;
}
void cfgDumpCfgS3(SConfig *pCfg, bool tsc, bool dump) {
if (dump) {
printf(" s3 config");
printf("\n");
printf("=================================================================");
printf("\n");
} else {
uInfo(" s3 config");
uInfo("=================================================================");
}
char src[CFG_SRC_PRINT_LEN + 1] = {0};
char name[CFG_NAME_PRINT_LEN + 1] = {0};
int32_t size = taosArrayGetSize(pCfg->array);
for (int32_t i = 0; i < size; ++i) {
SConfigItem *pItem = taosArrayGet(pCfg->array, i);
if (tsc && pItem->scope == CFG_SCOPE_SERVER) continue;
if (dump && strcmp(pItem->name, "scriptDir") == 0) continue;
if (dump && strncmp(pItem->name, "s3", 2) != 0) continue;
tstrncpy(src, cfgStypeStr(pItem->stype), CFG_SRC_PRINT_LEN);
for (int32_t j = 0; j < CFG_SRC_PRINT_LEN; ++j) {
if (src[j] == 0) src[j] = ' ';
}
tstrncpy(name, pItem->name, CFG_NAME_PRINT_LEN);
for (int32_t j = 0; j < CFG_NAME_PRINT_LEN; ++j) {
if (name[j] == 0) name[j] = ' ';
}
switch (pItem->dtype) {
case CFG_DTYPE_BOOL:
if (dump) {
printf("%s %s %u\n", src, name, pItem->bval);
} else {
uInfo("%s %s %u", src, name, pItem->bval);
}
break;
case CFG_DTYPE_INT32:
if (dump) {
printf("%s %s %d\n", src, name, pItem->i32);
} else {
uInfo("%s %s %d", src, name, pItem->i32);
}
break;
case CFG_DTYPE_INT64:
if (dump) {
printf("%s %s %" PRId64 "\n", src, name, pItem->i64);
} else {
uInfo("%s %s %" PRId64, src, name, pItem->i64);
}
break;
case CFG_DTYPE_DOUBLE:
case CFG_DTYPE_FLOAT:
if (dump) {
printf("%s %s %.2f\n", src, name, pItem->fval);
} else {
uInfo("%s %s %.2f", src, name, pItem->fval);
}
break;
case CFG_DTYPE_STRING:
case CFG_DTYPE_DIR:
case CFG_DTYPE_LOCALE:
case CFG_DTYPE_CHARSET:
case CFG_DTYPE_TIMEZONE:
case CFG_DTYPE_NONE:
if (dump) {
printf("%s %s %s\n", src, name, pItem->str);
} else {
uInfo("%s %s %s", src, name, pItem->str);
}
break;
}
}
if (dump) {
printf("=================================================================\n");
} else {
uInfo("=================================================================");
}
}
void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
if (dump) {
printf(" global config");
@ -717,7 +799,7 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
break;
case CFG_DTYPE_INT64:
if (dump) {
printf("%s %s %" PRId64"\n", src, name, pItem->i64);
printf("%s %s %" PRId64 "\n", src, name, pItem->i64);
} else {
uInfo("%s %s %" PRId64, src, name, pItem->i64);
}