Merge pull request #27134 from taosdata/feat/TD-31289-3.0
feat: cos support for multi s3 cfg
This commit is contained in:
commit
9ead181d19
|
@ -294,6 +294,8 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_SHOW_SUBQUERY_LEN 1000
|
||||
#define TSDB_LOG_VAR_LEN 32
|
||||
|
||||
#define TSDB_MAX_EP_NUM 10
|
||||
|
||||
#define TSDB_ARB_GROUP_MEMBER_NUM 2
|
||||
#define TSDB_ARB_TOKEN_SIZE 32
|
||||
|
||||
|
|
|
@ -5,14 +5,25 @@
|
|||
#include "tdef.h"
|
||||
#include "tutil.h"
|
||||
|
||||
extern char tsS3Endpoint[];
|
||||
extern char tsS3AccessKeyId[];
|
||||
extern char tsS3AccessKeySecret[];
|
||||
extern char tsS3BucketName[];
|
||||
extern char tsS3AppId[];
|
||||
extern char tsS3Hostname[];
|
||||
extern int8_t tsS3EpNum;
|
||||
extern char tsS3Endpoint[][TSDB_FQDN_LEN];
|
||||
extern char tsS3AccessKeyId[][TSDB_FQDN_LEN];
|
||||
extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN];
|
||||
extern char tsS3BucketName[][TSDB_FQDN_LEN];
|
||||
extern char tsS3AppId[][TSDB_FQDN_LEN];
|
||||
extern char tsS3Hostname[][TSDB_FQDN_LEN];
|
||||
extern int8_t tsS3Https;
|
||||
|
||||
static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex);
|
||||
static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size,
|
||||
int8_t epIndex);
|
||||
static int32_t s3DeleteObjectsByEp(const char *object_name[], int nobject, int8_t epIndex);
|
||||
static SArray *getListByPrefixByEp(const char *prefix, int8_t epIndex);
|
||||
static int32_t s3GetObjectBlockByEp(const char *object_name, int64_t offset, int64_t size, bool check,
|
||||
uint8_t **ppBlock, int8_t epIndex);
|
||||
static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileName, int8_t epIndex);
|
||||
static long s3SizeByEp(const char *object_name, int8_t epIndex);
|
||||
|
||||
#if defined(USE_S3)
|
||||
|
||||
#include "libs3.h"
|
||||
|
@ -32,7 +43,7 @@ extern int8_t tsS3Oss;
|
|||
|
||||
int32_t s3Begin() {
|
||||
S3Status status;
|
||||
const char *hostname = tsS3Hostname;
|
||||
const char *hostname = tsS3Hostname[0];
|
||||
const char *env_hn = getenv("S3_HOSTNAME");
|
||||
|
||||
if (env_hn) {
|
||||
|
@ -52,7 +63,7 @@ int32_t s3Begin() {
|
|||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
void s3End() { S3_deinitialize(); }
|
||||
void s3End() { (void)S3_deinitialize(); }
|
||||
|
||||
int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ }
|
||||
|
||||
|
@ -63,6 +74,7 @@ static int32_t s3ListBucket(char const *bucketname);
|
|||
|
||||
int32_t s3CheckCfg() {
|
||||
int32_t code = 0, lino = 0;
|
||||
int8_t i = 0;
|
||||
|
||||
if (!tsS3Enabled) {
|
||||
(void)fprintf(stderr, "s3 not configured.\n");
|
||||
|
@ -75,83 +87,91 @@ int32_t s3CheckCfg() {
|
|||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
|
||||
// test put
|
||||
char testdata[17] = "0123456789abcdef";
|
||||
const char *objectname[] = {"s3test.txt"};
|
||||
char path[PATH_MAX] = {0};
|
||||
int ds_len = strlen(TD_DIRSEP);
|
||||
int tmp_len = strlen(tsTempDir);
|
||||
for (; i < tsS3EpNum; i++) {
|
||||
(void)fprintf(stdout, "test s3 ep: %d/%d.\n", i + 1, tsS3EpNum);
|
||||
|
||||
(void)snprintf(path, PATH_MAX, "%s", tsTempDir);
|
||||
if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
|
||||
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP);
|
||||
(void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", objectname[0]);
|
||||
} else {
|
||||
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", objectname[0]);
|
||||
}
|
||||
// test put
|
||||
char testdata[17] = "0123456789abcdef";
|
||||
const char *objectname[] = {"s3test.txt"};
|
||||
char path[PATH_MAX] = {0};
|
||||
int ds_len = strlen(TD_DIRSEP);
|
||||
int tmp_len = strlen(tsTempDir);
|
||||
|
||||
TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
|
||||
if (!fp) {
|
||||
(void)fprintf(stderr, "failed to open test file: %s.\n", path);
|
||||
// uError("ERROR: %s Failed to open %s", __func__, path);
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||
}
|
||||
if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) {
|
||||
(void)fprintf(stderr, "failed to write test file: %s.\n", path);
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||
}
|
||||
if (taosFsyncFile(fp) < 0) {
|
||||
(void)fprintf(stderr, "failed to fsync test file: %s.\n", path);
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||
}
|
||||
(void)taosCloseFile(&fp);
|
||||
(void)snprintf(path, PATH_MAX, "%s", tsTempDir);
|
||||
if (strncmp(tsTempDir + tmp_len - ds_len, TD_DIRSEP, ds_len) != 0) {
|
||||
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", TD_DIRSEP);
|
||||
(void)snprintf(path + tmp_len + ds_len, PATH_MAX - tmp_len - ds_len, "%s", objectname[0]);
|
||||
} else {
|
||||
(void)snprintf(path + tmp_len, PATH_MAX - tmp_len, "%s", objectname[0]);
|
||||
}
|
||||
|
||||
(void)fprintf(stderr, "\nstart to put object: %s, file: %s content: %s\n", objectname[0], path, testdata);
|
||||
code = s3PutObjectFromFileOffset(path, objectname[0], 0, 16);
|
||||
if (code != 0) {
|
||||
(void)fprintf(stderr, "put object %s : failed.\n", objectname[0]);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
(void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]);
|
||||
TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
|
||||
if (!fp) {
|
||||
(void)fprintf(stderr, "failed to open test file: %s.\n", path);
|
||||
// uError("ERROR: %s Failed to open %s", __func__, path);
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||
}
|
||||
if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) {
|
||||
(void)fprintf(stderr, "failed to write test file: %s.\n", path);
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||
}
|
||||
if (taosFsyncFile(fp) < 0) {
|
||||
(void)fprintf(stderr, "failed to fsync test file: %s.\n", path);
|
||||
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||
}
|
||||
(void)taosCloseFile(&fp);
|
||||
|
||||
// list buckets
|
||||
(void)fprintf(stderr, "start to list bucket %s by prefix s3.\n", tsS3BucketName);
|
||||
code = s3ListBucket(tsS3BucketName);
|
||||
if (code != 0) {
|
||||
(void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
(void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName);
|
||||
(void)fprintf(stderr, "\nstart to put object: %s, file: %s content: %s\n", objectname[0], path, testdata);
|
||||
code = s3PutObjectFromFileOffsetByEp(path, objectname[0], 0, 16, i);
|
||||
if (code != 0) {
|
||||
(void)fprintf(stderr, "put object %s : failed.\n", objectname[0]);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
(void)fprintf(stderr, "put object %s: success.\n\n", objectname[0]);
|
||||
|
||||
// test range get
|
||||
uint8_t *pBlock = NULL;
|
||||
int c_offset = 10;
|
||||
int c_len = 6;
|
||||
// list buckets
|
||||
(void)fprintf(stderr, "start to list bucket %s by prefix s3.\n", tsS3BucketName[i]);
|
||||
code = s3ListBucketByEp(tsS3BucketName[i], i);
|
||||
if (code != 0) {
|
||||
(void)fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName[i]);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
(void)fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName[i]);
|
||||
|
||||
(void)fprintf(stderr, "start to range get object %s offset: %d len: %d.\n", objectname[0], c_offset, c_len);
|
||||
code = s3GetObjectBlock(objectname[0], c_offset, c_len, true, &pBlock);
|
||||
if (code != 0) {
|
||||
(void)fprintf(stderr, "get object %s : failed.\n", objectname[0]);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
char buf[7] = {0};
|
||||
(void)memcpy(buf, pBlock, c_len);
|
||||
taosMemoryFree(pBlock);
|
||||
(void)fprintf(stderr, "object content: %s\n", buf);
|
||||
(void)fprintf(stderr, "get object %s: success.\n\n", objectname[0]);
|
||||
// test range get
|
||||
uint8_t *pBlock = NULL;
|
||||
int c_offset = 10;
|
||||
int c_len = 6;
|
||||
|
||||
// delete test object
|
||||
(void)fprintf(stderr, "start to delete object: %s.\n", objectname[0]);
|
||||
code = s3DeleteObjects(objectname, 1);
|
||||
if (code != 0) {
|
||||
(void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
(void)fprintf(stderr, "start to range get object %s offset: %d len: %d.\n", objectname[0], c_offset, c_len);
|
||||
code = s3GetObjectBlockByEp(objectname[0], c_offset, c_len, true, &pBlock, i);
|
||||
if (code != 0) {
|
||||
(void)fprintf(stderr, "get object %s : failed.\n", objectname[0]);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
char buf[7] = {0};
|
||||
(void)memcpy(buf, pBlock, c_len);
|
||||
taosMemoryFree(pBlock);
|
||||
(void)fprintf(stderr, "object content: %s\n", buf);
|
||||
(void)fprintf(stderr, "get object %s: success.\n\n", objectname[0]);
|
||||
|
||||
// delete test object
|
||||
(void)fprintf(stderr, "start to delete object: %s.\n", objectname[0]);
|
||||
code = s3DeleteObjectsByEp(objectname, 1, i);
|
||||
if (code != 0) {
|
||||
(void)fprintf(stderr, "delete object %s : failed.\n", objectname[0]);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
(void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]);
|
||||
}
|
||||
(void)fprintf(stderr, "delete object %s: success.\n\n", objectname[0]);
|
||||
|
||||
s3End();
|
||||
|
||||
_exit:
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
(void)fprintf(stderr, "s3 check failed, code: %d, line: %d, index: %d.\n", code, lino, i);
|
||||
}
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
|
@ -241,6 +261,27 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro
|
|||
static SArray *getListByPrefix(const char *prefix);
|
||||
static void s3FreeObjectKey(void *pItem);
|
||||
|
||||
static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex) {
|
||||
int32_t code = 0;
|
||||
|
||||
SArray *objectArray = getListByPrefixByEp("s3", epIndex);
|
||||
if (objectArray == NULL) {
|
||||
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||
}
|
||||
|
||||
const char **object_name = TARRAY_DATA(objectArray);
|
||||
int size = TARRAY_SIZE(objectArray);
|
||||
|
||||
(void)fprintf(stderr, "objects:\n");
|
||||
for (int i = 0; i < size; ++i) {
|
||||
(void)fprintf(stderr, "%s\n", object_name[i]);
|
||||
}
|
||||
|
||||
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t s3ListBucket(char const *bucketname) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -906,7 +947,7 @@ _exit:
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) {
|
||||
int32_t s3PutObjectFromFile2ByEp(const char *file, const char *object_name, int8_t withcp, int8_t epIndex) {
|
||||
int32_t code = 0;
|
||||
int32_t lmtime = 0;
|
||||
const char *filename = 0;
|
||||
|
@ -933,8 +974,14 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w
|
|||
data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength =
|
||||
contentLength;
|
||||
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3BucketContext bucketContext = {0,
|
||||
tsS3BucketName[epIndex],
|
||||
protocolG,
|
||||
uriStyleG,
|
||||
tsS3AccessKeyId[epIndex],
|
||||
tsS3AccessKeySecret[epIndex],
|
||||
0,
|
||||
awsRegionG};
|
||||
|
||||
S3PutProperties putProperties = {contentType, md5,
|
||||
cacheControl, contentDispositionFilename,
|
||||
|
@ -961,7 +1008,23 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) {
|
||||
int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
int8_t startIndex = taosRand() % tsS3EpNum;
|
||||
for (int8_t i = 0; i < tsS3EpNum; ++i) {
|
||||
int8_t epIndex = (startIndex + i) % tsS3EpNum;
|
||||
code = s3PutObjectFromFile2ByEp(file, object_name, withcp, epIndex);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size,
|
||||
int8_t epIndex) {
|
||||
int32_t code = 0;
|
||||
int32_t lmtime = 0;
|
||||
const char *filename = 0;
|
||||
|
@ -994,8 +1057,14 @@ int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int
|
|||
data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength =
|
||||
contentLength;
|
||||
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3BucketContext bucketContext = {0,
|
||||
tsS3BucketName[epIndex],
|
||||
protocolG,
|
||||
uriStyleG,
|
||||
tsS3AccessKeyId[epIndex],
|
||||
tsS3AccessKeySecret[epIndex],
|
||||
0,
|
||||
awsRegionG};
|
||||
|
||||
S3PutProperties putProperties = {contentType, md5,
|
||||
cacheControl, contentDispositionFilename,
|
||||
|
@ -1018,6 +1087,21 @@ int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
int8_t startIndex = taosRand() % tsS3EpNum;
|
||||
for (int8_t i = 0; i < tsS3EpNum; ++i) {
|
||||
int8_t epIndex = (startIndex + i) % tsS3EpNum;
|
||||
code = s3PutObjectFromFileOffsetByEp(file, object_name, offset, size, epIndex);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
typedef struct list_bucket_callback_data {
|
||||
char err_msg[512];
|
||||
S3Status status;
|
||||
|
@ -1068,9 +1152,15 @@ static void s3FreeObjectKey(void *pItem) {
|
|||
taosMemoryFree(key);
|
||||
}
|
||||
|
||||
static SArray *getListByPrefix(const char *prefix) {
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
static SArray *getListByPrefixByEp(const char *prefix, int8_t epIndex) {
|
||||
S3BucketContext bucketContext = {0,
|
||||
tsS3BucketName[epIndex],
|
||||
protocolG,
|
||||
uriStyleG,
|
||||
tsS3AccessKeyId[epIndex],
|
||||
tsS3AccessKeySecret[epIndex],
|
||||
0,
|
||||
awsRegionG};
|
||||
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
|
||||
&listBucketCallback};
|
||||
|
||||
|
@ -1114,11 +1204,31 @@ static SArray *getListByPrefix(const char *prefix) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t s3DeleteObjects(const char *object_name[], int nobject) {
|
||||
static SArray *getListByPrefix(const char *prefix) {
|
||||
SArray *objectArray = NULL;
|
||||
int8_t startIndex = taosRand() % tsS3EpNum;
|
||||
for (int8_t i = 0; i < tsS3EpNum; ++i) {
|
||||
int8_t epIndex = (startIndex + i) % tsS3EpNum;
|
||||
objectArray = getListByPrefixByEp(prefix, epIndex);
|
||||
if (objectArray) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return objectArray;
|
||||
}
|
||||
|
||||
static int32_t s3DeleteObjectsByEp(const char *object_name[], int nobject, int8_t epIndex) {
|
||||
int32_t code = 0;
|
||||
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3BucketContext bucketContext = {0,
|
||||
tsS3BucketName[epIndex],
|
||||
protocolG,
|
||||
uriStyleG,
|
||||
tsS3AccessKeyId[epIndex],
|
||||
tsS3AccessKeySecret[epIndex],
|
||||
0,
|
||||
awsRegionG};
|
||||
S3ResponseHandler responseHandler = {0, &responseCompleteCallback};
|
||||
|
||||
for (int i = 0; i < nobject; ++i) {
|
||||
|
@ -1136,6 +1246,21 @@ int32_t s3DeleteObjects(const char *object_name[], int nobject) {
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t s3DeleteObjects(const char *object_name[], int nobject) {
|
||||
int32_t code = 0;
|
||||
|
||||
int8_t startIndex = taosRand() % tsS3EpNum;
|
||||
for (int8_t i = 0; i < tsS3EpNum; ++i) {
|
||||
int8_t epIndex = (startIndex + i) % tsS3EpNum;
|
||||
code = s3DeleteObjectsByEp(object_name, nobject, epIndex);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void s3DeleteObjectsByPrefix(const char *prefix) {
|
||||
SArray *objectArray = getListByPrefix(prefix);
|
||||
if (objectArray == NULL) return;
|
||||
|
@ -1166,13 +1291,20 @@ static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *
|
|||
}
|
||||
}
|
||||
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
|
||||
static int32_t s3GetObjectBlockByEp(const char *object_name, int64_t offset, int64_t size, bool check,
|
||||
uint8_t **ppBlock, int8_t epIndex) {
|
||||
int status = 0;
|
||||
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
||||
const char *ifMatch = 0, *ifNotMatch = 0;
|
||||
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3BucketContext bucketContext = {0,
|
||||
tsS3BucketName[epIndex],
|
||||
protocolG,
|
||||
uriStyleG,
|
||||
tsS3AccessKeyId[epIndex],
|
||||
tsS3AccessKeySecret[epIndex],
|
||||
0,
|
||||
awsRegionG};
|
||||
S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
|
||||
S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
|
||||
&getObjectDataCallback};
|
||||
|
@ -1213,18 +1345,39 @@ _retry:
|
|||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
|
||||
int32_t code = 0;
|
||||
|
||||
int8_t startIndex = taosRand() % tsS3EpNum;
|
||||
for (int8_t i = 0; i < tsS3EpNum; ++i) {
|
||||
int8_t epIndex = (startIndex + i) % tsS3EpNum;
|
||||
code = s3GetObjectBlockByEp(object_name, offset, size, check, ppBlock, epIndex);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) {
|
||||
TS3GetData *cbd = (TS3GetData *)callbackData;
|
||||
size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize);
|
||||
return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK);
|
||||
}
|
||||
|
||||
int32_t s3GetObjectToFile(const char *object_name, const char *fileName) {
|
||||
static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileName, int8_t epIndex) {
|
||||
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
||||
const char *ifMatch = 0, *ifNotMatch = 0;
|
||||
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3BucketContext bucketContext = {0,
|
||||
tsS3BucketName[epIndex],
|
||||
protocolG,
|
||||
uriStyleG,
|
||||
tsS3AccessKeyId[epIndex],
|
||||
tsS3AccessKeySecret[epIndex],
|
||||
0,
|
||||
awsRegionG};
|
||||
S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
|
||||
S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
|
||||
&getObjectCallback};
|
||||
|
@ -1252,6 +1405,21 @@ int32_t s3GetObjectToFile(const char *object_name, const char *fileName) {
|
|||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t s3GetObjectToFile(const char *object_name, const char *fileName) {
|
||||
int32_t code = 0;
|
||||
|
||||
int8_t startIndex = taosRand() % tsS3EpNum;
|
||||
for (int8_t i = 0; i < tsS3EpNum; ++i) {
|
||||
int8_t epIndex = (startIndex + i) % tsS3EpNum;
|
||||
code = s3GetObjectToFileByEp(object_name, fileName, epIndex);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) {
|
||||
SArray *objectArray = getListByPrefix(prefix);
|
||||
if (objectArray == NULL) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||
|
@ -1275,12 +1443,18 @@ int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
long s3Size(const char *object_name) {
|
||||
static long s3SizeByEp(const char *object_name, int8_t epIndex) {
|
||||
long size = 0;
|
||||
int status = 0;
|
||||
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3BucketContext bucketContext = {0,
|
||||
tsS3BucketName[epIndex],
|
||||
protocolG,
|
||||
uriStyleG,
|
||||
tsS3AccessKeyId[epIndex],
|
||||
tsS3AccessKeySecret[epIndex],
|
||||
0,
|
||||
awsRegionG};
|
||||
|
||||
S3ResponseHandler responseHandler = {&responsePropertiesCallback, &responseCompleteCallback};
|
||||
|
||||
|
@ -1300,6 +1474,21 @@ long s3Size(const char *object_name) {
|
|||
return size;
|
||||
}
|
||||
|
||||
long s3Size(const char *object_name) {
|
||||
long size = 0;
|
||||
|
||||
int8_t startIndex = taosRand() % tsS3EpNum;
|
||||
for (int8_t i = 0; i < tsS3EpNum; ++i) {
|
||||
int8_t epIndex = (startIndex + i) % tsS3EpNum;
|
||||
size = s3SizeByEp(object_name, epIndex);
|
||||
if (size > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
void s3EvictCache(const char *path, long object_size) {}
|
||||
|
||||
#elif defined(USE_COS)
|
||||
|
|
|
@ -294,19 +294,20 @@ bool tsFilterScalarMode = false;
|
|||
int tsResolveFQDNRetryTime = 100; // seconds
|
||||
int tsStreamAggCnt = 100000;
|
||||
|
||||
char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>";
|
||||
char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>";
|
||||
char tsS3AccessKeyId[TSDB_FQDN_LEN] = "<accesskeyid>";
|
||||
char tsS3AccessKeySecret[TSDB_FQDN_LEN] = "<accesskeysecrect>";
|
||||
char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>";
|
||||
char tsS3AppId[TSDB_FQDN_LEN] = "<appid>";
|
||||
int8_t tsS3EpNum = 0;
|
||||
char tsS3Endpoint[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<endpoint>"};
|
||||
char tsS3AccessKey[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<accesskey>"};
|
||||
char tsS3AccessKeyId[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<accesskeyid>"};
|
||||
char tsS3AccessKeySecret[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<accesskeysecrect>"};
|
||||
char tsS3BucketName[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<bucketname>"};
|
||||
char tsS3AppId[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<appid>"};
|
||||
int8_t tsS3Enabled = false;
|
||||
int8_t tsS3EnabledCfg = false;
|
||||
int8_t tsS3Oss = false;
|
||||
int8_t tsS3StreamEnabled = false;
|
||||
|
||||
int8_t tsS3Https = true;
|
||||
char tsS3Hostname[TSDB_FQDN_LEN] = "<hostname>";
|
||||
char tsS3Hostname[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<hostname>"};
|
||||
|
||||
int32_t tsS3BlockSize = -1; // number of tsdb pages (4096)
|
||||
int32_t tsS3BlockCacheSize = 16; // number of blocks
|
||||
|
@ -349,49 +350,89 @@ int32_t taosSetTfsCfg(SConfig *pCfg) {
|
|||
int32_t taosSetTfsCfg(SConfig *pCfg);
|
||||
#endif
|
||||
|
||||
int32_t taosSetS3Cfg(SConfig *pCfg) {
|
||||
SConfigItem *pItem = NULL;
|
||||
static int32_t taosSplitS3Cfg(SConfig *pCfg, const char *name, char gVarible[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN],
|
||||
int8_t *pNum) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "s3Accesskey");
|
||||
tstrncpy(tsS3AccessKey, pItem->str, TSDB_FQDN_LEN);
|
||||
if (tsS3AccessKey[0] == '<') {
|
||||
SConfigItem *pItem = NULL;
|
||||
int32_t num = 0;
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, name);
|
||||
|
||||
char *strDup = NULL;
|
||||
if ((strDup = taosStrdup(pItem->str))== NULL){
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
char **slices = strsplit(strDup, ",", &num);
|
||||
if (num > TSDB_MAX_EP_NUM) {
|
||||
code = TSDB_CODE_INVALID_CFG;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
for (int i = 0; i < num; ++i) {
|
||||
tstrncpy(gVarible[i], slices[i], TSDB_FQDN_LEN);
|
||||
}
|
||||
*pNum = num;
|
||||
|
||||
_exit:
|
||||
taosMemoryFreeClear(slices);
|
||||
taosMemoryFreeClear(strDup);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t taosSetS3Cfg(SConfig *pCfg) {
|
||||
int8_t num = 0;
|
||||
|
||||
TAOS_CHECK_RETURN(taosSplitS3Cfg(pCfg, "s3Accesskey", tsS3AccessKey, &num));
|
||||
if (num == 0) TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
|
||||
tsS3EpNum = num;
|
||||
|
||||
if (tsS3AccessKey[0][0] == '<') {
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
char *colon = strchr(tsS3AccessKey, ':');
|
||||
if (!colon) {
|
||||
uError("invalid access key:%s", tsS3AccessKey);
|
||||
TAOS_RETURN(TSDB_CODE_INVALID_CFG);
|
||||
}
|
||||
*colon = '\0';
|
||||
tstrncpy(tsS3AccessKeyId, tsS3AccessKey, TSDB_FQDN_LEN);
|
||||
tstrncpy(tsS3AccessKeySecret, colon + 1, TSDB_FQDN_LEN);
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "s3Endpoint");
|
||||
tstrncpy(tsS3Endpoint, pItem->str, TSDB_FQDN_LEN);
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "s3BucketName");
|
||||
tstrncpy(tsS3BucketName, pItem->str, TSDB_FQDN_LEN);
|
||||
char *proto = strstr(tsS3Endpoint, "https://");
|
||||
if (!proto) {
|
||||
tsS3Https = false;
|
||||
tstrncpy(tsS3Hostname, tsS3Endpoint + 7, TSDB_FQDN_LEN);
|
||||
} else {
|
||||
tstrncpy(tsS3Hostname, tsS3Endpoint + 8, TSDB_FQDN_LEN);
|
||||
for (int i = 0; i < tsS3EpNum; ++i) {
|
||||
char *colon = strchr(tsS3AccessKey[i], ':');
|
||||
if (!colon) {
|
||||
uError("invalid access key:%s", tsS3AccessKey[i]);
|
||||
TAOS_RETURN(TSDB_CODE_INVALID_CFG);
|
||||
}
|
||||
*colon = '\0';
|
||||
tstrncpy(tsS3AccessKeyId[i], tsS3AccessKey[i], TSDB_FQDN_LEN);
|
||||
tstrncpy(tsS3AccessKeySecret[i], colon + 1, TSDB_FQDN_LEN);
|
||||
}
|
||||
|
||||
char *cos = strstr(tsS3Endpoint, "cos.");
|
||||
if (cos) {
|
||||
char *appid = strrchr(tsS3BucketName, '-');
|
||||
if (!appid) {
|
||||
uError("failed to locate appid in bucket:%s", tsS3BucketName);
|
||||
TAOS_RETURN(TSDB_CODE_INVALID_CFG);
|
||||
TAOS_CHECK_RETURN(taosSplitS3Cfg(pCfg, "s3Endpoint", tsS3Endpoint, &num));
|
||||
if (num != tsS3EpNum) TAOS_RETURN(TSDB_CODE_INVALID_CFG);
|
||||
|
||||
TAOS_CHECK_RETURN(taosSplitS3Cfg(pCfg, "s3BucketName", tsS3BucketName, &num));
|
||||
if (num != tsS3EpNum) TAOS_RETURN(TSDB_CODE_INVALID_CFG);
|
||||
|
||||
for (int i = 0; i < tsS3EpNum; ++i) {
|
||||
char *proto = strstr(tsS3Endpoint[i], "https://");
|
||||
if (!proto) {
|
||||
tstrncpy(tsS3Hostname[i], tsS3Endpoint[i] + 7, TSDB_FQDN_LEN);
|
||||
} else {
|
||||
tstrncpy(tsS3AppId, appid + 1, TSDB_FQDN_LEN);
|
||||
tstrncpy(tsS3Hostname[i], tsS3Endpoint[i] + 8, TSDB_FQDN_LEN);
|
||||
}
|
||||
|
||||
char *cos = strstr(tsS3Endpoint[i], "cos.");
|
||||
if (cos) {
|
||||
char *appid = strrchr(tsS3BucketName[i], '-');
|
||||
if (!appid) {
|
||||
uError("failed to locate appid in bucket:%s", tsS3BucketName[i]);
|
||||
TAOS_RETURN(TSDB_CODE_INVALID_CFG);
|
||||
} else {
|
||||
tstrncpy(tsS3AppId[i], appid + 1, TSDB_FQDN_LEN);
|
||||
}
|
||||
}
|
||||
}
|
||||
char *oss = strstr(tsS3Endpoint, "aliyuncs.");
|
||||
if (oss) {
|
||||
tsS3Oss = true;
|
||||
}
|
||||
if (tsS3BucketName[0] != '<') {
|
||||
|
||||
tsS3Https = (strstr(tsS3Endpoint[0], "https://") != NULL);
|
||||
tsS3Oss = (strstr(tsS3Endpoint[0], "aliyuncs.") != NULL);
|
||||
|
||||
if (tsS3BucketName[0][0] != '<') {
|
||||
#if defined(USE_COS) || defined(USE_S3)
|
||||
#ifdef TD_ENTERPRISE
|
||||
/*if (tsDiskCfgNum > 1) */ tsS3Enabled = true;
|
||||
|
@ -775,9 +816,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "resolveFQDNRetryTime", tsResolveFQDNRetryTime, 1, 10240, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
|
||||
TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey[0], CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint[0], CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddString(pCfg, "s3BucketName", tsS3BucketName[0], CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "s3PageCacheSize", tsS3PageCacheSize, 4, 1024 * 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "s3UploadDelaySec", tsS3UploadDelaySec, 1, 60 * 60 * 24 * 30, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
|
||||
|
|
Loading…
Reference in New Issue