diff --git a/include/util/tdef.h b/include/util/tdef.h index 39c65d2c71..890f1d8f95 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -294,6 +294,8 @@ typedef enum ELogicConditionType { #define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_LOG_VAR_LEN 32 +#define TSDB_MAX_EP_NUM 10 + #define TSDB_ARB_GROUP_MEMBER_NUM 2 #define TSDB_ARB_TOKEN_SIZE 32 diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 335c654acd..db0dadbc46 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -5,14 +5,25 @@ #include "tdef.h" #include "tutil.h" -extern char tsS3Endpoint[]; -extern char tsS3AccessKeyId[]; -extern char tsS3AccessKeySecret[]; -extern char tsS3BucketName[]; -extern char tsS3AppId[]; -extern char tsS3Hostname[]; +extern int8_t tsS3EpNum; +extern char tsS3Endpoint[][TSDB_FQDN_LEN]; +extern char tsS3AccessKeyId[][TSDB_FQDN_LEN]; +extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN]; +extern char tsS3BucketName[][TSDB_FQDN_LEN]; +extern char tsS3AppId[][TSDB_FQDN_LEN]; +extern char tsS3Hostname[][TSDB_FQDN_LEN]; extern int8_t tsS3Https; +static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex); +static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size, + int8_t epIndex); +static int32_t s3DeleteObjectsByEp(const char *object_name[], int nobject, int8_t epIndex); +static SArray *getListByPrefixByEp(const char *prefix, int8_t epIndex); +static int32_t s3GetObjectBlockByEp(const char *object_name, int64_t offset, int64_t size, bool check, + uint8_t **ppBlock, int8_t epIndex); +static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileName, int8_t epIndex); +static long s3SizeByEp(const char *object_name, int8_t epIndex); + #if defined(USE_S3) #include "libs3.h" @@ -32,7 +43,7 @@ extern int8_t tsS3Oss; int32_t s3Begin() { S3Status status; - const char *hostname = tsS3Hostname; + const char *hostname = tsS3Hostname[0]; const char *env_hn = getenv("S3_HOSTNAME"); if (env_hn) { @@ -52,7 +63,7 @@ int32_t s3Begin() { TAOS_RETURN(TSDB_CODE_SUCCESS); } -void s3End() { S3_deinitialize(); } +void s3End() { (void)S3_deinitialize(); } int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ } @@ -63,6 +74,7 @@ static int32_t s3ListBucket(char const *bucketname); int32_t s3CheckCfg() { int32_t code = 0, lino = 0; + int8_t i = 0; if (!tsS3Enabled) { (void)fprintf(stderr, "s3 not configured.\n"); @@ -75,83 +87,91 @@ int32_t s3CheckCfg() { TAOS_CHECK_GOTO(code, &lino, _exit); } - // test put - char testdata[17] = "0123456789abcdef"; - const char *objectname[] = {"s3test.txt"}; - char path[PATH_MAX] = {0}; - int ds_len = strlen(TD_DIRSEP); - int tmp_len = strlen(tsTempDir); + for (; i < tsS3EpNum; i++) { + (void)fprintf(stdout, "test s3 ep: %d/%d.\n", i + 1, tsS3EpNum); - (void)snprintf(path, PATH_MAX, "%s", tsTempDir); - if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { - (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); - (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", objectname[0]); - } else { - (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", objectname[0]); - } + // test put + char testdata[17] = "0123456789abcdef"; + const char *objectname[] = {"s3test.txt"}; + char path[PATH_MAX] = {0}; + int ds_len = strlen(TD_DIRSEP); + int tmp_len = strlen(tsTempDir); - TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); - if (!fp) { - (void)fprintf(stderr, "failed to open test file: %s.\n", path); - // uError("ERROR: %s Failed to open %s", __func__, path); - TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); - } - if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) { - (void)fprintf(stderr, "failed to write test file: %s.\n", path); - TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); - } - if (taosFsyncFile(fp) < 0) { - (void)fprintf(stderr, "failed to fsync test file: %s.\n", path); - TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); - } - (void)taosCloseFile(&fp); + (void)snprintf(path, PATH_MAX, "%s", tsTempDir); + if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP); + (void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", objectname[0]); + } else { + (void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", objectname[0]); + } - (void)fprintf(stderr, "\nstart to put object: %s, file: %s content: %s\n", objectname[0], path, testdata); - code = s3PutObjectFromFileOffset(path, objectname[0], 0, 16); - if (code != 0) { - (void)fprintf(stderr, "put object %s : failed.\n", objectname[0]); - TAOS_CHECK_GOTO(code, &lino, _exit); - } - (void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]); + TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + if (!fp) { + (void)fprintf(stderr, "failed to open test file: %s.\n", path); + // uError("ERROR: %s Failed to open %s", __func__, path); + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); + } + if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) { + (void)fprintf(stderr, "failed to write test file: %s.\n", path); + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); + } + if (taosFsyncFile(fp) < 0) { + (void)fprintf(stderr, "failed to fsync test file: %s.\n", path); + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit); + } + (void)taosCloseFile(&fp); - // list buckets - (void)fprintf(stderr, "start to list bucket %s by prefix s3.\n", tsS3BucketName); - code = s3ListBucket(tsS3BucketName); - if (code != 0) { - (void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName); - TAOS_CHECK_GOTO(code, &lino, _exit); - } - (void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName); + (void)fprintf(stderr, "\nstart to put object: %s, file: %s content: %s\n", objectname[0], path, testdata); + code = s3PutObjectFromFileOffsetByEp(path, objectname[0], 0, 16, i); + if (code != 0) { + (void)fprintf(stderr, "put object %s : failed.\n", objectname[0]); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + (void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]); - // test range get - uint8_t *pBlock = NULL; - int c_offset = 10; - int c_len = 6; + // list buckets + (void)fprintf(stderr, "start to list bucket %s by prefix s3.\n", tsS3BucketName[i]); + code = s3ListBucketByEp(tsS3BucketName[i], i); + if (code != 0) { + (void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName[i]); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + (void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName[i]); - (void)fprintf(stderr, "start to range get object %s offset: %d len: %d.\n", objectname[0], c_offset, c_len); - code = s3GetObjectBlock(objectname[0], c_offset, c_len, true, &pBlock); - if (code != 0) { - (void)fprintf(stderr, "get object %s : failed.\n", objectname[0]); - TAOS_CHECK_GOTO(code, &lino, _exit); - } - char buf[7] = {0}; - (void)memcpy(buf, pBlock, c_len); - taosMemoryFree(pBlock); - (void)fprintf(stderr, "object content: %s\n", buf); - (void)fprintf(stderr, "get object %s: success.\n\n", objectname[0]); + // test range get + uint8_t *pBlock = NULL; + int c_offset = 10; + int c_len = 6; - // delete test object - (void)fprintf(stderr, "start to delete object: %s.\n", objectname[0]); - code = s3DeleteObjects(objectname, 1); - if (code != 0) { - (void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]); - TAOS_CHECK_GOTO(code, &lino, _exit); + (void)fprintf(stderr, "start to range get object %s offset: %d len: %d.\n", objectname[0], c_offset, c_len); + code = s3GetObjectBlockByEp(objectname[0], c_offset, c_len, true, &pBlock, i); + if (code != 0) { + (void)fprintf(stderr, "get object %s : failed.\n", objectname[0]); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + char buf[7] = {0}; + (void)memcpy(buf, pBlock, c_len); + taosMemoryFree(pBlock); + (void)fprintf(stderr, "object content: %s\n", buf); + (void)fprintf(stderr, "get object %s: success.\n\n", objectname[0]); + + // delete test object + (void)fprintf(stderr, "start to delete object: %s.\n", objectname[0]); + code = s3DeleteObjectsByEp(objectname, 1, i); + if (code != 0) { + (void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + (void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]); } - (void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]); s3End(); _exit: + if (TSDB_CODE_SUCCESS != code) { + (void)fprintf(stderr, "s3 check failed, code: %d, line: %d, index: %d.\n", code, lino, i); + } + TAOS_RETURN(code); } @@ -241,6 +261,27 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro static SArray *getListByPrefix(const char *prefix); static void s3FreeObjectKey(void *pItem); +static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex) { + int32_t code = 0; + + SArray *objectArray = getListByPrefixByEp("s3", epIndex); + if (objectArray == NULL) { + TAOS_RETURN(TSDB_CODE_FAILED); + } + + const char **object_name = TARRAY_DATA(objectArray); + int size = TARRAY_SIZE(objectArray); + + (void)fprintf(stderr, "objects:\n"); + for (int i = 0; i < size; ++i) { + (void)fprintf(stderr, "%s\n", object_name[i]); + } + + taosArrayDestroyEx(objectArray, s3FreeObjectKey); + + TAOS_RETURN(code); +} + static int32_t s3ListBucket(char const *bucketname) { int32_t code = 0; @@ -906,7 +947,7 @@ _exit: TAOS_RETURN(code); } -int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) { +int32_t s3PutObjectFromFile2ByEp(const char *file, const char *object_name, int8_t withcp, int8_t epIndex) { int32_t code = 0; int32_t lmtime = 0; const char *filename = 0; @@ -933,8 +974,14 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength = contentLength; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3PutProperties putProperties = {contentType, md5, cacheControl, contentDispositionFilename, @@ -961,7 +1008,23 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w TAOS_RETURN(code); } -int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { +int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) { + int32_t code = TSDB_CODE_SUCCESS; + + int8_t startIndex = taosRand() % tsS3EpNum; + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = (startIndex + i) % tsS3EpNum; + code = s3PutObjectFromFile2ByEp(file, object_name, withcp, epIndex); + if (code == TSDB_CODE_SUCCESS) { + break; + } + } + + return code; +} + +static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size, + int8_t epIndex) { int32_t code = 0; int32_t lmtime = 0; const char *filename = 0; @@ -994,8 +1057,14 @@ int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength = contentLength; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3PutProperties putProperties = {contentType, md5, cacheControl, contentDispositionFilename, @@ -1018,6 +1087,21 @@ int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int TAOS_RETURN(code); } +int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) { + int32_t code = TSDB_CODE_SUCCESS; + + int8_t startIndex = taosRand() % tsS3EpNum; + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = (startIndex + i) % tsS3EpNum; + code = s3PutObjectFromFileOffsetByEp(file, object_name, offset, size, epIndex); + if (code == TSDB_CODE_SUCCESS) { + break; + } + } + + return code; +} + typedef struct list_bucket_callback_data { char err_msg[512]; S3Status status; @@ -1068,9 +1152,15 @@ static void s3FreeObjectKey(void *pItem) { taosMemoryFree(key); } -static SArray *getListByPrefix(const char *prefix) { - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; +static SArray *getListByPrefixByEp(const char *prefix, int8_t epIndex) { + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listBucketCallback}; @@ -1114,11 +1204,31 @@ static SArray *getListByPrefix(const char *prefix) { return NULL; } -int32_t s3DeleteObjects(const char *object_name[], int nobject) { +static SArray *getListByPrefix(const char *prefix) { + SArray *objectArray = NULL; + int8_t startIndex = taosRand() % tsS3EpNum; + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = (startIndex + i) % tsS3EpNum; + objectArray = getListByPrefixByEp(prefix, epIndex); + if (objectArray) { + break; + } + } + + return objectArray; +} + +static int32_t s3DeleteObjectsByEp(const char *object_name[], int nobject, int8_t epIndex) { int32_t code = 0; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3ResponseHandler responseHandler = {0, &responseCompleteCallback}; for (int i = 0; i < nobject; ++i) { @@ -1136,6 +1246,21 @@ int32_t s3DeleteObjects(const char *object_name[], int nobject) { TAOS_RETURN(code); } +int32_t s3DeleteObjects(const char *object_name[], int nobject) { + int32_t code = 0; + + int8_t startIndex = taosRand() % tsS3EpNum; + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = (startIndex + i) % tsS3EpNum; + code = s3DeleteObjectsByEp(object_name, nobject, epIndex); + if (code == TSDB_CODE_SUCCESS) { + break; + } + } + + return code; +} + void s3DeleteObjectsByPrefix(const char *prefix) { SArray *objectArray = getListByPrefix(prefix); if (objectArray == NULL) return; @@ -1166,13 +1291,20 @@ static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void * } } -int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { +static int32_t s3GetObjectBlockByEp(const char *object_name, int64_t offset, int64_t size, bool check, + uint8_t **ppBlock, int8_t epIndex) { int status = 0; int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &getObjectDataCallback}; @@ -1213,18 +1345,39 @@ _retry: TAOS_RETURN(TSDB_CODE_SUCCESS); } +int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) { + int32_t code = 0; + + int8_t startIndex = taosRand() % tsS3EpNum; + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = (startIndex + i) % tsS3EpNum; + code = s3GetObjectBlockByEp(object_name, offset, size, check, ppBlock, epIndex); + if (code == TSDB_CODE_SUCCESS) { + break; + } + } + + return code; +} + static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) { TS3GetData *cbd = (TS3GetData *)callbackData; size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize); return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK); } -int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { +static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileName, int8_t epIndex) { int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &getObjectCallback}; @@ -1252,6 +1405,21 @@ int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { TAOS_RETURN(TSDB_CODE_SUCCESS); } +int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { + int32_t code = 0; + + int8_t startIndex = taosRand() % tsS3EpNum; + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = (startIndex + i) % tsS3EpNum; + code = s3GetObjectToFileByEp(object_name, fileName, epIndex); + if (code == TSDB_CODE_SUCCESS) { + break; + } + } + + return code; +} + int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { SArray *objectArray = getListByPrefix(prefix); if (objectArray == NULL) TAOS_RETURN(TSDB_CODE_FAILED); @@ -1275,12 +1443,18 @@ int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; } -long s3Size(const char *object_name) { +static long s3SizeByEp(const char *object_name, int8_t epIndex) { long size = 0; int status = 0; - S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + S3BucketContext bucketContext = {0, + tsS3BucketName[epIndex], + protocolG, + uriStyleG, + tsS3AccessKeyId[epIndex], + tsS3AccessKeySecret[epIndex], + 0, + awsRegionG}; S3ResponseHandler responseHandler = {&responsePropertiesCallback, &responseCompleteCallback}; @@ -1300,6 +1474,21 @@ long s3Size(const char *object_name) { return size; } +long s3Size(const char *object_name) { + long size = 0; + + int8_t startIndex = taosRand() % tsS3EpNum; + for (int8_t i = 0; i < tsS3EpNum; ++i) { + int8_t epIndex = (startIndex + i) % tsS3EpNum; + size = s3SizeByEp(object_name, epIndex); + if (size > 0) { + break; + } + } + + return size; +} + void s3EvictCache(const char *path, long object_size) {} #elif defined(USE_COS) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ad069356c4..718384690a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -294,19 +294,20 @@ bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds int tsStreamAggCnt = 100000; -char tsS3Endpoint[TSDB_FQDN_LEN] = ""; -char tsS3AccessKey[TSDB_FQDN_LEN] = ""; -char tsS3AccessKeyId[TSDB_FQDN_LEN] = ""; -char tsS3AccessKeySecret[TSDB_FQDN_LEN] = ""; -char tsS3BucketName[TSDB_FQDN_LEN] = ""; -char tsS3AppId[TSDB_FQDN_LEN] = ""; +int8_t tsS3EpNum = 0; +char tsS3Endpoint[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; +char tsS3AccessKey[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; +char tsS3AccessKeyId[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; +char tsS3AccessKeySecret[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; +char tsS3BucketName[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; +char tsS3AppId[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; int8_t tsS3Enabled = false; int8_t tsS3EnabledCfg = false; int8_t tsS3Oss = false; int8_t tsS3StreamEnabled = false; int8_t tsS3Https = true; -char tsS3Hostname[TSDB_FQDN_LEN] = ""; +char tsS3Hostname[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {""}; int32_t tsS3BlockSize = -1; // number of tsdb pages (4096) int32_t tsS3BlockCacheSize = 16; // number of blocks @@ -349,49 +350,89 @@ int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg); #endif -int32_t taosSetS3Cfg(SConfig *pCfg) { - SConfigItem *pItem = NULL; +static int32_t taosSplitS3Cfg(SConfig *pCfg, const char *name, char gVarible[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN], + int8_t *pNum) { + int32_t code = TSDB_CODE_SUCCESS; - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "s3Accesskey"); - tstrncpy(tsS3AccessKey, pItem->str, TSDB_FQDN_LEN); - if (tsS3AccessKey[0] == '<') { + SConfigItem *pItem = NULL; + int32_t num = 0; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, name); + + char *strDup = NULL; + if ((strDup = taosStrdup(pItem->str))== NULL){ + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + char **slices = strsplit(strDup, ",", &num); + if (num > TSDB_MAX_EP_NUM) { + code = TSDB_CODE_INVALID_CFG; + goto _exit; + } + + for (int i = 0; i < num; ++i) { + tstrncpy(gVarible[i], slices[i], TSDB_FQDN_LEN); + } + *pNum = num; + +_exit: + taosMemoryFreeClear(slices); + taosMemoryFreeClear(strDup); + TAOS_RETURN(code); +} + +int32_t taosSetS3Cfg(SConfig *pCfg) { + int8_t num = 0; + + TAOS_CHECK_RETURN(taosSplitS3Cfg(pCfg, "s3Accesskey", tsS3AccessKey, &num)); + if (num == 0) TAOS_RETURN(TSDB_CODE_SUCCESS); + + tsS3EpNum = num; + + if (tsS3AccessKey[0][0] == '<') { TAOS_RETURN(TSDB_CODE_SUCCESS); } - char *colon = strchr(tsS3AccessKey, ':'); - if (!colon) { - uError("invalid access key:%s", tsS3AccessKey); - TAOS_RETURN(TSDB_CODE_INVALID_CFG); - } - *colon = '\0'; - tstrncpy(tsS3AccessKeyId, tsS3AccessKey, TSDB_FQDN_LEN); - tstrncpy(tsS3AccessKeySecret, colon + 1, TSDB_FQDN_LEN); - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "s3Endpoint"); - tstrncpy(tsS3Endpoint, pItem->str, TSDB_FQDN_LEN); - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "s3BucketName"); - tstrncpy(tsS3BucketName, pItem->str, TSDB_FQDN_LEN); - char *proto = strstr(tsS3Endpoint, "https://"); - if (!proto) { - tsS3Https = false; - tstrncpy(tsS3Hostname, tsS3Endpoint + 7, TSDB_FQDN_LEN); - } else { - tstrncpy(tsS3Hostname, tsS3Endpoint + 8, TSDB_FQDN_LEN); + for (int i = 0; i < tsS3EpNum; ++i) { + char *colon = strchr(tsS3AccessKey[i], ':'); + if (!colon) { + uError("invalid access key:%s", tsS3AccessKey[i]); + TAOS_RETURN(TSDB_CODE_INVALID_CFG); + } + *colon = '\0'; + tstrncpy(tsS3AccessKeyId[i], tsS3AccessKey[i], TSDB_FQDN_LEN); + tstrncpy(tsS3AccessKeySecret[i], colon + 1, TSDB_FQDN_LEN); } - char *cos = strstr(tsS3Endpoint, "cos."); - if (cos) { - char *appid = strrchr(tsS3BucketName, '-'); - if (!appid) { - uError("failed to locate appid in bucket:%s", tsS3BucketName); - TAOS_RETURN(TSDB_CODE_INVALID_CFG); + TAOS_CHECK_RETURN(taosSplitS3Cfg(pCfg, "s3Endpoint", tsS3Endpoint, &num)); + if (num != tsS3EpNum) TAOS_RETURN(TSDB_CODE_INVALID_CFG); + + TAOS_CHECK_RETURN(taosSplitS3Cfg(pCfg, "s3BucketName", tsS3BucketName, &num)); + if (num != tsS3EpNum) TAOS_RETURN(TSDB_CODE_INVALID_CFG); + + for (int i = 0; i < tsS3EpNum; ++i) { + char *proto = strstr(tsS3Endpoint[i], "https://"); + if (!proto) { + tstrncpy(tsS3Hostname[i], tsS3Endpoint[i] + 7, TSDB_FQDN_LEN); } else { - tstrncpy(tsS3AppId, appid + 1, TSDB_FQDN_LEN); + tstrncpy(tsS3Hostname[i], tsS3Endpoint[i] + 8, TSDB_FQDN_LEN); + } + + char *cos = strstr(tsS3Endpoint[i], "cos."); + if (cos) { + char *appid = strrchr(tsS3BucketName[i], '-'); + if (!appid) { + uError("failed to locate appid in bucket:%s", tsS3BucketName[i]); + TAOS_RETURN(TSDB_CODE_INVALID_CFG); + } else { + tstrncpy(tsS3AppId[i], appid + 1, TSDB_FQDN_LEN); + } } } - char *oss = strstr(tsS3Endpoint, "aliyuncs."); - if (oss) { - tsS3Oss = true; - } - if (tsS3BucketName[0] != '<') { + + tsS3Https = (strstr(tsS3Endpoint[0], "https://") != NULL); + tsS3Oss = (strstr(tsS3Endpoint[0], "aliyuncs.") != NULL); + + if (tsS3BucketName[0][0] != '<') { #if defined(USE_COS) || defined(USE_S3) #ifdef TD_ENTERPRISE /*if (tsDiskCfgNum > 1) */ tsS3Enabled = true; @@ -775,9 +816,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "resolveFQDNRetryTime", tsResolveFQDNRetryTime, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey[0], CFG_SCOPE_SERVER, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint[0], CFG_SCOPE_SERVER, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3BucketName", tsS3BucketName[0], CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));