From 764f8d2e456a00357dae70ea846dc3926f8cbf9c Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 09:03:19 +0800 Subject: [PATCH 01/16] fix(vnode/s3): move init & cleanup to dnode --- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 36 +++++++++++++------------ source/dnode/vnode/src/vnd/vnodeCos.c | 10 ++++--- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index d560ba1644..f79c9d97b8 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -18,17 +18,17 @@ #include "audit.h" #include "libs/function/tudf.h" -#define DM_INIT_AUDIT() \ - do { \ - auditCfg.port = tsMonitorPort; \ - auditCfg.server = tsMonitorFqdn; \ - auditCfg.comp = tsMonitorComp; \ - if (auditInit(&auditCfg) != 0) { \ - return -1; \ - } \ +#define DM_INIT_AUDIT() \ + do { \ + auditCfg.port = tsMonitorPort; \ + auditCfg.server = tsMonitorFqdn; \ + auditCfg.comp = tsMonitorComp; \ + if (auditInit(&auditCfg) != 0) { \ + return -1; \ + } \ } while (0) -static SDnode globalDnode = {0}; +static SDnode globalDnode = {0}; SDnode *dmInstance() { return &globalDnode; } @@ -146,6 +146,9 @@ static bool dmCheckDataDirVersion() { return true; } +extern int32_t s3Begin(); +extern void s3End(); + int32_t dmInit() { dInfo("start to init dnode env"); if (dmDiskInit() != 0) return -1; @@ -156,6 +159,7 @@ int32_t dmInit() { if (dmInitMonitor() != 0) return -1; if (dmInitAudit() != 0) return -1; if (dmInitDnode(dmInstance()) != 0) return -1; + if (s3Begin() != 0) return -1; dInfo("dnode env is initialized"); return 0; @@ -181,6 +185,7 @@ void dmCleanup() { udfStopUdfd(); taosStopCacheRefreshWorker(); dmDiskClose(); + s3End(); dInfo("dnode env is cleaned up"); taosCleanupCfg(); @@ -265,19 +270,19 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) { pWrapper = &pDnode->wrappers[ntype]; - if(pWrapper->func.nodeRoleFp != NULL){ + if (pWrapper->func.nodeRoleFp != NULL) { ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt); dInfo("node:%s, checking node role:%d", pWrapper->name, role); - if(role == TAOS_SYNC_ROLE_VOTER){ + if (role == TAOS_SYNC_ROLE_VOTER) { dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role); terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER; return -1; } } - if(pWrapper->func.isCatchUpFp != NULL){ + if (pWrapper->func.isCatchUpFp != NULL) { dInfo("node:%s, checking node catch up", pWrapper->name); - if((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1){ + if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) { terrno = TSDB_CODE_MNODE_NOT_CATCH_UP; return -1; } @@ -394,7 +399,4 @@ void dmReportStartup(const char *pName, const char *pDesc) { dDebug("step:%s, %s", pStartup->name, pStartup->desc); } -int64_t dmGetClusterId() { - return globalDnode.data.clusterId; -} - +int64_t dmGetClusterId() { return globalDnode.data.clusterId; } diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 6e36739f5a..a16f926f0f 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -24,7 +24,7 @@ static S3UriStyle uriStyleG = S3UriStylePath; static int retriesG = 5; static int timeoutMsG = 0; -static int32_t s3Begin() { +int32_t s3Begin() { S3Status status; const char *hostname = tsS3Hostname; const char *env_hn = getenv("S3_HOSTNAME"); @@ -43,10 +43,12 @@ static int32_t s3Begin() { return 0; } -static void s3End() { S3_deinitialize(); } -int32_t s3Init() { return s3Begin(); } +void s3End() { S3_deinitialize(); } -void s3CleanUp() { s3End(); } +int32_t s3Init() { return 0; /*s3Begin();*/ } + +void s3CleanUp() { /*s3End();*/ +} static int should_retry() { /* From 8bb65119a4c77f4ab9d309a7bbe9bbe51e0e7dfe Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 10:52:23 +0800 Subject: [PATCH 02/16] dnode: fix dnode s3 init --- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index f79c9d97b8..6f13abcebc 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -146,9 +146,13 @@ static bool dmCheckDataDirVersion() { return true; } +#if defined(USE_S3) + extern int32_t s3Begin(); extern void s3End(); +#endif + int32_t dmInit() { dInfo("start to init dnode env"); if (dmDiskInit() != 0) return -1; @@ -159,7 +163,9 @@ int32_t dmInit() { if (dmInitMonitor() != 0) return -1; if (dmInitAudit() != 0) return -1; if (dmInitDnode(dmInstance()) != 0) return -1; +#if defined(USE_S3) if (s3Begin() != 0) return -1; +#endif dInfo("dnode env is initialized"); return 0; @@ -185,7 +191,9 @@ void dmCleanup() { udfStopUdfd(); taosStopCacheRefreshWorker(); dmDiskClose(); +#if defined(USE_S3) s3End(); +#endif dInfo("dnode env is cleaned up"); taosCleanupCfg(); From 15e1e4cd2c1ccc2729651f7a492616d57469641a Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 14:00:10 +0800 Subject: [PATCH 03/16] dnode/node_mgmt: cmake define for use_s3 --- source/dnode/mgmt/node_mgmt/CMakeLists.txt | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/source/dnode/mgmt/node_mgmt/CMakeLists.txt b/source/dnode/mgmt/node_mgmt/CMakeLists.txt index f1be20289a..6b875db860 100644 --- a/source/dnode/mgmt/node_mgmt/CMakeLists.txt +++ b/source/dnode/mgmt/node_mgmt/CMakeLists.txt @@ -3,6 +3,17 @@ add_library(dnode STATIC ${IMPLEMENT_SRC}) target_link_libraries( dnode mgmt_mnode mgmt_qnode mgmt_snode mgmt_vnode mgmt_dnode ) + +IF (TD_STORAGE) + + IF(${BUILD_WITH_S3}) + add_definitions(-DUSE_S3) + ELSEIF(${BUILD_WITH_COS}) + add_definitions(-DUSE_COS) + ENDIF() + +ENDIF () + target_include_directories( dnode PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" From 3f125bc6bf45c878a41bebdaa79251a7c282b2ac Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 14:30:30 +0800 Subject: [PATCH 04/16] vnode/cos: make err msg buffer big enough for detailed msg --- source/dnode/vnode/src/vnd/vnodeCos.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index a16f926f0f..d2db8be026 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -74,7 +74,7 @@ static void s3PrintError(const char *func, S3Status status, char error_details[] } typedef struct { - char err_msg[128]; + char err_msg[512]; S3Status status; uint64_t content_length; char *buf; @@ -203,7 +203,7 @@ static void growbuffer_destroy(growbuffer *gb) { } typedef struct put_object_callback_data { - char err_msg[128]; + char err_msg[512]; S3Status status; // FILE *infile; TdFilePtr infileFD; @@ -216,7 +216,7 @@ typedef struct put_object_callback_data { #define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M typedef struct UploadManager { - char err_msg[128]; + char err_msg[512]; S3Status status; // used for initial multipart char *upload_id; @@ -231,7 +231,7 @@ typedef struct UploadManager { } UploadManager; typedef struct list_parts_callback_data { - char err_msg[128]; + char err_msg[512]; S3Status status; int isTruncated; char nextPartNumberMarker[24]; @@ -248,7 +248,7 @@ typedef struct list_parts_callback_data { } list_parts_callback_data; typedef struct MultipartPartData { - char err_msg[128]; + char err_msg[512]; S3Status status; put_object_callback_data put_object_data; int seq; @@ -611,7 +611,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { } typedef struct list_bucket_callback_data { - char err_msg[128]; + char err_msg[512]; S3Status status; int isTruncated; char nextMarker[1024]; From a1e692a796b64d0d3585cbade05f528b69907084 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 15:18:55 +0800 Subject: [PATCH 05/16] fix(vnode/cos): fix error printing to avoid buffer overflow --- source/dnode/vnode/src/vnd/vnodeCos.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index d2db8be026..d7eced78fb 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -99,20 +99,22 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro int len = 0; const int elen = sizeof(cbd->err_msg); if (error) { - if (error->message) { + if (error->message && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, " Message: %s\n", error->message); } - if (error->resource) { + if (error->resource && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, " Resource: %s\n", error->resource); } - if (error->furtherDetails) { + if (error->furtherDetails && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, " Further Details: %s\n", error->furtherDetails); } - if (error->extraDetailsCount) { + if (error->extraDetailsCount && elen - len > 0) { len += snprintf(&(cbd->err_msg[len]), elen - len, "%s", " Extra Details:\n"); for (int i = 0; i < error->extraDetailsCount; i++) { - len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name, - error->extraDetails[i].value); + if (elen - len > 0) { + len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name, + error->extraDetails[i].value); + } } } } @@ -205,6 +207,7 @@ static void growbuffer_destroy(growbuffer *gb) { typedef struct put_object_callback_data { char err_msg[512]; S3Status status; + uint64_t content_length; // FILE *infile; TdFilePtr infileFD; growbuffer *gb; @@ -218,6 +221,7 @@ typedef struct put_object_callback_data { typedef struct UploadManager { char err_msg[512]; S3Status status; + uint64_t content_length; // used for initial multipart char *upload_id; @@ -233,6 +237,7 @@ typedef struct UploadManager { typedef struct list_parts_callback_data { char err_msg[512]; S3Status status; + uint64_t content_length; int isTruncated; char nextPartNumberMarker[24]; char initiatorId[256]; From e5bbcf76f1843a68fe478d5ecf22d80bab16f1f1 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 15:57:56 +0800 Subject: [PATCH 06/16] vnode/cos: fix get object block callback --- source/dnode/vnode/src/vnd/vnodeCos.c | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index d7eced78fb..f9d6ffe544 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -78,6 +78,7 @@ typedef struct { S3Status status; uint64_t content_length; char *buf; + int64_t buf_pos; } TS3SizeCBD; static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) { @@ -730,15 +731,19 @@ void s3DeleteObjects(const char *object_name[], int nobject) { static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) { TS3SizeCBD *cbd = callbackData; + /* if (cbd->content_length != bufferSize) { cbd->status = S3StatusAbortedByCallback; return S3StatusAbortedByCallback; } + */ + if (!cbd->buf) { + cbd->buf = taosMemoryCalloc(1, bufferSize); + } - char *buf = taosMemoryCalloc(1, bufferSize); - if (buf) { - memcpy(buf, buffer, bufferSize); - cbd->buf = buf; + if (cbd->buf) { + memcpy(cbd->buf + cbd->buf_pos, buffer, bufferSize); + cbd->buf_pos += bufferSize; cbd->status = S3StatusOK; return S3StatusOK; } else { @@ -760,6 +765,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, TS3SizeCBD cbd = {0}; cbd.content_length = size; + cbd.buf_pos = 0; do { S3_get_object(&bucketContext, object_name, &getConditions, offset, size, 0, 0, &getObjectHandler, &cbd); } while (S3_status_is_retryable(cbd.status) && should_retry()); From 128353a861e0a18a116ff6ab44738d3716101a80 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 16:04:24 +0800 Subject: [PATCH 07/16] vnode/cos: fix get object handler mem --- source/dnode/vnode/src/vnd/vnodeCos.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index f9d6ffe544..0bb16fcd9c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -738,7 +738,7 @@ static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void * } */ if (!cbd->buf) { - cbd->buf = taosMemoryCalloc(1, bufferSize); + cbd->buf = taosMemoryCalloc(1, cbd->content_length); } if (cbd->buf) { From 5ccdde4495df36350ec342236c1755da8603b16c Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 8 Nov 2023 20:43:52 +0800 Subject: [PATCH 08/16] vnode/cos: error on incomplete fetching --- source/dnode/vnode/src/vnd/vnodeCos.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 0bb16fcd9c..9941c53750 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -775,6 +775,11 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, return TAOS_SYSTEM_ERROR(EIO); } + if (cbd.buf_pos != size) { + vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + return TAOS_SYSTEM_ERROR(EIO); + } + *ppBlock = cbd.buf; return 0; From c58ec72031866186a1fe495d6ca3e12f5d3b6c3a Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 09:27:43 +0800 Subject: [PATCH 09/16] config/s3blocksize: enable alter for debugging --- source/common/src/tglobal.c | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index cb67fc1ba3..ead9a5926b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -94,8 +94,8 @@ int32_t tsMonitorMaxLogs = 100; bool tsMonitorComp = false; // audit -bool tsEnableAudit = true; -bool tsEnableAuditCreateTable = true; +bool tsEnableAudit = true; +bool tsEnableAuditCreateTable = true; // telem #ifdef TD_ENTERPRISE @@ -222,7 +222,7 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR #ifdef WINDOWS bool tsStartUdfd = false; #else -bool tsStartUdfd = true; +bool tsStartUdfd = true; #endif // wal @@ -332,7 +332,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { return 0; } -struct SConfig *taosGetCfg() { return tsCfg; } +struct SConfig *taosGetCfg() { + return tsCfg; +} static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) { @@ -442,8 +444,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "enableScience", tsEnableScience, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; - if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != - 0) + if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, CFG_SCOPE_CLIENT, + CFG_DYN_CLIENT) != 0) return -1; if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; @@ -459,7 +461,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { // if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) // return -1; if (cfgAddInt32(pCfg, "maxShellConns", tsMaxShellConns, 10, 50000000, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "maxInsertBatchRows", tsMaxInsertBatchRows, 1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) + if (cfgAddInt32(pCfg, "maxInsertBatchRows", tsMaxInsertBatchRows, 1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != + 0) return -1; if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, CFG_SCOPE_BOTH, CFG_DYN_CLIENT) != 0) return -1; @@ -546,7 +549,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; + if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) + return -1; if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; @@ -685,7 +689,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) + if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != + 0) return -1; if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, @@ -728,7 +733,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) @@ -1669,6 +1674,7 @@ void taosCfgDynamicOptions(const char *option, const char *value) { {"ttlBatchDropNum", &tsTtlBatchDropNum}, {"ttlFlushThreshold", &tsTtlFlushThreshold}, {"ttlPushInterval", &tsTtlPushIntervalSec}, + {"s3BlockSize", &tsS3BlockSize}, {"s3BlockCacheSize", &tsS3BlockCacheSize}, {"s3PageCacheSize", &tsS3PageCacheSize}, {"s3UploadDelaySec", &tsS3UploadDelaySec}, @@ -1692,8 +1698,8 @@ void taosCfgDynamicOptions(const char *option, const char *value) { switch (pItem->dtype) { case CFG_DTYPE_BOOL: { - int32_t flag = atoi(value); - bool *pVar = options[d].optionVar; + int32_t flag = atoi(value); + bool *pVar = options[d].optionVar; uInfo("%s set from %d to %d", optName, *pVar, flag); *pVar = flag; } break; From 1bb10bb8625a9c30ad651ad9f8d1d15d3a792670 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 09:32:13 +0800 Subject: [PATCH 10/16] vnode/cos: check get object block size optionally --- source/dnode/vnode/src/inc/vndCos.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- source/dnode/vnode/src/vnd/vnodeCos.c | 15 +++++++++------ 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/inc/vndCos.h b/source/dnode/vnode/src/inc/vndCos.h index 8581b039f8..0a055ed32a 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/source/dnode/vnode/src/inc/vndCos.h @@ -38,7 +38,7 @@ void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock); +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index bf6f0cf4d6..249479d6bd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -3099,7 +3099,7 @@ static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) { } */ int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage; - code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock); + code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock); if (code != TSDB_CODE_SUCCESS) { // taosMemoryFree(pBlock); // code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index def9a73d10..ba9507530c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -340,7 +340,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64 int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage); int64_t pgnoEnd = pgno - 1 + (size - n + szPgCont - 1) / szPgCont; int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage; - code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, &pBlock); + code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, 1, &pBlock); if (code != TSDB_CODE_SUCCESS) { goto _exit; } diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/dnode/vnode/src/vnd/vnodeCos.c index 9941c53750..b7a13c1664 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/dnode/vnode/src/vnd/vnodeCos.c @@ -752,7 +752,7 @@ static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void * } } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { int status = 0; int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; @@ -775,7 +775,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, return TAOS_SYSTEM_ERROR(EIO); } - if (cbd.buf_pos != size) { + if (check && cbd.buf_pos != size) { vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); return TAOS_SYSTEM_ERROR(EIO); } @@ -1063,7 +1063,8 @@ bool s3Get(const char *object_name, const char *path) { return ret; } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t **ppBlock) { +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, bool check, uint8_t **ppBlock) { + (void)check; int32_t code = 0; cos_pool_t *p = NULL; int is_cname = 0; @@ -1255,8 +1256,10 @@ void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } bool s3Get(const char *object_name, const char *path) { return false; } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; } -void s3EvictCache(const char *path, long object_size) {} -long s3Size(const char *object_name) { return 0; } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { + return 0; +} +void s3EvictCache(const char *path, long object_size) {} +long s3Size(const char *object_name) { return 0; } #endif From 5ac66679db18c91006eda9741a3f73d31b5d4cf1 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 10:17:46 +0800 Subject: [PATCH 11/16] config/s3blocksize: move range check from global to mnode --- source/common/src/tglobal.c | 2 +- source/dnode/mnode/impl/src/mndDnode.c | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ead9a5926b..8bb2fa3ab7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1674,7 +1674,7 @@ void taosCfgDynamicOptions(const char *option, const char *value) { {"ttlBatchDropNum", &tsTtlBatchDropNum}, {"ttlFlushThreshold", &tsTtlFlushThreshold}, {"ttlPushInterval", &tsTtlPushIntervalSec}, - {"s3BlockSize", &tsS3BlockSize}, + //{"s3BlockSize", &tsS3BlockSize}, {"s3BlockCacheSize", &tsS3BlockCacheSize}, {"s3PageCacheSize", &tsS3PageCacheSize}, {"s3UploadDelaySec", &tsS3UploadDelaySec}, diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 85e4ef0fc2..f4108b52c6 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -1310,6 +1310,22 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { } tFreeSMCfgDnodeReq(&cfgReq); return 0; + } else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) { + int32_t optLen = strlen("s3blocksize"); + int32_t flag = -1; + int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag); + if (code < 0) return code; + + if (flag > 1024 * 1024 || (flag > -1 && flag < 4) || flag < -1) { + mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: -1 or [4, 1024 * 1024]", + cfgReq.dnodeId, flag); + terrno = TSDB_CODE_INVALID_CFG; + tFreeSMCfgDnodeReq(&cfgReq); + return -1; + } + + strcpy(dcfgReq.config, "s3blocksize"); + snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag); #endif } else { mndMCfg2DCfg(&cfgReq, &dcfgReq); From 8b6fc10bbd6313d40fe94bb443135197b16c01ab Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 13:59:39 +0800 Subject: [PATCH 12/16] config/block-size: make range > 0 --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8bb2fa3ab7..eb0059676e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -733,7 +733,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) + if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 1024, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) From 17bddf5ff49128cb30fd0737708990c722444bad Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 14:09:58 +0800 Subject: [PATCH 13/16] cos: use uError instead of vError --- source/common/src/cos.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 7d8175637b..0b6b0db885 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -794,7 +794,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, } if (check && cbd.buf_pos != size) { - vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); return TAOS_SYSTEM_ERROR(EIO); } From e88fb845088bdfcfebd4cab04f7251f9f9c51742 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 14:15:01 +0800 Subject: [PATCH 14/16] config/block-size fix --- source/common/src/tglobal.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index eab4c2ea77..b66d811d10 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -128,12 +128,12 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; // int32_t tsSmlBatchSize = 10000; // checkpoint backup -char tsSnodeAddress[TSDB_FQDN_LEN] = {0}; +char tsSnodeAddress[TSDB_FQDN_LEN] = {0}; int32_t tsRsyncPort = 873; #ifdef WINDOWS char tsCheckpointBackupDir[PATH_MAX] = "C:\\TDengine\\data\\backup\\checkpoint\\"; #else -char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; +char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; #endif // tmq @@ -335,7 +335,7 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { } if (tsS3BucketName[0] != '<') { #if defined(USE_COS) || defined(USE_S3) - if(tsDiskCfgNum > 1) tsS3Enabled = true; + if (tsDiskCfgNum > 1) tsS3Enabled = true; tsS3StreamEnabled = true; #endif } @@ -678,7 +678,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "rsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH, CFG_DYN_SERVER) != 0) return -1; if (cfgAddString(pCfg, "snodeAddress", tsSnodeAddress, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; - if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) + return -1; if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -748,7 +749,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, 1024, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) + if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -1, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) From 386c25a99a71092b8c7dd641e5ba2286415408d9 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 15:24:19 +0800 Subject: [PATCH 15/16] tsdb/retention: remove file when ref's clear --- source/dnode/vnode/src/tsdb/tsdbFile2.c | 13 ++++++++++++- source/dnode/vnode/src/tsdb/tsdbRetention.c | 12 ++++++------ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index 9edb03d35b..bf3357dabb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -14,6 +14,7 @@ */ #include "tsdbFile2.h" +#include "cos.h" // to_json static int32_t head_to_json(const STFile *file, cJSON *json); @@ -44,7 +45,17 @@ static const struct { void remove_file(const char *fname) { int32_t code = taosRemoveFile(fname); if (code) { - tsdbError("file:%s remove failed", fname); + if (tsS3Enabled) { + const char *object_name = taosDirEntryBaseName((char *)fname); + long s3_size = tsS3Enabled ? s3Size(object_name) : 0; + if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) { + s3DeleteObjects(&object_name, 1); + } else { + tsdbError("file:%s remove failed", fname); + } + } else { + tsdbError("file:%s remove failed", fname); + } } else { tsdbInfo("file:%s is removed", fname); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 1908f16529..0a41ac3cc8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ +#include "cos.h" #include "tsdb.h" #include "tsdbFS2.h" -#include "cos.h" #include "vnd.h" typedef struct { @@ -292,15 +292,15 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) { if (expLevel < 0) { // remove the fileset for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) { if (fobj == NULL) continue; - + /* int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs); if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) { code = tsdbRemoveFileObjectS3(rtner, fobj); TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbDoRemoveFileObject(rtner, fobj); - TSDB_CHECK_CODE(code, lino, _exit); - } + } else {*/ + code = tsdbDoRemoveFileObject(rtner, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + //} } SSttLvl *lvl; From d3cf6a4340b7f919c2a99f15cd9949f3655811cf Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 9 Nov 2023 17:44:24 +0800 Subject: [PATCH 16/16] tsdb/file: new nlevel field for remove --- source/dnode/vnode/src/tsdb/tsdbFS2.c | 7 ++++--- source/dnode/vnode/src/tsdb/tsdbFile2.c | 9 +++++---- source/dnode/vnode/src/tsdb/tsdbFile2.h | 1 + 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 348397272d..02ef75ae86 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -22,7 +22,7 @@ extern int vnodeScheduleTask(int (*execute)(void *), void *arg); extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); -extern void remove_file(const char *fname); +extern void remove_file(const char *fname, bool last_level); #define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT #define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1) @@ -532,7 +532,8 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) { if (taosIsDir(file->aname)) continue; if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) { - remove_file(file->aname); + int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs); + remove_file(file->aname, nlevel > 1 && file->did.level == nlevel - 1); } } @@ -1282,4 +1283,4 @@ int32_t tsdbFSEnableBgTask(STFileSystem *fs) { fs->stop = false; taosThreadMutexUnlock(&fs->tsdb->mutex); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index bf3357dabb..cc05b8ee18 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -42,10 +42,10 @@ static const struct { [TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json}, }; -void remove_file(const char *fname) { +void remove_file(const char *fname, bool last_level) { int32_t code = taosRemoveFile(fname); if (code) { - if (tsS3Enabled) { + if (tsS3Enabled && last_level) { const char *object_name = taosDirEntryBaseName((char *)fname); long s3_size = tsS3Enabled ? s3Size(object_name) : 0; if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) { @@ -235,6 +235,7 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) { fobj[0]->state = TSDB_FSTATE_LIVE; fobj[0]->ref = 1; tsdbTFileName(pTsdb, f, fobj[0]->fname); + fobj[0]->nlevel = tfsGetLevel(pTsdb->pVnode->pTfs); return 0; } @@ -256,7 +257,7 @@ int32_t tsdbTFileObjUnref(STFileObj *fobj) { tsdbTrace("unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef); if (nRef == 0) { if (fobj->state == TSDB_FSTATE_DEAD) { - remove_file(fobj->fname); + remove_file(fobj->fname, fobj->nlevel > 1 && fobj->f->did.level == fobj->nlevel - 1); } taosMemoryFree(fobj); } @@ -272,7 +273,7 @@ int32_t tsdbTFileObjRemove(STFileObj *fobj) { taosThreadMutexUnlock(&fobj->mutex); tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef); if (nRef == 0) { - remove_file(fobj->fname); + remove_file(fobj->fname, fobj->nlevel > 1 && fobj->f->did.level == fobj->nlevel - 1); taosMemoryFree(fobj); } return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.h b/source/dnode/vnode/src/tsdb/tsdbFile2.h index 9da198c1f0..b94f7a9fd0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.h @@ -76,6 +76,7 @@ struct STFileObj { STFile f[1]; int32_t state; int32_t ref; + int32_t nlevel; char fname[TSDB_FILENAME_LEN]; };