From debd1e2308d36a82df776c21d46de17b588e9d20 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Fri, 9 Aug 2024 11:09:10 +0800 Subject: [PATCH 1/5] feat: cos support for multi s3 cfg --- include/util/tdef.h | 2 + source/common/src/cos.c | 427 ++++++++++++++++++++++++++++-------- source/common/src/tglobal.c | 130 +++++++---- 3 files changed, 421 insertions(+), 138 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 39c65d2c71..890f1d8f95 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -294,6 +294,8 @@ typedef enum ELogicConditionType { #define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_LOG_VAR_LEN 32 +#define TSDB_MAX_EP_NUM 10 + #define TSDB_ARB_GROUP_MEMBER_NUM 2 #define TSDB_ARB_TOKEN_SIZE 32 diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 335c654acd..471ab53322 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -5,14 +5,25 @@ #include "tdef.h" #include "tutil.h" -extern char tsS3Endpoint[]; -extern char tsS3AccessKeyId[]; -extern char tsS3AccessKeySecret[]; -extern char tsS3BucketName[]; -extern char tsS3AppId[]; -extern char tsS3Hostname[]; +extern int8_t tsS3EpNum; +extern char tsS3Endpoint[][TSDB_FQDN_LEN]; +extern char tsS3AccessKeyId[][TSDB_FQDN_LEN]; +extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN]; +extern char tsS3BucketName[][TSDB_FQDN_LEN]; +extern char tsS3AppId[][TSDB_FQDN_LEN]; +extern char tsS3Hostname[][TSDB_FQDN_LEN]; extern int8_t tsS3Https; +static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex); +static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size, + int8_t epIndex); +static int32_t s3DeleteObjectsByEp(const char *object_name[], int nobject, int8_t epIndex); +static SArray *getListByPrefixByEp(const char *prefix, int8_t epIndex); +static int32_t s3GetObjectBlockByEp(const char *object_name, int64_t offset, int64_t size, bool check, + uint8_t **ppBlock, int8_t epIndex); +static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileName, int8_t epIndex); +static long s3SizeByEp(const char *object_name, int8_t epIndex); + #if defined(USE_S3) #include "libs3.h" @@ -32,7 +43,7 @@ extern int8_t tsS3Oss; int32_t s3Begin() { S3Status status; - const char *hostname = tsS3Hostname; + const char *hostname = tsS3Hostname[0]; const char *env_hn = getenv("S3_HOSTNAME"); if (env_hn) { @@ -63,6 +74,7 @@ static int32_t s3ListBucket(char const *bucketname); int32_t s3CheckCfg() { int32_t code = 0, lino = 0; + int8_t i = 0; if (!tsS3Enabled) { (void)fprintf(stderr, "s3 not configured.\n"); @@ -75,83 +87,91 @@ int32_t s3CheckCfg() { TAOS_CHECK_GOTO(code, &lino, _exit); } - // test put - char testdata[17] = "0123456789abcdef"; - const char *objectname[] = {"s3test.txt"}; - char path[PATH_MAX] = {0}; - int ds_len = strlen(TD_DIRSEP); - int tmp_len = strlen(tsTempDir); + for (; i < tsS3EpNum; i++) { + (void)fprintf(stdout, "test s3 ep: %d/%d.\n", i + 1, tsS3EpNum); - (void)snprintf(path, PATH_MAX, "%s", tsTempDir); - if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { - (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); - (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", objectname[0]); - } else { - (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", objectname[0]); - } + // test put + char testdata[17] = "0123456789abcdef"; + const char *objectname[] = {"s3test.txt"}; + char path[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); - TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); - if (!fp) { - (void)fprintf(stderr, "failed to open test file: %s.\n", path); - // uError("ERROR: %s Failed to open %s", __func__, path); - TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); - } - if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) { - (void)fprintf(stderr, "failed to write test file: %s.\n", path); - TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); - } - if (taosFsyncFile(fp) < 0) { - (void)fprintf(stderr, "failed to fsync test file: %s.\n", path); - TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); - } - (void)taosCloseFile(&fp); + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", objectname[0]); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", objectname[0]); + } - (void)fprintf(stderr, "\nstart to put object: %s, file: %s content: %s\n", objectname[0], path, testdata); - code = s3PutObjectFromFileOffset(path, objectname[0], 0, 16); - if (code != 0) { - (void)fprintf(stderr, "put object %s : failed.\n", objectname[0]); - TAOS_CHECK_GOTO(code, &lino, _exit); - } - (void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]); + TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + if (!fp) { + (void)fprintf(stderr, "failed to open test file: %s.\n", path); + // uError("ERROR: %s Failed to open %s", __func__, path); + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); + } + if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) { + (void)fprintf(stderr, "failed to write test file: %s.\n", path); + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); + } + if (taosFsyncFile(fp) < 0) { + (void)fprintf(stderr, "failed to fsync test file: %s.\n", path); + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); + } + (void)taosCloseFile(&fp); - // list buckets - (void)fprintf(stderr, "start to list bucket %s by prefix s3.\n", tsS3BucketName); - code = s3ListBucket(tsS3BucketName); - if (code != 0) { - (void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName); - TAOS_CHECK_GOTO(code, &lino, _exit); - } - (void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName); + (void)fprintf(stderr, "\nstart to put object: %s, file: %s content: %s\n", objectname[0], path, testdata); + code = s3PutObjectFromFileOffsetByEp(path, objectname[0], 0, 16, i); + if (code != 0) { + (void)fprintf(stderr, "put object %s : failed.\n", objectname[0]); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + (void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]); - // test range get - uint8_t *pBlock = NULL; - int c_offset = 10; - int c_len = 6; + // list buckets + (void)fprintf(stderr, "start to list bucket %s by prefix s3.\n", tsS3BucketName[i]); + code = s3ListBucketByEp(tsS3BucketName[i], i); + if (code != 0) { + (void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName[i]); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + (void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName[i]); - (void)fprintf(stderr, "start to range get object %s offset: %d len: %d.\n", objectname[0], c_offset, c_len); - code = s3GetObjectBlock(objectname[0], c_offset, c_len, true, &pBlock); - if (code != 0) { - (void)fprintf(stderr, "get object %s : failed.\n", objectname[0]); - TAOS_CHECK_GOTO(code, &lino, _exit); - } - char buf[7] = {0}; - (void)memcpy(buf, pBlock, c_len); - taosMemoryFree(pBlock); - (void)fprintf(stderr, "object content: %s\n", buf); - (void)fprintf(stderr, "get object %s: success.\n\n", objectname[0]); + // test range get + uint8_t *pBlock = NULL; + int c_offset = 10; + int c_len = 6; - // delete test object - (void)fprintf(stderr, "start to delete object: %s.\n", objectname[0]); - code = s3DeleteObjects(objectname, 1); - if (code != 0) { - (void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]); - TAOS_CHECK_GOTO(code, &lino, _exit); + (void)fprintf(stderr, "start to range get object %s offset: %d len: %d.\n", objectname[0], c_offset, c_len); + code = s3GetObjectBlockByEp(objectname[0], c_offset, c_len, true, &pBlock, i); + if (code != 0) { + (void)fprintf(stderr, "get object %s : failed.\n", objectname[0]); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + char buf[7] = {0}; + (void)memcpy(buf, pBlock, c_len); + taosMemoryFree(pBlock); + (void)fprintf(stderr, "object content: %s\n", buf); + (void)fprintf(stderr, "get object %s: success.\n\n", objectname[0]); + + // delete test object + (void)fprintf(stderr, "start to delete object: %s.\n", objectname[0]); + code = s3DeleteObjectsByEp(objectname, 1, i); + if (code != 0) { + (void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + (void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]); } - (void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]); s3End(); _exit: + if (TSDB_CODE_SUCCESS != code) { + (void)fprintf(stderr, "s3 check failed, code: %d, line: %d, index: %d.\n", code, lino, i); + } + TAOS_RETURN(code); } @@ -241,6 +261,27 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro static SArray *getListByPrefix(const char *prefix); static void s3FreeObjectKey(void *pItem); +static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex) { + int32_t code = 0; + + SArray *objectArray = getListByPrefixByEp("s3", epIndex); + if (objectArray == NULL) { + TAOS_RETURN(TSDB_CODE_FAILED); + } + + const char **object_name = TARRAY_DATA(objectArray); + int size = TARRAY_SIZE(objectArray); + + (void)fprintf(stderr, "objects:\n"); + for (int i = 0; i < size; ++i) { + (void)fprintf(stderr, "%s\n", object_name[i]); + } + + taosArrayDestroyEx(objectArray, s3FreeObjectKey); + + TAOS_RETURN(code); +} + static int32_t s3ListBucket(char const *bucketname) { int32_t code = 0; @@ -906,7 +947,29 @@ _exit: TAOS_RETURN(code); } -int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) { +static int32_t s3InitEpIndexArray(SArray** pIndexArray) { + SArray* indexArray = taosArrayInit(tsS3EpNum, sizeof(int8_t)); + if (!indexArray) { + uError("%s: %s", __func__, "out of memory"); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } + + for (int8_t i = 0; i < tsS3EpNum; ++i) { + taosArraySet(indexArray, i, &i); + } + + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t j = taosRand() % tsS3EpNum; + int8_t tmp = *(int8_t *)taosArrayGet(indexArray, i); + taosArraySet(indexArray, i, taosArrayGet(indexArray, j)); + taosArraySet(indexArray, j, &tmp); + } + + *pIndexArray = indexArray; + TAOS_RETURN(TSDB_CODE_SUCCESS); +} + +int32_t s3PutObjectFromFile2ByEp(const char *file, const char *object_name, int8_t withcp, int8_t epIndex) { int32_t code = 0; int32_t lmtime = 0; const char *filename = 0; @@ -933,8 +996,14 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength = contentLength; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3PutProperties putProperties = {contentType, md5, cacheControl, contentDispositionFilename, @@ -961,7 +1030,27 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w TAOS_RETURN(code); } -int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { +int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) { + int32_t code = TSDB_CODE_SUCCESS; + + SArray *indexArray = NULL; + TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); + + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + code = s3PutObjectFromFile2ByEp(file, object_name, withcp, epIndex); + if (code == TSDB_CODE_SUCCESS) { + break; + } + } + + taosArrayDestroy(indexArray); + + return code; +} + +static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size, + int8_t epIndex) { int32_t code = 0; int32_t lmtime = 0; const char *filename = 0; @@ -994,8 +1083,14 @@ int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength = contentLength; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3PutProperties putProperties = {contentType, md5, cacheControl, contentDispositionFilename, @@ -1018,6 +1113,25 @@ int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int TAOS_RETURN(code); } +int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { + int32_t code = TSDB_CODE_SUCCESS; + + SArray *indexArray = NULL; + TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); + + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + code = s3PutObjectFromFileOffsetByEp(file, object_name, offset, size, epIndex); + if (code == TSDB_CODE_SUCCESS) { + break; + } + } + + taosArrayDestroy(indexArray); + + return code; +} + typedef struct list_bucket_callback_data { char err_msg[512]; S3Status status; @@ -1068,9 +1182,15 @@ static void s3FreeObjectKey(void *pItem) { taosMemoryFree(key); } -static SArray *getListByPrefix(const char *prefix) { - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; +static SArray *getListByPrefixByEp(const char *prefix, int8_t epIndex) { + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listBucketCallback}; @@ -1114,11 +1234,37 @@ static SArray *getListByPrefix(const char *prefix) { return NULL; } -int32_t s3DeleteObjects(const char *object_name[], int nobject) { +static SArray *getListByPrefix(const char *prefix) { + SArray *objectArray = NULL; + SArray *indexArray = NULL; + if (s3InitEpIndexArray(&indexArray) != TSDB_CODE_SUCCESS) { + return NULL; + } + + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + objectArray = getListByPrefixByEp(prefix, epIndex); + if (objectArray) { + break; + } + } + + taosArrayDestroy(indexArray); + + return objectArray; +} + +static int32_t s3DeleteObjectsByEp(const char *object_name[], int nobject, int8_t epIndex) { int32_t code = 0; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3ResponseHandler responseHandler = {0, &responseCompleteCallback}; for (int i = 0; i < nobject; ++i) { @@ -1136,6 +1282,25 @@ int32_t s3DeleteObjects(const char *object_name[], int nobject) { TAOS_RETURN(code); } +int32_t s3DeleteObjects(const char *object_name[], int nobject) { + int32_t code = 0; + + SArray *indexArray = NULL; + TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); + + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + code = s3DeleteObjectsByEp(object_name, nobject, epIndex); + if (code == TSDB_CODE_SUCCESS) { + break; + } + } + + taosArrayDestroy(indexArray); + + return code; +} + void s3DeleteObjectsByPrefix(const char *prefix) { SArray *objectArray = getListByPrefix(prefix); if (objectArray == NULL) return; @@ -1166,13 +1331,20 @@ static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void * } } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { +static int32_t s3GetObjectBlockByEp(const char *object_name, int64_t offset, int64_t size, bool check, + uint8_t **ppBlock, int8_t epIndex) { int status = 0; int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &getObjectDataCallback}; @@ -1213,18 +1385,43 @@ _retry: TAOS_RETURN(TSDB_CODE_SUCCESS); } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { + int32_t code = 0; + + SArray *indexArray = NULL; + TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); + + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + code = s3GetObjectBlockByEp(object_name, offset, size, check, ppBlock, epIndex); + if (code == TSDB_CODE_SUCCESS) { + break; + } + } + + taosArrayDestroy(indexArray); + + return code; +} + static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) { TS3GetData *cbd = (TS3GetData *)callbackData; size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize); return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK); } -int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { +static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileName, int8_t epIndex) { int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &getObjectCallback}; @@ -1252,6 +1449,25 @@ int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { TAOS_RETURN(TSDB_CODE_SUCCESS); } +int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { + int32_t code = 0; + + SArray *indexArray = NULL; + TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); + + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + code = s3GetObjectToFileByEp(object_name, fileName, epIndex); + if (code == TSDB_CODE_SUCCESS) { + break; + } + } + + taosArrayDestroy(indexArray); + + return code; +} + int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { SArray *objectArray = getListByPrefix(prefix); if (objectArray == NULL) TAOS_RETURN(TSDB_CODE_FAILED); @@ -1275,12 +1491,18 @@ int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; } -long s3Size(const char *object_name) { +static long s3SizeByEp(const char *object_name, int8_t epIndex) { long size = 0; int status = 0; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3ResponseHandler responseHandler = {&responsePropertiesCallback, &responseCompleteCallback}; @@ -1300,6 +1522,25 @@ long s3Size(const char *object_name) { return size; } +long s3Size(const char *object_name) { + long size = 0; + + SArray *indexArray = NULL; + TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); + + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + size = s3SizeByEp(object_name, epIndex); + if (size > 0) { + break; + } + } + + taosArrayDestroy(indexArray); + + return size; +} + void s3EvictCache(const char *path, long object_size) {} #elif defined(USE_COS) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index cd0ca95d3b..e3048bcecf 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -294,19 +294,20 @@ bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds int tsStreamAggCnt = 100000; -char tsS3Endpoint[TSDB_FQDN_LEN] = ""; -char tsS3AccessKey[TSDB_FQDN_LEN] = ""; -char tsS3AccessKeyId[TSDB_FQDN_LEN] = ""; -char tsS3AccessKeySecret[TSDB_FQDN_LEN] = ""; -char tsS3BucketName[TSDB_FQDN_LEN] = ""; -char tsS3AppId[TSDB_FQDN_LEN] = ""; +int8_t tsS3EpNum = 0; +char tsS3Endpoint[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; +char tsS3AccessKey[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; +char tsS3AccessKeyId[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; +char tsS3AccessKeySecret[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; +char tsS3BucketName[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; +char tsS3AppId[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; int8_t tsS3Enabled = false; int8_t tsS3EnabledCfg = false; int8_t tsS3Oss = false; int8_t tsS3StreamEnabled = false; int8_t tsS3Https = true; -char tsS3Hostname[TSDB_FQDN_LEN] = ""; +char tsS3Hostname[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; int32_t tsS3BlockSize = -1; // number of tsdb pages (4096) int32_t tsS3BlockCacheSize = 16; // number of blocks @@ -349,49 +350,88 @@ int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg); #endif -int32_t taosSetS3Cfg(SConfig *pCfg) { - SConfigItem *pItem = NULL; +static int32_t taosSplitS3Cfg(SConfig *pCfg, const char *name, char gVarible[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN], + int8_t *pNum) { + int32_t code = TSDB_CODE_SUCCESS; - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "s3Accesskey"); - tstrncpy(tsS3AccessKey, pItem->str, TSDB_FQDN_LEN); - if (tsS3AccessKey[0] == '<') { + SConfigItem *pItem = NULL; + int32_t num = 0; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, name); + + char *strDup = NULL; + if ((strDup = taosStrdup(pItem->str))== NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + char **slices = strsplit(strDup, ",", &num); + if (num > TSDB_MAX_EP_NUM) { + code = TSDB_CODE_INVALID_CFG; + goto _exit; + } + + for (int i = 0; i < num; ++i) { + tstrncpy(gVarible[i], slices[i], TSDB_FQDN_LEN); + } + *pNum = num; + +_exit: + taosMemoryFreeClear(strDup); + TAOS_RETURN(code); +} + +int32_t taosSetS3Cfg(SConfig *pCfg) { + int8_t num = 0; + + TAOS_CHECK_RETURN(taosSplitS3Cfg(pCfg, "s3Accesskey", tsS3AccessKey, &num)); + if (num == 0) TAOS_RETURN(TSDB_CODE_SUCCESS); + + tsS3EpNum = num; + + if (tsS3AccessKey[0][0] == '<') { TAOS_RETURN(TSDB_CODE_SUCCESS); } - char *colon = strchr(tsS3AccessKey, ':'); - if (!colon) { - uError("invalid access key:%s", tsS3AccessKey); - TAOS_RETURN(TSDB_CODE_INVALID_CFG); - } - *colon = '\0'; - tstrncpy(tsS3AccessKeyId, tsS3AccessKey, TSDB_FQDN_LEN); - tstrncpy(tsS3AccessKeySecret, colon + 1, TSDB_FQDN_LEN); - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "s3Endpoint"); - tstrncpy(tsS3Endpoint, pItem->str, TSDB_FQDN_LEN); - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "s3BucketName"); - tstrncpy(tsS3BucketName, pItem->str, TSDB_FQDN_LEN); - char *proto = strstr(tsS3Endpoint, "https://"); - if (!proto) { - tsS3Https = false; - tstrncpy(tsS3Hostname, tsS3Endpoint + 7, TSDB_FQDN_LEN); - } else { - tstrncpy(tsS3Hostname, tsS3Endpoint + 8, TSDB_FQDN_LEN); + for (int i = 0; i < tsS3EpNum; ++i) { + char *colon = strchr(tsS3AccessKey[i], ':'); + if (!colon) { + uError("invalid access key:%s", tsS3AccessKey[i]); + TAOS_RETURN(TSDB_CODE_INVALID_CFG); + } + *colon = '\0'; + tstrncpy(tsS3AccessKeyId[i], tsS3AccessKey[i], TSDB_FQDN_LEN); + tstrncpy(tsS3AccessKeySecret[i], colon + 1, TSDB_FQDN_LEN); } - char *cos = strstr(tsS3Endpoint, "cos."); - if (cos) { - char *appid = strrchr(tsS3BucketName, '-'); - if (!appid) { - uError("failed to locate appid in bucket:%s", tsS3BucketName); - TAOS_RETURN(TSDB_CODE_INVALID_CFG); + TAOS_CHECK_RETURN(taosSplitS3Cfg(pCfg, "s3Endpoint", tsS3Endpoint, &num)); + if (num != tsS3EpNum) TAOS_RETURN(TSDB_CODE_INVALID_CFG); + + TAOS_CHECK_RETURN(taosSplitS3Cfg(pCfg, "s3BucketName", tsS3BucketName, &num)); + if (num != tsS3EpNum) TAOS_RETURN(TSDB_CODE_INVALID_CFG); + + for (int i = 0; i < tsS3EpNum; ++i) { + char *proto = strstr(tsS3Endpoint[i], "https://"); + if (!proto) { + tstrncpy(tsS3Hostname[i], tsS3Endpoint[i] + 7, TSDB_FQDN_LEN); } else { - tstrncpy(tsS3AppId, appid + 1, TSDB_FQDN_LEN); + tstrncpy(tsS3Hostname[i], tsS3Endpoint[i] + 8, TSDB_FQDN_LEN); + } + + char *cos = strstr(tsS3Endpoint[i], "cos."); + if (cos) { + char *appid = strrchr(tsS3BucketName[i], '-'); + if (!appid) { + uError("failed to locate appid in bucket:%s", tsS3BucketName[i]); + TAOS_RETURN(TSDB_CODE_INVALID_CFG); + } else { + tstrncpy(tsS3AppId[i], appid + 1, TSDB_FQDN_LEN); + } } } - char *oss = strstr(tsS3Endpoint, "aliyuncs."); - if (oss) { - tsS3Oss = true; - } - if (tsS3BucketName[0] != '<') { + + tsS3Https = (strstr(tsS3Endpoint[0], "https://") != NULL); + tsS3Oss = (strstr(tsS3Endpoint[0], "aliyuncs.") != NULL); + + if (tsS3BucketName[0][0] != '<') { #if defined(USE_COS) || defined(USE_S3) #ifdef TD_ENTERPRISE /*if (tsDiskCfgNum > 1) */ tsS3Enabled = true; @@ -775,9 +815,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "resolveFQDNRetryTime", tsResolveFQDNRetryTime, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey[0], CFG_SCOPE_SERVER, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint[0], CFG_SCOPE_SERVER, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3BucketName", tsS3BucketName[0], CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); From 5d62dce2c2c58fb95bdac8bd4b6ba4f6d6eaa3a0 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Mon, 12 Aug 2024 10:56:25 +0800 Subject: [PATCH 2/5] enh: no shuffle for array with one element --- source/common/src/cos.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 471ab53322..ab882f5fb8 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -947,8 +947,8 @@ _exit: TAOS_RETURN(code); } -static int32_t s3InitEpIndexArray(SArray** pIndexArray) { - SArray* indexArray = taosArrayInit(tsS3EpNum, sizeof(int8_t)); +static int32_t s3InitEpIndexArray(SArray **pIndexArray) { + SArray *indexArray = taosArrayInit(tsS3EpNum, sizeof(int8_t)); if (!indexArray) { uError("%s: %s", __func__, "out of memory"); TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); @@ -958,11 +958,13 @@ static int32_t s3InitEpIndexArray(SArray** pIndexArray) { taosArraySet(indexArray, i, &i); } - for (int8_t i = 0; i < tsS3EpNum; ++i) { - int8_t j = taosRand() % tsS3EpNum; - int8_t tmp = *(int8_t *)taosArrayGet(indexArray, i); - taosArraySet(indexArray, i, taosArrayGet(indexArray, j)); - taosArraySet(indexArray, j, &tmp); + if (tsS3EpNum > 1) { + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t j = taosRand() % tsS3EpNum; + int8_t tmp = *(int8_t *)taosArrayGet(indexArray, i); + taosArraySet(indexArray, i, taosArrayGet(indexArray, j)); + taosArraySet(indexArray, j, &tmp); + } } *pIndexArray = indexArray; From 1e83be102d19e25c7f124e0ac622b294797a3d5e Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Mon, 12 Aug 2024 11:13:12 +0800 Subject: [PATCH 3/5] fix: return value check --- source/common/src/cos.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index ab882f5fb8..7fa390d3bb 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -63,7 +63,7 @@ int32_t s3Begin() { TAOS_RETURN(TSDB_CODE_SUCCESS); } -void s3End() { S3_deinitialize(); } +void s3End() { (void)S3_deinitialize(); } int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ } From 21f8d6b7c1d5a5a6d35bd9f41baaa9e564c3b8c9 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Mon, 12 Aug 2024 14:26:02 +0800 Subject: [PATCH 4/5] fix: memleak in taosSplitS3Cfg --- source/common/src/cos.c | 6 +++++- source/common/src/tglobal.c | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 7fa390d3bb..8312d02bff 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -955,7 +955,11 @@ static int32_t s3InitEpIndexArray(SArray **pIndexArray) { } for (int8_t i = 0; i < tsS3EpNum; ++i) { - taosArraySet(indexArray, i, &i); + if (taosArrayPush(indexArray, &i) == NULL) { + taosArrayDestroy(indexArray); + uError("%s: %s", __func__, "out of memory"); + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } } if (tsS3EpNum > 1) { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index e3048bcecf..4e32288123 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -376,6 +376,7 @@ static int32_t taosSplitS3Cfg(SConfig *pCfg, const char *name, char gVarible[TSD *pNum = num; _exit: + taosMemoryFreeClear(slices); taosMemoryFreeClear(strDup); TAOS_RETURN(code); } From c3b1e267b9be76550dcc385cf9919553e5db70c2 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 13 Aug 2024 15:09:54 +0800 Subject: [PATCH 5/5] enh: adjust the implementation of random selection --- source/common/src/cos.c | 86 +++++++---------------------------------- 1 file changed, 14 insertions(+), 72 deletions(-) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 8312d02bff..db0dadbc46 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -947,34 +947,6 @@ _exit: TAOS_RETURN(code); } -static int32_t s3InitEpIndexArray(SArray **pIndexArray) { - SArray *indexArray = taosArrayInit(tsS3EpNum, sizeof(int8_t)); - if (!indexArray) { - uError("%s: %s", __func__, "out of memory"); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); - } - - for (int8_t i = 0; i < tsS3EpNum; ++i) { - if (taosArrayPush(indexArray, &i) == NULL) { - taosArrayDestroy(indexArray); - uError("%s: %s", __func__, "out of memory"); - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); - } - } - - if (tsS3EpNum > 1) { - for (int8_t i = 0; i < tsS3EpNum; ++i) { - int8_t j = taosRand() % tsS3EpNum; - int8_t tmp = *(int8_t *)taosArrayGet(indexArray, i); - taosArraySet(indexArray, i, taosArrayGet(indexArray, j)); - taosArraySet(indexArray, j, &tmp); - } - } - - *pIndexArray = indexArray; - TAOS_RETURN(TSDB_CODE_SUCCESS); -} - int32_t s3PutObjectFromFile2ByEp(const char *file, const char *object_name, int8_t withcp, int8_t epIndex) { int32_t code = 0; int32_t lmtime = 0; @@ -1039,19 +1011,15 @@ int32_t s3PutObjectFromFile2ByEp(const char *file, const char *object_name, int8 int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) { int32_t code = TSDB_CODE_SUCCESS; - SArray *indexArray = NULL; - TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); - + int8_t startIndex = taosRand() % tsS3EpNum; for (int8_t i = 0; i < tsS3EpNum; ++i) { - int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + int8_t epIndex = (startIndex + i) % tsS3EpNum; code = s3PutObjectFromFile2ByEp(file, object_name, withcp, epIndex); if (code == TSDB_CODE_SUCCESS) { break; } } - taosArrayDestroy(indexArray); - return code; } @@ -1122,19 +1090,15 @@ static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *objec int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { int32_t code = TSDB_CODE_SUCCESS; - SArray *indexArray = NULL; - TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); - + int8_t startIndex = taosRand() % tsS3EpNum; for (int8_t i = 0; i < tsS3EpNum; ++i) { - int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + int8_t epIndex = (startIndex + i) % tsS3EpNum; code = s3PutObjectFromFileOffsetByEp(file, object_name, offset, size, epIndex); if (code == TSDB_CODE_SUCCESS) { break; } } - taosArrayDestroy(indexArray); - return code; } @@ -1242,21 +1206,15 @@ static SArray *getListByPrefixByEp(const char *prefix, int8_t epIndex) { static SArray *getListByPrefix(const char *prefix) { SArray *objectArray = NULL; - SArray *indexArray = NULL; - if (s3InitEpIndexArray(&indexArray) != TSDB_CODE_SUCCESS) { - return NULL; - } - + int8_t startIndex = taosRand() % tsS3EpNum; for (int8_t i = 0; i < tsS3EpNum; ++i) { - int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + int8_t epIndex = (startIndex + i) % tsS3EpNum; objectArray = getListByPrefixByEp(prefix, epIndex); if (objectArray) { break; } } - taosArrayDestroy(indexArray); - return objectArray; } @@ -1291,19 +1249,15 @@ static int32_t s3DeleteObjectsByEp(const char *object_name[], int nobject, int8_ int32_t s3DeleteObjects(const char *object_name[], int nobject) { int32_t code = 0; - SArray *indexArray = NULL; - TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); - + int8_t startIndex = taosRand() % tsS3EpNum; for (int8_t i = 0; i < tsS3EpNum; ++i) { - int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + int8_t epIndex = (startIndex + i) % tsS3EpNum; code = s3DeleteObjectsByEp(object_name, nobject, epIndex); if (code == TSDB_CODE_SUCCESS) { break; } } - taosArrayDestroy(indexArray); - return code; } @@ -1394,19 +1348,15 @@ _retry: int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { int32_t code = 0; - SArray *indexArray = NULL; - TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); - + int8_t startIndex = taosRand() % tsS3EpNum; for (int8_t i = 0; i < tsS3EpNum; ++i) { - int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + int8_t epIndex = (startIndex + i) % tsS3EpNum; code = s3GetObjectBlockByEp(object_name, offset, size, check, ppBlock, epIndex); if (code == TSDB_CODE_SUCCESS) { break; } } - taosArrayDestroy(indexArray); - return code; } @@ -1458,19 +1408,15 @@ static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileNa int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { int32_t code = 0; - SArray *indexArray = NULL; - TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); - + int8_t startIndex = taosRand() % tsS3EpNum; for (int8_t i = 0; i < tsS3EpNum; ++i) { - int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + int8_t epIndex = (startIndex + i) % tsS3EpNum; code = s3GetObjectToFileByEp(object_name, fileName, epIndex); if (code == TSDB_CODE_SUCCESS) { break; } } - taosArrayDestroy(indexArray); - return code; } @@ -1531,19 +1477,15 @@ static long s3SizeByEp(const char *object_name, int8_t epIndex) { long s3Size(const char *object_name) { long size = 0; - SArray *indexArray = NULL; - TAOS_CHECK_RETURN(s3InitEpIndexArray(&indexArray)); - + int8_t startIndex = taosRand() % tsS3EpNum; for (int8_t i = 0; i < tsS3EpNum; ++i) { - int8_t epIndex = *(int8_t *)taosArrayGet(indexArray, i); + int8_t epIndex = (startIndex + i) % tsS3EpNum; size = s3SizeByEp(object_name, epIndex); if (size > 0) { break; } } - taosArrayDestroy(indexArray); - return size; }