From 44b9785853bdaca643672f4df592964ed211f69a Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 8 Apr 2024 10:33:02 +0800 Subject: [PATCH] cos/multi-write: include headers part --- include/common/cos.h | 6 +- include/common/tglobal.h | 14 +- include/common/tmsg.h | 50 ++- include/common/tmsgdef.h | 3 + include/common/ttokendef.h | 572 +++++++++++++++++----------------- include/libs/nodes/cmdnodes.h | 9 + include/util/tconfig.h | 7 +- include/util/tdef.h | 26 +- source/common/CMakeLists.txt | 5 +- source/common/src/cos.c | 207 +++++++++++- source/common/src/systable.c | 5 +- source/common/src/tglobal.c | 37 ++- source/common/src/tmsg.c | 189 ++++++++--- source/util/src/tconfig.c | 110 ++++++- 14 files changed, 844 insertions(+), 396 deletions(-) diff --git a/include/common/cos.h b/include/common/cos.h index afeca3ca03..8e48533304 100644 --- a/include/common/cos.h +++ b/include/common/cos.h @@ -33,10 +33,12 @@ extern int32_t tsS3UploadDelaySec; int32_t s3Init(); void s3CleanUp(); +int32_t s3CheckCfg(); int32_t s3PutObjectFromFile(const char *file, const char *object); int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp); +int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size); void s3DeleteObjectsByPrefix(const char *prefix); -void s3DeleteObjects(const char *object_name[], int nobject); +int32_t s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock); @@ -45,6 +47,8 @@ void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); int32_t s3GetObjectToFile(const char *object_name, char *fileName); +#define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024) + #ifdef __cplusplus } #endif diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 93f17fa887..0763321bba 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -111,9 +111,9 @@ extern int32_t tsMonitorIntervalForBasic; extern bool tsMonitorForceV2; // audit -extern bool tsEnableAudit; -extern bool tsEnableAuditCreateTable; -extern int32_t tsAuditInterval; +extern bool tsEnableAudit; +extern bool tsEnableAuditCreateTable; +extern int32_t tsAuditInterval; // telem extern bool tsEnableTelem; @@ -121,9 +121,9 @@ extern int32_t tsTelemInterval; extern char tsTelemServer[]; extern uint16_t tsTelemPort; extern bool tsEnableCrashReport; -extern char * tsTelemUri; -extern char * tsClientCrashReportUri; -extern char * tsSvrCrashReportUri; +extern char *tsTelemUri; +extern char *tsClientCrashReportUri; +extern char *tsSvrCrashReportUri; // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing @@ -209,6 +209,8 @@ extern int32_t tsTtlUnit; extern int32_t tsTtlPushIntervalSec; extern int32_t tsTtlBatchDropNum; extern int32_t tsTrimVDbIntervalSec; +extern int32_t tsS3MigrateIntervalSec; +extern bool tsS3MigrateEnabled; extern int32_t tsGrantHBInterval; extern int32_t tsUptimeInterval; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 958789178a..44b705a6cb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -300,7 +300,8 @@ typedef enum ENodeType { QUERY_NODE_GRANT_STMT, QUERY_NODE_REVOKE_STMT, QUERY_NODE_ALTER_CLUSTER_STMT, - // placeholder for [153, 180] + QUERY_NODE_S3MIGRATE_DATABASE_STMT, + // placeholder for [154, 180] QUERY_NODE_SHOW_CREATE_VIEW_STMT = 181, QUERY_NODE_SHOW_CREATE_DATABASE_STMT, QUERY_NODE_SHOW_CREATE_TABLE_STMT, @@ -584,7 +585,7 @@ typedef struct { // int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); // int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); // void tFreeSSubmitBlkRsp(void* param); -void tFreeSSubmitRsp(SSubmitRsp* pRsp); +void tFreeSSubmitRsp(SSubmitRsp* pRsp); #define COL_SMA_ON ((int8_t)0x1) #define COL_IDX_ON ((int8_t)0x2) @@ -1149,6 +1150,9 @@ typedef struct { int32_t sstTrigger; int16_t hashPrefix; int16_t hashSuffix; + int32_t s3ChunkSize; + int32_t s3KeepLocal; + int8_t s3Compact; int32_t tsdbPageSize; int32_t sqlLen; char* sql; @@ -1178,6 +1182,8 @@ typedef struct { int32_t minRows; int32_t walRetentionPeriod; int32_t walRetentionSize; + int32_t s3KeepLocal; + int8_t s3Compact; int32_t sqlLen; char* sql; } SAlterDbReq; @@ -1257,6 +1263,20 @@ typedef struct { int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq); int32_t tDeserializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq); +typedef struct { + char db[TSDB_DB_FNAME_LEN]; +} SS3MigrateDbReq; + +int32_t tSerializeSS3MigrateDbReq(void* buf, int32_t bufLen, SS3MigrateDbReq* pReq); +int32_t tDeserializeSS3MigrateDbReq(void* buf, int32_t bufLen, SS3MigrateDbReq* pReq); + +typedef struct { + int32_t timestamp; +} SVS3MigrateDbReq; + +int32_t tSerializeSVS3MigrateDbReq(void* buf, int32_t bufLen, SVS3MigrateDbReq* pReq); +int32_t tDeserializeSVS3MigrateDbReq(void* buf, int32_t bufLen, SVS3MigrateDbReq* pReq); + typedef struct { int32_t timestampSec; int32_t ttlDropMaxCount; @@ -1293,6 +1313,9 @@ typedef struct { int8_t replications; int8_t strict; int8_t cacheLast; + int32_t s3ChunkSize; + int32_t s3KeepLocal; + int8_t s3Compact; int32_t tsdbPageSize; int32_t walRetentionPeriod; int32_t walRollPeriod; @@ -1582,13 +1605,13 @@ int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); void tFreeSStatusReq(SStatusReq* pReq); typedef struct { - int32_t contLen; - char* pCont; + int32_t contLen; + char* pCont; } SStatisReq; int32_t tSerializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq); int32_t tDeserializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq); -void tFreeSStatisReq(SStatisReq *pReq); +void tFreeSStatisReq(SStatisReq* pReq); typedef struct { int32_t dnodeId; @@ -1693,7 +1716,10 @@ typedef struct { int16_t hashPrefix; int16_t hashSuffix; int32_t tsdbPageSize; - int64_t reserved[8]; + int32_t s3ChunkSize; + int32_t s3KeepLocal; + int8_t s3Compact; + int64_t reserved[6]; int8_t learnerReplica; int8_t learnerSelfIndex; SReplica learnerReplicas[TSDB_MAX_LEARNER_REPLICA]; @@ -1781,13 +1807,15 @@ typedef struct { int8_t walLevel; int8_t strict; int8_t cacheLast; - int64_t reserved[8]; + int64_t reserved[7]; // 1st modification int16_t sttTrigger; int32_t minRows; // 2nd modification int32_t walRetentionPeriod; int32_t walRetentionSize; + int32_t s3KeepLocal; + int8_t s3Compact; } SAlterVnodeConfigReq; int32_t tSerializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq); @@ -1946,7 +1974,7 @@ typedef struct { int32_t tSerializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq); // int32_t tDeserializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq); -void tFreeSShowReq(SShowReq* pReq); +void tFreeSShowReq(SShowReq* pReq); typedef struct { int64_t showId; @@ -2735,7 +2763,7 @@ typedef struct { SVCreateTbReq* pReqs; SArray* pArray; }; - int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient } SVCreateTbBatchReq; int tEncodeSVCreateTbBatchReq(SEncoder* pCoder, const SVCreateTbBatchReq* pReq); @@ -2828,7 +2856,7 @@ typedef struct { int32_t newCommentLen; char* newComment; int64_t ctimeMs; // fill by vnode - int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient + int8_t source; // TD_REQ_FROM_TAOX-taosX or TD_REQ_FROM_APP-taosClient } SVAlterTbReq; int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq); @@ -3932,7 +3960,7 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); #define SUBMIT_REQ_FROM_FILE 0x4 #define TD_REQ_FROM_TAOX 0x8 -#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility +#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility typedef struct { int32_t flags; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 62392a9caa..4481f1f153 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -220,6 +220,8 @@ TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_TIMER, "compact-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_REQ_CHKPT, "stream-req-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_CLUSTER, "config-cluster", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB, "s3migrate-db", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB_TIMER, "s3migrate-db-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) @@ -272,6 +274,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_QUERY_COMPACT_PROGRESS, "vnode-query-compact-progress", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_KILL_COMPACT, "kill-compact", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_S3MIGRATE, "vnode-s3migrate", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_VND_MSG) diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index 8f89857d33..fed96c3b2b 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -16,7 +16,6 @@ #ifndef _TD_COMMON_TOKEN_H_ #define _TD_COMMON_TOKEN_H_ - #define TK_OR 1 #define TK_AND 2 #define TK_UNION 3 @@ -85,288 +84,292 @@ #define TK_USE 66 #define TK_FLUSH 67 #define TK_TRIM 68 -#define TK_COMPACT 69 -#define TK_IF 70 -#define TK_NOT 71 -#define TK_EXISTS 72 -#define TK_BUFFER 73 -#define TK_CACHEMODEL 74 -#define TK_CACHESIZE 75 -#define TK_COMP 76 -#define TK_DURATION 77 -#define TK_NK_VARIABLE 78 -#define TK_MAXROWS 79 -#define TK_MINROWS 80 -#define TK_KEEP 81 -#define TK_PAGES 82 -#define TK_PAGESIZE 83 -#define TK_TSDB_PAGESIZE 84 -#define TK_PRECISION 85 -#define TK_REPLICA 86 -#define TK_VGROUPS 87 -#define TK_SINGLE_STABLE 88 -#define TK_RETENTIONS 89 -#define TK_SCHEMALESS 90 -#define TK_WAL_LEVEL 91 -#define TK_WAL_FSYNC_PERIOD 92 -#define TK_WAL_RETENTION_PERIOD 93 -#define TK_WAL_RETENTION_SIZE 94 -#define TK_WAL_ROLL_PERIOD 95 -#define TK_WAL_SEGMENT_SIZE 96 -#define TK_STT_TRIGGER 97 -#define TK_TABLE_PREFIX 98 -#define TK_TABLE_SUFFIX 99 -#define TK_KEEP_TIME_OFFSET 100 -#define TK_NK_COLON 101 -#define TK_BWLIMIT 102 -#define TK_START 103 -#define TK_TIMESTAMP 104 -#define TK_END 105 -#define TK_TABLE 106 -#define TK_NK_LP 107 -#define TK_NK_RP 108 -#define TK_STABLE 109 -#define TK_COLUMN 110 -#define TK_MODIFY 111 -#define TK_RENAME 112 -#define TK_TAG 113 -#define TK_SET 114 -#define TK_NK_EQ 115 -#define TK_USING 116 -#define TK_TAGS 117 -#define TK_BOOL 118 -#define TK_TINYINT 119 -#define TK_SMALLINT 120 -#define TK_INT 121 -#define TK_INTEGER 122 -#define TK_BIGINT 123 -#define TK_FLOAT 124 -#define TK_DOUBLE 125 -#define TK_BINARY 126 -#define TK_NCHAR 127 -#define TK_UNSIGNED 128 -#define TK_JSON 129 -#define TK_VARCHAR 130 -#define TK_MEDIUMBLOB 131 -#define TK_BLOB 132 -#define TK_VARBINARY 133 -#define TK_GEOMETRY 134 -#define TK_DECIMAL 135 -#define TK_COMMENT 136 -#define TK_MAX_DELAY 137 -#define TK_WATERMARK 138 -#define TK_ROLLUP 139 -#define TK_TTL 140 -#define TK_SMA 141 -#define TK_DELETE_MARK 142 -#define TK_FIRST 143 -#define TK_LAST 144 -#define TK_SHOW 145 -#define TK_PRIVILEGES 146 -#define TK_DATABASES 147 -#define TK_TABLES 148 -#define TK_STABLES 149 -#define TK_MNODES 150 -#define TK_QNODES 151 -#define TK_FUNCTIONS 152 -#define TK_INDEXES 153 -#define TK_ACCOUNTS 154 -#define TK_APPS 155 -#define TK_CONNECTIONS 156 -#define TK_LICENCES 157 -#define TK_GRANTS 158 -#define TK_FULL 159 -#define TK_LOGS 160 -#define TK_MACHINES 161 -#define TK_QUERIES 162 -#define TK_SCORES 163 -#define TK_TOPICS 164 -#define TK_VARIABLES 165 -#define TK_BNODES 166 -#define TK_SNODES 167 -#define TK_TRANSACTIONS 168 -#define TK_DISTRIBUTED 169 -#define TK_CONSUMERS 170 -#define TK_SUBSCRIPTIONS 171 -#define TK_VNODES 172 -#define TK_ALIVE 173 -#define TK_VIEWS 174 -#define TK_VIEW 175 -#define TK_COMPACTS 176 -#define TK_NORMAL 177 -#define TK_CHILD 178 -#define TK_LIKE 179 -#define TK_TBNAME 180 -#define TK_QTAGS 181 -#define TK_AS 182 -#define TK_SYSTEM 183 -#define TK_INDEX 184 -#define TK_FUNCTION 185 -#define TK_INTERVAL 186 -#define TK_COUNT 187 -#define TK_LAST_ROW 188 -#define TK_META 189 -#define TK_ONLY 190 -#define TK_TOPIC 191 -#define TK_CONSUMER 192 -#define TK_GROUP 193 -#define TK_DESC 194 -#define TK_DESCRIBE 195 -#define TK_RESET 196 -#define TK_QUERY 197 -#define TK_CACHE 198 -#define TK_EXPLAIN 199 -#define TK_ANALYZE 200 -#define TK_VERBOSE 201 -#define TK_NK_BOOL 202 -#define TK_RATIO 203 -#define TK_NK_FLOAT 204 -#define TK_OUTPUTTYPE 205 -#define TK_AGGREGATE 206 -#define TK_BUFSIZE 207 -#define TK_LANGUAGE 208 -#define TK_REPLACE 209 -#define TK_STREAM 210 -#define TK_INTO 211 -#define TK_PAUSE 212 -#define TK_RESUME 213 -#define TK_TRIGGER 214 -#define TK_AT_ONCE 215 -#define TK_WINDOW_CLOSE 216 -#define TK_IGNORE 217 -#define TK_EXPIRED 218 -#define TK_FILL_HISTORY 219 -#define TK_UPDATE 220 -#define TK_SUBTABLE 221 -#define TK_UNTREATED 222 -#define TK_KILL 223 -#define TK_CONNECTION 224 -#define TK_TRANSACTION 225 -#define TK_BALANCE 226 -#define TK_VGROUP 227 -#define TK_LEADER 228 -#define TK_MERGE 229 -#define TK_REDISTRIBUTE 230 -#define TK_SPLIT 231 -#define TK_DELETE 232 -#define TK_INSERT 233 -#define TK_NULL 234 -#define TK_NK_QUESTION 235 -#define TK_NK_ALIAS 236 -#define TK_NK_ARROW 237 -#define TK_ROWTS 238 -#define TK_QSTART 239 -#define TK_QEND 240 -#define TK_QDURATION 241 -#define TK_WSTART 242 -#define TK_WEND 243 -#define TK_WDURATION 244 -#define TK_IROWTS 245 -#define TK_ISFILLED 246 -#define TK_CAST 247 -#define TK_NOW 248 -#define TK_TODAY 249 -#define TK_TIMEZONE 250 -#define TK_CLIENT_VERSION 251 -#define TK_SERVER_VERSION 252 -#define TK_SERVER_STATUS 253 -#define TK_CURRENT_USER 254 -#define TK_CASE 255 -#define TK_WHEN 256 -#define TK_THEN 257 -#define TK_ELSE 258 -#define TK_BETWEEN 259 -#define TK_IS 260 -#define TK_NK_LT 261 -#define TK_NK_GT 262 -#define TK_NK_LE 263 -#define TK_NK_GE 264 -#define TK_NK_NE 265 -#define TK_MATCH 266 -#define TK_NMATCH 267 -#define TK_CONTAINS 268 -#define TK_IN 269 -#define TK_JOIN 270 -#define TK_INNER 271 -#define TK_SELECT 272 -#define TK_NK_HINT 273 -#define TK_DISTINCT 274 -#define TK_WHERE 275 -#define TK_PARTITION 276 -#define TK_BY 277 -#define TK_SESSION 278 -#define TK_STATE_WINDOW 279 -#define TK_EVENT_WINDOW 280 -#define TK_COUNT_WINDOW 281 -#define TK_SLIDING 282 -#define TK_FILL 283 -#define TK_VALUE 284 -#define TK_VALUE_F 285 -#define TK_NONE 286 -#define TK_PREV 287 -#define TK_NULL_F 288 -#define TK_LINEAR 289 -#define TK_NEXT 290 -#define TK_HAVING 291 -#define TK_RANGE 292 -#define TK_EVERY 293 -#define TK_ORDER 294 -#define TK_SLIMIT 295 -#define TK_SOFFSET 296 -#define TK_LIMIT 297 -#define TK_OFFSET 298 -#define TK_ASC 299 -#define TK_NULLS 300 -#define TK_ABORT 301 -#define TK_AFTER 302 -#define TK_ATTACH 303 -#define TK_BEFORE 304 -#define TK_BEGIN 305 -#define TK_BITAND 306 -#define TK_BITNOT 307 -#define TK_BITOR 308 -#define TK_BLOCKS 309 -#define TK_CHANGE 310 -#define TK_COMMA 311 -#define TK_CONCAT 312 -#define TK_CONFLICT 313 -#define TK_COPY 314 -#define TK_DEFERRED 315 -#define TK_DELIMITERS 316 -#define TK_DETACH 317 -#define TK_DIVIDE 318 -#define TK_DOT 319 -#define TK_EACH 320 -#define TK_FAIL 321 -#define TK_FILE 322 -#define TK_FOR 323 -#define TK_GLOB 324 -#define TK_ID 325 -#define TK_IMMEDIATE 326 -#define TK_IMPORT 327 -#define TK_INITIALLY 328 -#define TK_INSTEAD 329 -#define TK_ISNULL 330 -#define TK_KEY 331 -#define TK_MODULES 332 -#define TK_NK_BITNOT 333 -#define TK_NK_SEMI 334 -#define TK_NOTNULL 335 -#define TK_OF 336 -#define TK_PLUS 337 -#define TK_PRIVILEGE 338 -#define TK_RAISE 339 -#define TK_RESTRICT 340 -#define TK_ROW 341 -#define TK_SEMI 342 -#define TK_STAR 343 -#define TK_STATEMENT 344 -#define TK_STRICT 345 -#define TK_STRING 346 -#define TK_TIMES 347 -#define TK_VALUES 348 -#define TK_VARIABLE 349 -#define TK_WAL 350 +#define TK_S3MIGRATE 69 +#define TK_COMPACT 70 +#define TK_IF 71 +#define TK_NOT 72 +#define TK_EXISTS 73 +#define TK_BUFFER 74 +#define TK_CACHEMODEL 75 +#define TK_CACHESIZE 76 +#define TK_COMP 77 +#define TK_DURATION 78 +#define TK_NK_VARIABLE 79 +#define TK_MAXROWS 80 +#define TK_MINROWS 81 +#define TK_KEEP 82 +#define TK_PAGES 83 +#define TK_PAGESIZE 84 +#define TK_TSDB_PAGESIZE 85 +#define TK_PRECISION 86 +#define TK_REPLICA 87 +#define TK_VGROUPS 88 +#define TK_SINGLE_STABLE 89 +#define TK_RETENTIONS 90 +#define TK_SCHEMALESS 91 +#define TK_WAL_LEVEL 92 +#define TK_WAL_FSYNC_PERIOD 93 +#define TK_WAL_RETENTION_PERIOD 94 +#define TK_WAL_RETENTION_SIZE 95 +#define TK_WAL_ROLL_PERIOD 96 +#define TK_WAL_SEGMENT_SIZE 97 +#define TK_STT_TRIGGER 98 +#define TK_TABLE_PREFIX 99 +#define TK_TABLE_SUFFIX 100 +#define TK_S3_CHUNKSIZE 101 +#define TK_S3_KEEPLOCAL 102 +#define TK_S3_COMPACT 103 +#define TK_KEEP_TIME_OFFSET 104 +#define TK_NK_COLON 105 +#define TK_BWLIMIT 106 +#define TK_START 107 +#define TK_TIMESTAMP 108 +#define TK_END 109 +#define TK_TABLE 110 +#define TK_NK_LP 111 +#define TK_NK_RP 112 +#define TK_STABLE 113 +#define TK_COLUMN 114 +#define TK_MODIFY 115 +#define TK_RENAME 116 +#define TK_TAG 117 +#define TK_SET 118 +#define TK_NK_EQ 119 +#define TK_USING 120 +#define TK_TAGS 121 +#define TK_BOOL 122 +#define TK_TINYINT 123 +#define TK_SMALLINT 124 +#define TK_INT 125 +#define TK_INTEGER 126 +#define TK_BIGINT 127 +#define TK_FLOAT 128 +#define TK_DOUBLE 129 +#define TK_BINARY 130 +#define TK_NCHAR 131 +#define TK_UNSIGNED 132 +#define TK_JSON 133 +#define TK_VARCHAR 134 +#define TK_MEDIUMBLOB 135 +#define TK_BLOB 136 +#define TK_VARBINARY 137 +#define TK_GEOMETRY 138 +#define TK_DECIMAL 139 +#define TK_COMMENT 140 +#define TK_MAX_DELAY 141 +#define TK_WATERMARK 142 +#define TK_ROLLUP 143 +#define TK_TTL 144 +#define TK_SMA 145 +#define TK_DELETE_MARK 146 +#define TK_FIRST 147 +#define TK_LAST 148 +#define TK_SHOW 149 +#define TK_PRIVILEGES 150 +#define TK_DATABASES 151 +#define TK_TABLES 152 +#define TK_STABLES 153 +#define TK_MNODES 154 +#define TK_QNODES 155 +#define TK_FUNCTIONS 156 +#define TK_INDEXES 157 +#define TK_ACCOUNTS 158 +#define TK_APPS 159 +#define TK_CONNECTIONS 160 +#define TK_LICENCES 161 +#define TK_GRANTS 162 +#define TK_FULL 163 +#define TK_LOGS 164 +#define TK_MACHINES 165 +#define TK_QUERIES 166 +#define TK_SCORES 167 +#define TK_TOPICS 168 +#define TK_VARIABLES 169 +#define TK_BNODES 170 +#define TK_SNODES 171 +#define TK_TRANSACTIONS 172 +#define TK_DISTRIBUTED 173 +#define TK_CONSUMERS 174 +#define TK_SUBSCRIPTIONS 175 +#define TK_VNODES 176 +#define TK_ALIVE 177 +#define TK_VIEWS 178 +#define TK_VIEW 179 +#define TK_COMPACTS 180 +#define TK_NORMAL 181 +#define TK_CHILD 182 +#define TK_LIKE 183 +#define TK_TBNAME 184 +#define TK_QTAGS 185 +#define TK_AS 186 +#define TK_SYSTEM 187 +#define TK_INDEX 188 +#define TK_FUNCTION 189 +#define TK_INTERVAL 190 +#define TK_COUNT 191 +#define TK_LAST_ROW 192 +#define TK_META 193 +#define TK_ONLY 194 +#define TK_TOPIC 195 +#define TK_CONSUMER 196 +#define TK_GROUP 197 +#define TK_DESC 198 +#define TK_DESCRIBE 199 +#define TK_RESET 200 +#define TK_QUERY 201 +#define TK_CACHE 202 +#define TK_EXPLAIN 203 +#define TK_ANALYZE 204 +#define TK_VERBOSE 205 +#define TK_NK_BOOL 206 +#define TK_RATIO 207 +#define TK_NK_FLOAT 208 +#define TK_OUTPUTTYPE 209 +#define TK_AGGREGATE 210 +#define TK_BUFSIZE 211 +#define TK_LANGUAGE 212 +#define TK_REPLACE 213 +#define TK_STREAM 214 +#define TK_INTO 215 +#define TK_PAUSE 216 +#define TK_RESUME 217 +#define TK_TRIGGER 218 +#define TK_AT_ONCE 219 +#define TK_WINDOW_CLOSE 220 +#define TK_IGNORE 221 +#define TK_EXPIRED 222 +#define TK_FILL_HISTORY 223 +#define TK_UPDATE 224 +#define TK_SUBTABLE 225 +#define TK_UNTREATED 226 +#define TK_KILL 227 +#define TK_CONNECTION 228 +#define TK_TRANSACTION 229 +#define TK_BALANCE 230 +#define TK_VGROUP 231 +#define TK_LEADER 232 +#define TK_MERGE 233 +#define TK_REDISTRIBUTE 234 +#define TK_SPLIT 235 +#define TK_DELETE 236 +#define TK_INSERT 237 +#define TK_NULL 238 +#define TK_NK_QUESTION 239 +#define TK_NK_ALIAS 240 +#define TK_NK_ARROW 241 +#define TK_ROWTS 242 +#define TK_QSTART 243 +#define TK_QEND 244 +#define TK_QDURATION 245 +#define TK_WSTART 246 +#define TK_WEND 247 +#define TK_WDURATION 248 +#define TK_IROWTS 249 +#define TK_ISFILLED 250 +#define TK_CAST 251 +#define TK_NOW 252 +#define TK_TODAY 253 +#define TK_TIMEZONE 254 +#define TK_CLIENT_VERSION 255 +#define TK_SERVER_VERSION 256 +#define TK_SERVER_STATUS 257 +#define TK_CURRENT_USER 258 +#define TK_CASE 259 +#define TK_WHEN 260 +#define TK_THEN 261 +#define TK_ELSE 262 +#define TK_BETWEEN 263 +#define TK_IS 264 +#define TK_NK_LT 265 +#define TK_NK_GT 266 +#define TK_NK_LE 267 +#define TK_NK_GE 268 +#define TK_NK_NE 269 +#define TK_MATCH 270 +#define TK_NMATCH 271 +#define TK_CONTAINS 272 +#define TK_IN 273 +#define TK_JOIN 274 +#define TK_INNER 275 +#define TK_SELECT 276 +#define TK_NK_HINT 277 +#define TK_DISTINCT 278 +#define TK_WHERE 279 +#define TK_PARTITION 280 +#define TK_BY 281 +#define TK_SESSION 282 +#define TK_STATE_WINDOW 283 +#define TK_EVENT_WINDOW 284 +#define TK_COUNT_WINDOW 285 +#define TK_SLIDING 286 +#define TK_FILL 287 +#define TK_VALUE 288 +#define TK_VALUE_F 289 +#define TK_NONE 290 +#define TK_PREV 291 +#define TK_NULL_F 292 +#define TK_LINEAR 293 +#define TK_NEXT 294 +#define TK_HAVING 295 +#define TK_RANGE 296 +#define TK_EVERY 297 +#define TK_ORDER 298 +#define TK_SLIMIT 299 +#define TK_SOFFSET 300 +#define TK_LIMIT 301 +#define TK_OFFSET 302 +#define TK_ASC 303 +#define TK_NULLS 304 +#define TK_ABORT 305 +#define TK_AFTER 306 +#define TK_ATTACH 307 +#define TK_BEFORE 308 +#define TK_BEGIN 309 +#define TK_BITAND 310 +#define TK_BITNOT 311 +#define TK_BITOR 312 +#define TK_BLOCKS 313 +#define TK_CHANGE 314 +#define TK_COMMA 315 +#define TK_CONCAT 316 +#define TK_CONFLICT 317 +#define TK_COPY 318 +#define TK_DEFERRED 319 +#define TK_DELIMITERS 320 +#define TK_DETACH 321 +#define TK_DIVIDE 322 +#define TK_DOT 323 +#define TK_EACH 324 +#define TK_FAIL 325 +#define TK_FILE 326 +#define TK_FOR 327 +#define TK_GLOB 328 +#define TK_ID 329 +#define TK_IMMEDIATE 330 +#define TK_IMPORT 331 +#define TK_INITIALLY 332 +#define TK_INSTEAD 333 +#define TK_ISNULL 334 +#define TK_KEY 335 +#define TK_MODULES 336 +#define TK_NK_BITNOT 337 +#define TK_NK_SEMI 338 +#define TK_NOTNULL 339 +#define TK_OF 340 +#define TK_PLUS 341 +#define TK_PRIVILEGE 342 +#define TK_RAISE 343 +#define TK_RESTRICT 344 +#define TK_ROW 345 +#define TK_SEMI 346 +#define TK_STAR 347 +#define TK_STATEMENT 348 +#define TK_STRICT 349 +#define TK_STRING 350 +#define TK_TIMES 351 +#define TK_VALUES 352 +#define TK_VARIABLE 353 +#define TK_WAL 354 #define TK_NK_SPACE 600 @@ -379,8 +382,7 @@ #define TK_NO_BATCH_SCAN 607 #define TK_SORT_FOR_GROUP 608 #define TK_PARTITION_FIRST 609 -#define TK_PARA_TABLES_SORT 610 - +#define TK_PARA_TABLES_SORT 610 #define TK_NK_NIL 65535 diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 9a12d7b98f..675fd9a3e4 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -100,6 +100,10 @@ typedef struct SDatabaseOptions { int32_t sstTrigger; int32_t tablePrefix; int32_t tableSuffix; + int32_t s3ChunkSize; + int32_t s3KeepLocal; + SValueNode* s3KeepLocalStr; + int8_t s3Compact; } SDatabaseOptions; typedef struct SCreateDatabaseStmt { @@ -137,6 +141,11 @@ typedef struct STrimDatabaseStmt { int32_t maxSpeed; } STrimDatabaseStmt; +typedef struct SS3MigrateDatabaseStmt { + ENodeType type; + char dbName[TSDB_DB_NAME_LEN]; +} SS3MigrateDatabaseStmt; + typedef struct SCompactDatabaseStmt { ENodeType type; char dbName[TSDB_DB_NAME_LEN]; diff --git a/include/util/tconfig.h b/include/util/tconfig.h index f2a9446700..45abe2ff83 100644 --- a/include/util/tconfig.h +++ b/include/util/tconfig.h @@ -51,11 +51,7 @@ typedef enum { CFG_DTYPE_TIMEZONE } ECfgDataType; -typedef enum { - CFG_SCOPE_SERVER, - CFG_SCOPE_CLIENT, - CFG_SCOPE_BOTH -} ECfgScopeType; +typedef enum { CFG_SCOPE_SERVER, CFG_SCOPE_CLIENT, CFG_SCOPE_BOTH } ECfgScopeType; typedef enum { CFG_DYN_NONE = 0, @@ -138,6 +134,7 @@ void cfgDumpItemValue(SConfigItem *pItem, char *buf, int32_t bufSize, int32_t *p void cfgDumpItemScope(SConfigItem *pItem, char *buf, int32_t bufSize, int32_t *pLen); void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump); +void cfgDumpCfgS3(SConfig *pCfg, bool tsc, bool dump); int32_t cfgGetApollUrl(const char **envCmd, const char *envFile, char *apolloUrl); diff --git a/include/util/tdef.h b/include/util/tdef.h index e2d1beb5a5..0dfc88cc1d 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -197,8 +197,8 @@ typedef enum ELogicConditionType { #define TSDB_POINTER_PRINT_BYTES 18 // 0x1122334455667788 // ACCOUNT is a 32 bit positive integer // this is the length of its string representation, including the terminator zero -#define TSDB_ACCT_ID_LEN 11 -#define TSDB_NODE_ID_LEN 11 +#define TSDB_ACCT_ID_LEN 11 +#define TSDB_NODE_ID_LEN 11 #define TSDB_VGROUP_ID_LEN 11 #define TSDB_MAX_COLUMNS 4096 @@ -290,8 +290,8 @@ typedef enum ELogicConditionType { #define TSDB_DNODE_CONFIG_LEN 128 #define TSDB_DNODE_VALUE_LEN 256 -#define TSDB_CLUSTER_VALUE_LEN 1000 -#define TSDB_GRANT_LOG_COL_LEN 15600 +#define TSDB_CLUSTER_VALUE_LEN 1000 +#define TSDB_GRANT_LOG_COL_LEN 15600 #define TSDB_ACTIVE_KEY_LEN 109 #define TSDB_CONN_ACTIVE_KEY_LEN 255 @@ -413,6 +413,16 @@ typedef enum ELogicConditionType { #define TSDB_MAX_HASH_SUFFIX (TSDB_TABLE_NAME_LEN - 2) #define TSDB_DEFAULT_HASH_SUFFIX 0 +#define TSDB_MIN_S3_CHUNK_SIZE (32 * 1024) +#define TSDB_MAX_S3_CHUNK_SIZE (1024 * 1024) +#define TSDB_DEFAULT_S3_CHUNK_SIZE (256 * 1024) +#define TSDB_MIN_S3_KEEP_LOCAL (1 * 1440) // unit minute +#define TSDB_MAX_S3_KEEP_LOCAL (365000 * 1440) +#define TSDB_DEFAULT_S3_KEEP_LOCAL (30 * 1440) +#define TSDB_MIN_S3_COMPACT 0 +#define TSDB_MAX_S3_COMPACT 1 +#define TSDB_DEFAULT_S3_COMPACT 0 + #define TSDB_DB_MIN_WAL_RETENTION_PERIOD -1 #define TSDB_REP_DEF_DB_WAL_RET_PERIOD 3600 #define TSDB_REPS_DEF_DB_WAL_RET_PERIOD 3600 @@ -551,10 +561,10 @@ enum { #define VNODE_TIMEOUT_SEC 60 #define MNODE_TIMEOUT_SEC 60 -#define MONITOR_TABLENAME_LEN 200 -#define MONITOR_TAG_NAME_LEN 100 -#define MONITOR_TAG_VALUE_LEN 300 -#define MONITOR_METRIC_NAME_LEN 100 +#define MONITOR_TABLENAME_LEN 200 +#define MONITOR_TAG_NAME_LEN 100 +#define MONITOR_TAG_VALUE_LEN 300 +#define MONITOR_METRIC_NAME_LEN 100 #ifdef __cplusplus } #endif diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index d3df1345df..eb3dd95e95 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -16,14 +16,15 @@ ENDIF () IF (TD_STORAGE) ADD_DEFINITIONS(-D_STORAGE) TARGET_LINK_LIBRARIES(common PRIVATE storage) +ENDIF () +IF (TD_ENTERPRISE) IF(${BUILD_WITH_S3}) add_definitions(-DUSE_S3) ELSEIF(${BUILD_WITH_COS}) add_definitions(-DUSE_COS) ENDIF() - -ENDIF () +ENDIF() target_include_directories( common diff --git a/source/common/src/cos.c b/source/common/src/cos.c index fcc777ac99..95b216c285 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -53,6 +53,100 @@ int32_t s3Init() { return 0; /*s3Begin();*/ } void s3CleanUp() { /*s3End();*/ } +static int32_t s3ListBucket(char const *bucketname); + +int32_t s3CheckCfg() { + int32_t code = 0; + + code = s3Begin(); + if (code != 0) { + fprintf(stderr, "failed to initialize s3.\n"); + goto _exit; + } + + // test put + char testdata[17] = "0123456789abcdef"; + const char *objectname[] = {"s3test.txt"}; + char path[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); + + snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + snprintf(path + tmp_len, PATH_MAX, "%s", TD_DIRSEP); + snprintf(path + tmp_len + ds_len, PATH_MAX, "%s", objectname[0]); + } else { + snprintf(path + tmp_len, PATH_MAX, "%s", objectname[0]); + } + + TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + if (!fp) { + code = TAOS_SYSTEM_ERROR(errno); + fprintf(stderr, "failed to open test file: %s.\n", path); + // uError("ERROR: %s Failed to open %s", __func__, path); + goto _exit; + } + if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + fprintf(stderr, "failed to write test file: %s.\n", path); + goto _exit; + } + if (taosFsyncFile(fp) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + fprintf(stderr, "failed to fsync test file: %s.\n", path); + goto _exit; + } + taosCloseFile(&fp); + + fprintf(stderr, "\nstart to put object: %s, file: %s content: %s\n", objectname[0], path, testdata); + code = s3PutObjectFromFileOffset(path, objectname[0], 0, 16); + if (code != 0) { + fprintf(stderr, "put object %s : failed.\n", objectname[0]); + goto _exit; + } + fprintf(stderr, "put object %s: success.\n\n", objectname[0]); + + // list buckets + fprintf(stderr, "start to list bucket %s by prefix s3.\n", tsS3BucketName); + code = s3ListBucket(tsS3BucketName); + if (code != 0) { + fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName); + goto _exit; + } + fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName); + + // test range get + uint8_t *pBlock = NULL; + int c_offset = 10; + int c_len = 6; + + fprintf(stderr, "start to range get object %s offset: %d len: %d.\n", objectname[0], c_offset, c_len); + code = s3GetObjectBlock(objectname[0], c_offset, c_len, true, &pBlock); + if (code != 0) { + fprintf(stderr, "get object %s : failed.\n", objectname[0]); + goto _exit; + } + char buf[7] = {0}; + memcpy(buf, pBlock, c_len); + taosMemoryFree(pBlock); + fprintf(stderr, "object content: %s\n", buf); + fprintf(stderr, "get object %s: success.\n\n", objectname[0]); + + // delete test object + fprintf(stderr, "start to delete object: %s.\n", objectname[0]); + code = s3DeleteObjects(objectname, 1); + if (code != 0) { + fprintf(stderr, "delete object %s : failed.\n", objectname[0]); + goto _exit; + } + fprintf(stderr, "delete object %s: success.\n\n", objectname[0]); + + s3End(); + +_exit: + return code; +} + static int should_retry() { /* if (retriesG--) { @@ -85,7 +179,7 @@ typedef struct { } TS3GetData; typedef struct { - char err_msg[128]; + char err_msg[512]; S3Status status; uint64_t content_length; char *buf; @@ -137,6 +231,30 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro } } +static SArray *getListByPrefix(const char *prefix); +static void s3FreeObjectKey(void *pItem); + +static int32_t s3ListBucket(char const *bucketname) { + int32_t code = 0; + + SArray *objectArray = getListByPrefix("s3"); + if (objectArray == NULL) { + return -1; + } + + const char **object_name = TARRAY_DATA(objectArray); + int size = TARRAY_SIZE(objectArray); + + fprintf(stderr, "objects:\n"); + for (int i = 0; i < size; ++i) { + fprintf(stderr, "%s\n", object_name[i]); + } + + taosArrayDestroyEx(objectArray, s3FreeObjectKey); + + return code; +} + typedef struct growbuffer { // The total number of bytes, and the start byte int size; @@ -471,10 +589,11 @@ static int32_t s3PutObjectFromFileSimple(S3BucketContext *bucket_context, char c } while (S3_status_is_retryable(data->status) && should_retry()); if (data->status != S3StatusOK) { - s3PrintError(__FILE__, __LINE__, __func__, data->status, data->err_msg); + // s3PrintError(__FILE__, __LINE__, __func__, data->status, data->err_msg); + s3PrintError(NULL, __LINE__, __func__, data->status, data->err_msg); code = TAOS_SYSTEM_ERROR(EIO); } else if (data->contentLength) { - uError("%s Failed to read remaining %llu bytes from input", __func__, (unsigned long long)data->contentLength); + uError("%s Failed to put remaining %llu bytes", __func__, (unsigned long long)data->contentLength); code = TAOS_SYSTEM_ERROR(EIO); } @@ -830,6 +949,66 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w return code; } +int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { + int32_t code = 0; + int32_t lmtime = 0; + const char *filename = 0; + uint64_t contentLength = 0; + const char *cacheControl = 0, *contentType = 0, *md5 = 0; + const char *contentDispositionFilename = 0, *contentEncoding = 0; + int64_t expires = -1; + S3CannedAcl cannedAcl = S3CannedAclPrivate; + int metaPropertiesCount = 0; + S3NameValue metaProperties[S3_MAX_METADATA_COUNT]; + char useServerSideEncryption = 0; + put_object_callback_data data = {0}; + + if (taosStatFile(file, &contentLength, &lmtime, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + uError("ERROR: %s Failed to stat file %s: ", __func__, file); + return code; + } + + contentLength = size; + + if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) { + code = TAOS_SYSTEM_ERROR(errno); + uError("ERROR: %s Failed to open file %s: ", __func__, file); + return code; + } + if (taosLSeekFile(data.infileFD, offset, SEEK_SET) < 0) { + taosCloseFile(&data.infileFD); + code = TAOS_SYSTEM_ERROR(errno); + return code; + } + + data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength = + contentLength; + + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + + S3PutProperties putProperties = {contentType, md5, + cacheControl, contentDispositionFilename, + contentEncoding, expires, + cannedAcl, metaPropertiesCount, + metaProperties, useServerSideEncryption}; + + if (contentLength <= MULTIPART_CHUNK_SIZE) { + code = s3PutObjectFromFileSimple(&bucketContext, object_name, contentLength, &putProperties, &data); + } else { + code = s3PutObjectFromFileWithoutCp(&bucketContext, object_name, contentLength, &putProperties, &data); + } + + if (data.infileFD) { + taosCloseFile(&data.infileFD); + } else if (data.gb) { + growbuffer_destroy(data.gb); + } + + return code; +} + typedef struct list_bucket_callback_data { char err_msg[512]; S3Status status; @@ -888,7 +1067,7 @@ static SArray *getListByPrefix(const char *prefix) { const char *marker = 0, *delimiter = 0; int maxkeys = 0, allDetails = 0; - list_bucket_callback_data data; + list_bucket_callback_data data = {0}; data.objectArray = taosArrayInit(32, sizeof(void *)); if (!data.objectArray) { uError("%s: %s", __func__, "out of memoty"); @@ -918,14 +1097,17 @@ static SArray *getListByPrefix(const char *prefix) { return data.objectArray; } } else { - s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg); + uError("failed to list with prefix %s: %s", prefix, S3_get_status_name(data.status)); + // s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg); } taosArrayDestroyEx(data.objectArray, s3FreeObjectKey); return NULL; } -void s3DeleteObjects(const char *object_name[], int nobject) { +int32_t s3DeleteObjects(const char *object_name[], int nobject) { + int32_t code = 0; + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, 0, awsRegionG}; S3ResponseHandler responseHandler = {0, &responseCompleteCallback}; @@ -938,8 +1120,11 @@ void s3DeleteObjects(const char *object_name[], int nobject) { if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) { s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg); + code = -1; } } + + return code; } void s3DeleteObjectsByPrefix(const char *prefix) { @@ -991,12 +1176,12 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, } while (S3_status_is_retryable(cbd.status) && should_retry()); if (cbd.status != S3StatusOK) { - uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg); return TAOS_SYSTEM_ERROR(EIO); } if (check && cbd.buf_pos != size) { - uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg); return TAOS_SYSTEM_ERROR(EIO); } @@ -1233,7 +1418,7 @@ void s3DeleteObjectsByPrefix(const char *prefix_str) { cos_pool_destroy(p); } -void s3DeleteObjects(const char *object_name[], int nobject) { +int32_t s3DeleteObjects(const char *object_name[], int nobject) { cos_pool_t *p = NULL; int is_cname = 0; cos_string_t bucket; @@ -1267,6 +1452,8 @@ void s3DeleteObjects(const char *object_name[], int nobject) { } else { cos_warn_log("delete objects failed\n"); } + + return 0; } bool s3Exists(const char *object_name) { @@ -1536,7 +1723,7 @@ void s3CleanUp() {} int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; } int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; } void s3DeleteObjectsByPrefix(const char *prefix) {} -void s3DeleteObjects(const char *object_name[], int nobject) {} +int32_t s3DeleteObjects(const char *object_name[], int nobject) { return 0; } bool s3Exists(const char *object_name) { return false; } bool s3Get(const char *object_name, const char *path) { return false; } int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 25cc5d7c79..e2dff7e95e 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -24,7 +24,7 @@ #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE) -#define SYSTABLE_SCH_VIEW_NAME_LEN ((TSDB_VIEW_NAME_LEN - 1) + VARSTR_HEADER_SIZE) +#define SYSTABLE_SCH_VIEW_NAME_LEN ((TSDB_VIEW_NAME_LEN - 1) + VARSTR_HEADER_SIZE) // clang-format off static const SSysDbTableSchema dnodesSchema[] = { @@ -107,6 +107,9 @@ static const SSysDbTableSchema userDBSchema[] = { {.name = "table_suffix", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "tsdb_pagesize", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "keep_time_offset", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "s3_chunksize", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "s3_keeplocal", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "s3_compact", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = false}, }; static const SSysDbTableSchema userFuncSchema[] = { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 55df80ca44..157fe942de 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -259,7 +259,9 @@ float tsSinkDataRate = 2.0; int32_t tsStreamNodeCheckInterval = 16; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; -int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups +int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups +int32_t tsS3MigrateIntervalSec = 60 * 60; // interval of s3migrate db in all vgroups +bool tsS3MigrateEnabled = 1; int32_t tsGrantHBInterval = 60; int32_t tsUptimeInterval = 300; // seconds char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits @@ -285,7 +287,7 @@ char tsS3Hostname[TSDB_FQDN_LEN] = ""; int32_t tsS3BlockSize = -1; // number of tsdb pages (4096) int32_t tsS3BlockCacheSize = 16; // number of blocks int32_t tsS3PageCacheSize = 4096; // number of pages -int32_t tsS3UploadDelaySec = 60 * 60 * 24; +int32_t tsS3UploadDelaySec = 60; bool tsExperimental = true; @@ -345,7 +347,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { } if (tsS3BucketName[0] != '<') { #if defined(USE_COS) || defined(USE_S3) - if (tsDiskCfgNum > 1) tsS3Enabled = true; +#ifdef TD_ENTERPRISE + /*if (tsDiskCfgNum > 1) */ tsS3Enabled = true; +#endif tsS3StreamEnabled = true; #endif } @@ -539,7 +543,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "experimental", tsExperimental, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + return -1; return 0; } @@ -680,8 +685,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, - (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + if (cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), + CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) @@ -699,7 +704,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorLogProtocol", tsMonitorLogProtocol, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + if (cfgAddInt32(pCfg, "monitorIntervalForBasic", tsMonitorIntervalForBasic, 1, 200000, CFG_SCOPE_SERVER, + CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -742,6 +748,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "trimVDbIntervalSec", tsTrimVDbIntervalSec, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "s3MigrateIntervalSec", tsS3MigrateIntervalSec, 600, 100000, CFG_SCOPE_SERVER, + CFG_DYN_ENT_SERVER) != 0) + return -1; + if (cfgAddBool(pCfg, "s3MigrateEnabled", tsS3MigrateEnabled, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -760,8 +770,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 2, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) - return -1; + if (cfgAddInt64(pCfg, "streamAggCnt", tsStreamAggCnt, 2, INT32_MAX, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "checkpointInterval", tsStreamCheckpointInterval, 60, 1200, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) @@ -792,6 +801,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + /* if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -1, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (tsS3BlockSize > -1 && tsS3BlockSize < 1024) { @@ -801,10 +811,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + */ if (cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; - if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 60 * 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER, + if (cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -1241,8 +1252,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsResolveFQDNRetryTime = cfgGetItem(pCfg, "resolveFQDNRetryTime")->i32; tsMinDiskFreeSize = cfgGetItem(pCfg, "minDiskFreeSize")->i64; - tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32; - tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32; + // tsS3BlockSize = cfgGetItem(pCfg, "s3BlockSize")->i32; + // tsS3BlockCacheSize = cfgGetItem(pCfg, "s3BlockCacheSize")->i32; tsS3PageCacheSize = cfgGetItem(pCfg, "s3PageCacheSize")->i32; tsS3UploadDelaySec = cfgGetItem(pCfg, "s3UploadDelaySec")->i32; @@ -1516,6 +1527,8 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) { {"ttlBatchDropNum", &tsTtlBatchDropNum}, {"ttlFlushThreshold", &tsTtlFlushThreshold}, {"ttlPushInterval", &tsTtlPushIntervalSec}, + {"s3MigrateIntervalSec", &tsS3MigrateIntervalSec}, + {"s3MigrateEnabled", &tsS3MigrateEnabled}, //{"s3BlockSize", &tsS3BlockSize}, {"s3BlockCacheSize", &tsS3BlockCacheSize}, {"s3PageCacheSize", &tsS3PageCacheSize}, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f8df2edd61..b6067c7266 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1471,9 +1471,7 @@ int32_t tDeserializeSStatisReq(void *buf, int32_t bufLen, SStatisReq *pReq) { return 0; } -void tFreeSStatisReq(SStatisReq *pReq) { - taosMemoryFreeClear(pReq->pCont); -} +void tFreeSStatisReq(SStatisReq *pReq) { taosMemoryFreeClear(pReq->pCont); } // int32_t tSerializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) { // SEncoder encoder = {0}; @@ -1872,7 +1870,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) char *tb = taosHashIterate(pRsp->readTbs, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1887,7 +1885,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->writeTbs, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1902,7 +1900,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->alterTbs, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1917,7 +1915,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->readViews, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1932,7 +1930,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->writeViews, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1947,7 +1945,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->alterViews, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1962,7 +1960,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) int32_t *useDb = taosHashIterate(pRsp->useDbs, NULL); while (useDb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(useDb, &keyLen); + void *key = taosHashGetKey(useDb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -3022,6 +3020,9 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { } if (tEncodeI32(&encoder, pReq->tsdbPageSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1; + if (tEncodeI32(&encoder, pReq->s3ChunkSize) < 0) return -1; + if (tEncodeI32(&encoder, pReq->s3KeepLocal) < 0) return -1; + if (tEncodeI8(&encoder, pReq->s3Compact) < 0) return -1; ENCODESQL(); tEndEncode(&encoder); @@ -3091,6 +3092,15 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) if (tDecodeI32(&decoder, &pReq->keepTimeOffset) < 0) return -1; } + pReq->s3ChunkSize = TSDB_DEFAULT_S3_CHUNK_SIZE; + pReq->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL; + pReq->s3Compact = TSDB_DEFAULT_S3_COMPACT; + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI32(&decoder, &pReq->s3ChunkSize) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->s3KeepLocal) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->s3Compact) < 0) return -1; + } + DECODESQL(); tEndDecode(&decoder); @@ -3132,6 +3142,10 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1; if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1; + + if (tEncodeI32(&encoder, pReq->s3KeepLocal) < 0) return -1; + if (tEncodeI8(&encoder, pReq->s3Compact) < 0) return -1; + ENCODESQL(); tEndEncode(&encoder); @@ -3181,6 +3195,13 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { if (tDecodeI32(&decoder, &pReq->keepTimeOffset) < 0) return -1; } + pReq->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL; + pReq->s3Compact = TSDB_DEFAULT_S3_COMPACT; + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI32(&decoder, &pReq->s3KeepLocal) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->s3Compact) < 0) return -1; + } + DECODESQL(); tEndDecode(&decoder); @@ -3873,6 +3894,57 @@ int32_t tDeserializeSVTrimDbReq(void *buf, int32_t bufLen, SVTrimDbReq *pReq) { return 0; } +int32_t tSerializeSS3MigrateDbReq(void *buf, int32_t bufLen, SS3MigrateDbReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSS3MigrateDbReq(void *buf, int32_t bufLen, SS3MigrateDbReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSVS3MigrateDbReq(void *buf, int32_t bufLen, SVS3MigrateDbReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->timestamp) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSVS3MigrateDbReq(void *buf, int32_t bufLen, SVS3MigrateDbReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->timestamp) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -3969,6 +4041,9 @@ int32_t tSerializeSDbCfgRspImpl(SEncoder *encoder, const SDbCfgRsp *pRsp) { if (tEncodeI8(encoder, pRsp->schemaless) < 0) return -1; if (tEncodeI16(encoder, pRsp->sstTrigger) < 0) return -1; if (tEncodeI32(encoder, pRsp->keepTimeOffset) < 0) return -1; + if (tEncodeI32(encoder, pRsp->s3ChunkSize) < 0) return -1; + if (tEncodeI32(encoder, pRsp->s3KeepLocal) < 0) return -1; + if (tEncodeI8(encoder, pRsp->s3Compact) < 0) return -1; return 0; } @@ -4042,6 +4117,15 @@ int32_t tDeserializeSDbCfgRspImpl(SDecoder *decoder, SDbCfgRsp *pRsp) { if (tDecodeI32(decoder, &pRsp->keepTimeOffset) < 0) return -1; } + pRsp->s3ChunkSize = TSDB_DEFAULT_S3_CHUNK_SIZE; + pRsp->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL; + pRsp->s3Compact = TSDB_DEFAULT_S3_COMPACT; + if (!tDecodeIsEnd(decoder)) { + if (tDecodeI32(decoder, &pRsp->s3ChunkSize) < 0) return -1; + if (tDecodeI32(decoder, &pRsp->s3KeepLocal) < 0) return -1; + if (tDecodeI8(decoder, &pRsp->s3Compact) < 0) return -1; + } + return 0; } @@ -5065,7 +5149,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR if (tEncodeI16(&encoder, pReq->hashPrefix) < 0) return -1; if (tEncodeI16(&encoder, pReq->hashSuffix) < 0) return -1; if (tEncodeI32(&encoder, pReq->tsdbPageSize) < 0) return -1; - for (int32_t i = 0; i < 8; ++i) { + for (int32_t i = 0; i < 6; ++i) { if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1; } if (tEncodeI8(&encoder, pReq->learnerReplica) < 0) return -1; @@ -5076,6 +5160,9 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR } if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1; if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1; + if (tEncodeI32(&encoder, pReq->s3ChunkSize) < 0) return -1; + if (tEncodeI32(&encoder, pReq->s3KeepLocal) < 0) return -1; + if (tEncodeI8(&encoder, pReq->s3Compact) < 0) return -1; tEndEncode(&encoder); @@ -5151,7 +5238,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeI16(&decoder, &pReq->hashPrefix) < 0) return -1; if (tDecodeI16(&decoder, &pReq->hashSuffix) < 0) return -1; if (tDecodeI32(&decoder, &pReq->tsdbPageSize) < 0) return -1; - for (int32_t i = 0; i < 8; ++i) { + for (int32_t i = 0; i < 6; ++i) { if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1; } if (!tDecodeIsEnd(&decoder)) { @@ -5170,6 +5257,15 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeI32(&decoder, &pReq->keepTimeOffset) < 0) return -1; } + pReq->s3ChunkSize = TSDB_DEFAULT_S3_CHUNK_SIZE; + pReq->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL; + pReq->s3Compact = TSDB_DEFAULT_S3_COMPACT; + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI32(&decoder, &pReq->s3ChunkSize) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->s3KeepLocal) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->s3Compact) < 0) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -5425,7 +5521,7 @@ int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeCon if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1; if (tEncodeI8(&encoder, pReq->strict) < 0) return -1; if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1; - for (int32_t i = 0; i < 8; ++i) { + for (int32_t i = 0; i < 7; ++i) { if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1; } @@ -5436,6 +5532,10 @@ int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeCon if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1; if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1; if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1; + + if (tEncodeI32(&encoder, pReq->s3KeepLocal) < 0) return -1; + if (tEncodeI8(&encoder, pReq->s3Compact) < 0) return -1; + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -5461,7 +5561,7 @@ int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeC if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1; if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1; if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1; - for (int32_t i = 0; i < 8; ++i) { + for (int32_t i = 0; i < 7; ++i) { if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1; } @@ -5487,6 +5587,13 @@ int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeC if (tDecodeI32(&decoder, &pReq->keepTimeOffset) < 0) return -1; } + pReq->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL; + pReq->s3Compact = TSDB_DEFAULT_S3_COMPACT; + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI32(&decoder, &pReq->s3KeepLocal) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->s3Compact) < 0) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -7008,27 +7115,27 @@ void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp) { taosArrayDestroy(pRsp->taskSt // return 0; // } -//int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) { - // SDecoder decoder = {0}; - // int32_t num = 0; - // tDecoderInit(&decoder, buf, bufLen); +// int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) { +// SDecoder decoder = {0}; +// int32_t num = 0; +// tDecoderInit(&decoder, buf, bufLen); - // if (tStartDecode(&decoder) < 0) return -1; - // if (tDecodeI32(&decoder, &num) < 0) return -1; - // if (num > 0) { - // pRsp->rspList = taosArrayInit(num, sizeof(SVCreateTbRsp)); - // if (NULL == pRsp->rspList) return -1; - // for (int32_t i = 0; i < num; ++i) { - // SVCreateTbRsp rsp = {0}; - // if (tDecodeI32(&decoder, &rsp.code) < 0) return -1; - // if (NULL == taosArrayPush(pRsp->rspList, &rsp)) return -1; - // } - // } else { - // pRsp->rspList = NULL; - // } - // tEndDecode(&decoder); +// if (tStartDecode(&decoder) < 0) return -1; +// if (tDecodeI32(&decoder, &num) < 0) return -1; +// if (num > 0) { +// pRsp->rspList = taosArrayInit(num, sizeof(SVCreateTbRsp)); +// if (NULL == pRsp->rspList) return -1; +// for (int32_t i = 0; i < num; ++i) { +// SVCreateTbRsp rsp = {0}; +// if (tDecodeI32(&decoder, &rsp.code) < 0) return -1; +// if (NULL == taosArrayPush(pRsp->rspList, &rsp)) return -1; +// } +// } else { +// pRsp->rspList = NULL; +// } +// tEndDecode(&decoder); - // tDecoderClear(&decoder); +// tDecoderClear(&decoder); // return 0; //} @@ -7301,8 +7408,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeI32(&encoder, taosArrayGetSize(pReq->pVgroupVerList)) < 0) return -1; - for(int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) { - SVgroupVer* p = taosArrayGet(pReq->pVgroupVerList, i); + for (int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) { + SVgroupVer *p = taosArrayGet(pReq->pVgroupVerList, i); if (tEncodeI32(&encoder, p->vgId) < 0) return -1; if (tEncodeI64(&encoder, p->ver) < 0) return -1; } @@ -8478,7 +8585,7 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) { for (int32_t i = 0; i < pRsp->blockNum; i++) { int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i); - void * data = taosArrayGetP(pRsp->blockData, i); + void *data = taosArrayGetP(pRsp->blockData, i); if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1; if (pRsp->withSchema) { SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i); @@ -8516,7 +8623,7 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { } for (int32_t i = 0; i < pRsp->blockNum; i++) { - void * data; + void *data; uint64_t bLen; if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1; taosArrayPush(pRsp->blockData, &data); @@ -8570,7 +8677,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1; if (pRsp->createTableNum) { for (int32_t i = 0; i < pRsp->createTableNum; i++) { - void * createTableReq = taosArrayGetP(pRsp->createTableReq, i); + void *createTableReq = taosArrayGetP(pRsp->createTableReq, i); int32_t createTableLen = *(int32_t *)taosArrayGet(pRsp->createTableLen, i); if (tEncodeBinary(pEncoder, createTableReq, createTableLen) < 0) return -1; } @@ -8579,14 +8686,14 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { } int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { - if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1; + if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp *)pRsp) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1; if (pRsp->createTableNum) { pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t)); pRsp->createTableReq = taosArrayInit(pRsp->createTableNum, sizeof(void *)); for (int32_t i = 0; i < pRsp->createTableNum; i++) { - void * pCreate = NULL; + void *pCreate = NULL; uint64_t len; if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1; int32_t l = (int32_t)len; @@ -8889,7 +8996,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) { taosArrayDestroy(pTbData->aCol); } else { int32_t nRow = TARRAY_SIZE(pTbData->aRowP); - SRow ** rows = (SRow **)TARRAY_DATA(pTbData->aRowP); + SRow **rows = (SRow **)TARRAY_DATA(pTbData->aRowP); for (int32_t i = 0; i < nRow; ++i) { tRowDestroy(rows[i]); diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index ad3c766510..40cf4395d8 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -21,8 +21,8 @@ #include "tgrant.h" #include "tjson.h" #include "tlog.h" -#include "tutil.h" #include "tunit.h" +#include "tutil.h" #define CFG_NAME_PRINT_LEN 24 #define CFG_SRC_PRINT_LEN 12 @@ -310,19 +310,19 @@ static int32_t cfgSetTfsItem(SConfig *pCfg, const char *name, const char *value, static int32_t cfgUpdateDebugFlagItem(SConfig *pCfg, const char *name, bool resetArray) { SConfigItem *pDebugFlagItem = cfgGetItem(pCfg, "debugFlag"); if (resetArray) { - // reset - if (pDebugFlagItem == NULL) return -1; + // reset + if (pDebugFlagItem == NULL) return -1; - // logflag names that should 'not' be set by 'debugFlag' + // logflag names that should 'not' be set by 'debugFlag' + if (pDebugFlagItem->array == NULL) { + pDebugFlagItem->array = taosArrayInit(16, sizeof(SLogVar)); if (pDebugFlagItem->array == NULL) { - pDebugFlagItem->array = taosArrayInit(16, sizeof(SLogVar)); - if (pDebugFlagItem->array == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - taosArrayClear(pDebugFlagItem->array); - return 0; + } + taosArrayClear(pDebugFlagItem->array); + return 0; } // update @@ -401,8 +401,7 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p case CFG_DTYPE_BOOL: { int32_t ival = (int32_t)atoi(pVal); if (ival != 0 && ival != 1) { - uError("cfg:%s, type:%s value:%d out of range[0, 1]", pItem->name, - cfgDtypeStr(pItem->dtype), ival); + uError("cfg:%s, type:%s value:%d out of range[0, 1]", pItem->name, cfgDtypeStr(pItem->dtype), ival); terrno = TSDB_CODE_OUT_OF_RANGE; return -1; } @@ -670,6 +669,89 @@ void cfgDumpItemScope(SConfigItem *pItem, char *buf, int32_t bufSize, int32_t *p *pLen = len; } +void cfgDumpCfgS3(SConfig *pCfg, bool tsc, bool dump) { + if (dump) { + printf(" s3 config"); + printf("\n"); + printf("================================================================="); + printf("\n"); + } else { + uInfo(" s3 config"); + uInfo("================================================================="); + } + + char src[CFG_SRC_PRINT_LEN + 1] = {0}; + char name[CFG_NAME_PRINT_LEN + 1] = {0}; + + int32_t size = taosArrayGetSize(pCfg->array); + for (int32_t i = 0; i < size; ++i) { + SConfigItem *pItem = taosArrayGet(pCfg->array, i); + if (tsc && pItem->scope == CFG_SCOPE_SERVER) continue; + if (dump && strcmp(pItem->name, "scriptDir") == 0) continue; + if (dump && strncmp(pItem->name, "s3", 2) != 0) continue; + tstrncpy(src, cfgStypeStr(pItem->stype), CFG_SRC_PRINT_LEN); + for (int32_t j = 0; j < CFG_SRC_PRINT_LEN; ++j) { + if (src[j] == 0) src[j] = ' '; + } + + tstrncpy(name, pItem->name, CFG_NAME_PRINT_LEN); + for (int32_t j = 0; j < CFG_NAME_PRINT_LEN; ++j) { + if (name[j] == 0) name[j] = ' '; + } + + switch (pItem->dtype) { + case CFG_DTYPE_BOOL: + if (dump) { + printf("%s %s %u\n", src, name, pItem->bval); + } else { + uInfo("%s %s %u", src, name, pItem->bval); + } + + break; + case CFG_DTYPE_INT32: + if (dump) { + printf("%s %s %d\n", src, name, pItem->i32); + } else { + uInfo("%s %s %d", src, name, pItem->i32); + } + break; + case CFG_DTYPE_INT64: + if (dump) { + printf("%s %s %" PRId64 "\n", src, name, pItem->i64); + } else { + uInfo("%s %s %" PRId64, src, name, pItem->i64); + } + break; + case CFG_DTYPE_DOUBLE: + case CFG_DTYPE_FLOAT: + if (dump) { + printf("%s %s %.2f\n", src, name, pItem->fval); + } else { + uInfo("%s %s %.2f", src, name, pItem->fval); + } + break; + case CFG_DTYPE_STRING: + case CFG_DTYPE_DIR: + case CFG_DTYPE_LOCALE: + case CFG_DTYPE_CHARSET: + case CFG_DTYPE_TIMEZONE: + case CFG_DTYPE_NONE: + if (dump) { + printf("%s %s %s\n", src, name, pItem->str); + } else { + uInfo("%s %s %s", src, name, pItem->str); + } + break; + } + } + + if (dump) { + printf("=================================================================\n"); + } else { + uInfo("================================================================="); + } +} + void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { if (dump) { printf(" global config"); @@ -717,7 +799,7 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { break; case CFG_DTYPE_INT64: if (dump) { - printf("%s %s %" PRId64"\n", src, name, pItem->i64); + printf("%s %s %" PRId64 "\n", src, name, pItem->i64); } else { uInfo("%s %s %" PRId64, src, name, pItem->i64); }