Merge pull request #23600 from taosdata/fix/TD-27206
fix(vnode/cos): fix error printing to avoid buffer overflow
This commit is contained in:
commit
ec3685db94
|
@ -39,8 +39,8 @@ void s3DeleteObjectsByPrefix(const char *prefix);
|
|||
void s3DeleteObjects(const char *object_name[], int nobject);
|
||||
bool s3Exists(const char *object_name);
|
||||
bool s3Get(const char *object_name, const char *path);
|
||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path);
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock);
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock);
|
||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path);
|
||||
void s3EvictCache(const char *path, long object_size);
|
||||
long s3Size(const char *object_name);
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ static S3UriStyle uriStyleG = S3UriStylePath;
|
|||
static int retriesG = 5;
|
||||
static int timeoutMsG = 0;
|
||||
|
||||
static int32_t s3Begin() {
|
||||
int32_t s3Begin() {
|
||||
S3Status status;
|
||||
const char *hostname = tsS3Hostname;
|
||||
const char *env_hn = getenv("S3_HOSTNAME");
|
||||
|
@ -44,10 +44,12 @@ static int32_t s3Begin() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void s3End() { S3_deinitialize(); }
|
||||
int32_t s3Init() { return s3Begin(); }
|
||||
void s3End() { S3_deinitialize(); }
|
||||
|
||||
void s3CleanUp() { s3End(); }
|
||||
int32_t s3Init() { return 0; /*s3Begin();*/ }
|
||||
|
||||
void s3CleanUp() { /*s3End();*/
|
||||
}
|
||||
|
||||
static int should_retry() {
|
||||
/*
|
||||
|
@ -73,8 +75,9 @@ static void s3PrintError(const char *func, S3Status status, char error_details[]
|
|||
}
|
||||
|
||||
typedef struct {
|
||||
char err_msg[128];
|
||||
char err_msg[512];
|
||||
S3Status status;
|
||||
uint64_t content_length;
|
||||
TdFilePtr file;
|
||||
} TS3GetData;
|
||||
|
||||
|
@ -83,10 +86,11 @@ typedef struct {
|
|||
S3Status status;
|
||||
uint64_t content_length;
|
||||
char *buf;
|
||||
int64_t buf_pos;
|
||||
} TS3SizeCBD;
|
||||
|
||||
static S3Status responsePropertiesCallbackNull(const S3ResponseProperties *properties, void *callbackData) {
|
||||
// (void)callbackData;
|
||||
// (void)callbackData;
|
||||
return S3StatusOK;
|
||||
}
|
||||
|
||||
|
@ -109,20 +113,22 @@ static void responseCompleteCallback(S3Status status, const S3ErrorDetails *erro
|
|||
int len = 0;
|
||||
const int elen = sizeof(cbd->err_msg);
|
||||
if (error) {
|
||||
if (error->message) {
|
||||
if (error->message && elen - len > 0) {
|
||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " Message: %s\n", error->message);
|
||||
}
|
||||
if (error->resource) {
|
||||
if (error->resource && elen - len > 0) {
|
||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " Resource: %s\n", error->resource);
|
||||
}
|
||||
if (error->furtherDetails) {
|
||||
if (error->furtherDetails && elen - len > 0) {
|
||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " Further Details: %s\n", error->furtherDetails);
|
||||
}
|
||||
if (error->extraDetailsCount) {
|
||||
if (error->extraDetailsCount && elen - len > 0) {
|
||||
len += snprintf(&(cbd->err_msg[len]), elen - len, "%s", " Extra Details:\n");
|
||||
for (int i = 0; i < error->extraDetailsCount; i++) {
|
||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name,
|
||||
error->extraDetails[i].value);
|
||||
if (elen - len > 0) {
|
||||
len += snprintf(&(cbd->err_msg[len]), elen - len, " %s: %s\n", error->extraDetails[i].name,
|
||||
error->extraDetails[i].value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -213,8 +219,9 @@ static void growbuffer_destroy(growbuffer *gb) {
|
|||
}
|
||||
|
||||
typedef struct put_object_callback_data {
|
||||
char err_msg[128];
|
||||
char err_msg[512];
|
||||
S3Status status;
|
||||
uint64_t content_length;
|
||||
// FILE *infile;
|
||||
TdFilePtr infileFD;
|
||||
growbuffer *gb;
|
||||
|
@ -226,8 +233,9 @@ typedef struct put_object_callback_data {
|
|||
#define MULTIPART_CHUNK_SIZE (768 << 20) // multipart is 768M
|
||||
|
||||
typedef struct UploadManager {
|
||||
char err_msg[128];
|
||||
char err_msg[512];
|
||||
S3Status status;
|
||||
uint64_t content_length;
|
||||
// used for initial multipart
|
||||
char *upload_id;
|
||||
|
||||
|
@ -241,8 +249,9 @@ typedef struct UploadManager {
|
|||
} UploadManager;
|
||||
|
||||
typedef struct list_parts_callback_data {
|
||||
char err_msg[128];
|
||||
char err_msg[512];
|
||||
S3Status status;
|
||||
uint64_t content_length;
|
||||
int isTruncated;
|
||||
char nextPartNumberMarker[24];
|
||||
char initiatorId[256];
|
||||
|
@ -258,7 +267,7 @@ typedef struct list_parts_callback_data {
|
|||
} list_parts_callback_data;
|
||||
|
||||
typedef struct MultipartPartData {
|
||||
char err_msg[128];
|
||||
char err_msg[512];
|
||||
S3Status status;
|
||||
put_object_callback_data put_object_data;
|
||||
int seq;
|
||||
|
@ -402,7 +411,8 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan
|
|||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
|
||||
S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listPartsCallback};
|
||||
S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
|
||||
&listPartsCallback};
|
||||
|
||||
list_parts_callback_data data;
|
||||
|
||||
|
@ -621,7 +631,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
|
|||
}
|
||||
|
||||
typedef struct list_bucket_callback_data {
|
||||
char err_msg[128];
|
||||
char err_msg[512];
|
||||
S3Status status;
|
||||
int isTruncated;
|
||||
char nextMarker[1024];
|
||||
|
@ -670,7 +680,7 @@ static void s3FreeObjectKey(void *pItem) {
|
|||
taosMemoryFree(key);
|
||||
}
|
||||
|
||||
static SArray* getListByPrefix(const char *prefix){
|
||||
static SArray *getListByPrefix(const char *prefix) {
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
|
||||
|
@ -679,7 +689,7 @@ static SArray* getListByPrefix(const char *prefix){
|
|||
const char *marker = 0, *delimiter = 0;
|
||||
int maxkeys = 0, allDetails = 0;
|
||||
list_bucket_callback_data data;
|
||||
data.objectArray = taosArrayInit(32, sizeof(void*));
|
||||
data.objectArray = taosArrayInit(32, sizeof(void *));
|
||||
if (!data.objectArray) {
|
||||
uError("%s: %s", __func__, "out of memoty");
|
||||
return NULL;
|
||||
|
@ -731,23 +741,27 @@ void s3DeleteObjects(const char *object_name[], int nobject) {
|
|||
}
|
||||
|
||||
void s3DeleteObjectsByPrefix(const char *prefix) {
|
||||
SArray* objectArray = getListByPrefix(prefix);
|
||||
if(objectArray == NULL)return;
|
||||
SArray *objectArray = getListByPrefix(prefix);
|
||||
if (objectArray == NULL) return;
|
||||
s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray));
|
||||
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
||||
}
|
||||
|
||||
static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) {
|
||||
TS3SizeCBD *cbd = callbackData;
|
||||
/*
|
||||
if (cbd->content_length != bufferSize) {
|
||||
cbd->status = S3StatusAbortedByCallback;
|
||||
return S3StatusAbortedByCallback;
|
||||
}
|
||||
*/
|
||||
if (!cbd->buf) {
|
||||
cbd->buf = taosMemoryCalloc(1, cbd->content_length);
|
||||
}
|
||||
|
||||
char *buf = taosMemoryCalloc(1, bufferSize);
|
||||
if (buf) {
|
||||
memcpy(buf, buffer, bufferSize);
|
||||
cbd->buf = buf;
|
||||
if (cbd->buf) {
|
||||
memcpy(cbd->buf + cbd->buf_pos, buffer, bufferSize);
|
||||
cbd->buf_pos += bufferSize;
|
||||
cbd->status = S3StatusOK;
|
||||
return S3StatusOK;
|
||||
} else {
|
||||
|
@ -756,7 +770,7 @@ static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *
|
|||
}
|
||||
}
|
||||
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) {
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
|
||||
int status = 0;
|
||||
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
||||
const char *ifMatch = 0, *ifNotMatch = 0;
|
||||
|
@ -769,6 +783,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
|
|||
|
||||
TS3SizeCBD cbd = {0};
|
||||
cbd.content_length = size;
|
||||
cbd.buf_pos = 0;
|
||||
do {
|
||||
S3_get_object(&bucketContext, object_name, &getConditions, offset, size, 0, 0, &getObjectHandler, &cbd);
|
||||
} while (S3_status_is_retryable(cbd.status) && should_retry());
|
||||
|
@ -778,19 +793,23 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
|
|||
return TAOS_SYSTEM_ERROR(EIO);
|
||||
}
|
||||
|
||||
if (check && cbd.buf_pos != size) {
|
||||
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
|
||||
return TAOS_SYSTEM_ERROR(EIO);
|
||||
}
|
||||
|
||||
*ppBlock = cbd.buf;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) {
|
||||
TS3GetData *cbd = (TS3GetData *) callbackData;
|
||||
size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize);
|
||||
return ((wrote < (size_t) bufferSize) ?
|
||||
S3StatusAbortedByCallback : S3StatusOK);
|
||||
TS3GetData *cbd = (TS3GetData *)callbackData;
|
||||
size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize);
|
||||
return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK);
|
||||
}
|
||||
|
||||
int32_t s3GetObjectToFile(const char *object_name, char* fileName) {
|
||||
int32_t s3GetObjectToFile(const char *object_name, char *fileName) {
|
||||
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
||||
const char *ifMatch = 0, *ifNotMatch = 0;
|
||||
|
||||
|
@ -821,21 +840,21 @@ int32_t s3GetObjectToFile(const char *object_name, char* fileName) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path){
|
||||
SArray* objectArray = getListByPrefix(prefix);
|
||||
if(objectArray == NULL) return -1;
|
||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) {
|
||||
SArray *objectArray = getListByPrefix(prefix);
|
||||
if (objectArray == NULL) return -1;
|
||||
|
||||
for(size_t i = 0; i < taosArrayGetSize(objectArray); i++){
|
||||
char* object = taosArrayGetP(objectArray, i);
|
||||
const char* tmp = strchr(object, '/');
|
||||
for (size_t i = 0; i < taosArrayGetSize(objectArray); i++) {
|
||||
char *object = taosArrayGetP(objectArray, i);
|
||||
const char *tmp = strchr(object, '/');
|
||||
tmp = (tmp == NULL) ? object : tmp + 1;
|
||||
char fileName[PATH_MAX] = {0};
|
||||
if(path[strlen(path) - 1] != TD_DIRSEP_CHAR){
|
||||
if (path[strlen(path) - 1] != TD_DIRSEP_CHAR) {
|
||||
snprintf(fileName, PATH_MAX, "%s%s%s", path, TD_DIRSEP, tmp);
|
||||
}else{
|
||||
} else {
|
||||
snprintf(fileName, PATH_MAX, "%s%s", path, tmp);
|
||||
}
|
||||
if(s3GetObjectToFile(object, fileName) != 0){
|
||||
if (s3GetObjectToFile(object, fileName) != 0) {
|
||||
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
||||
return -1;
|
||||
}
|
||||
|
@ -1122,7 +1141,8 @@ bool s3Get(const char *object_name, const char *path) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, uint8_t **ppBlock) {
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, bool check, uint8_t **ppBlock) {
|
||||
(void)check;
|
||||
int32_t code = 0;
|
||||
cos_pool_t *p = NULL;
|
||||
int is_cname = 0;
|
||||
|
@ -1314,9 +1334,11 @@ void s3DeleteObjectsByPrefix(const char *prefix) {}
|
|||
void s3DeleteObjects(const char *object_name[], int nobject) {}
|
||||
bool s3Exists(const char *object_name) { return false; }
|
||||
bool s3Get(const char *object_name, const char *path) { return false; }
|
||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path) { return 0; }
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; }
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
|
||||
return 0;
|
||||
}
|
||||
void s3EvictCache(const char *path, long object_size) {}
|
||||
long s3Size(const char *object_name) { return 0; }
|
||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { return 0; }
|
||||
|
||||
#endif
|
||||
|
|
|
@ -94,8 +94,8 @@ int32_t tsMonitorMaxLogs = 100;
|
|||
bool tsMonitorComp = false;
|
||||
|
||||
// audit
|
||||
bool tsEnableAudit = true;
|
||||
bool tsEnableAuditCreateTable = true;
|
||||
bool tsEnableAudit = true;
|
||||
bool tsEnableAuditCreateTable = true;
|
||||
|
||||
// telem
|
||||
#ifdef TD_ENTERPRISE
|
||||
|
@ -128,12 +128,12 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = "";
|
|||
// int32_t tsSmlBatchSize = 10000;
|
||||
|
||||
// checkpoint backup
|
||||
char tsSnodeAddress[TSDB_FQDN_LEN] = {0};
|
||||
char tsSnodeAddress[TSDB_FQDN_LEN] = {0};
|
||||
int32_t tsRsyncPort = 873;
|
||||
#ifdef WINDOWS
|
||||
char tsCheckpointBackupDir[PATH_MAX] = "C:\\TDengine\\data\\backup\\checkpoint\\";
|
||||
#else
|
||||
char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/";
|
||||
char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/";
|
||||
#endif
|
||||
|
||||
// tmq
|
||||
|
@ -231,7 +231,7 @@ char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPR
|
|||
#ifdef WINDOWS
|
||||
bool tsStartUdfd = false;
|
||||
#else
|
||||
bool tsStartUdfd = true;
|
||||
bool tsStartUdfd = true;
|
||||
#endif
|
||||
|
||||
// wal
|
||||
|
@ -335,7 +335,7 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
|
|||
}
|
||||
if (tsS3BucketName[0] != '<') {
|
||||
#if defined(USE_COS) || defined(USE_S3)
|
||||
if(tsDiskCfgNum > 1) tsS3Enabled = true;
|
||||
if (tsDiskCfgNum > 1) tsS3Enabled = true;
|
||||
tsS3StreamEnabled = true;
|
||||
#endif
|
||||
}
|
||||
|
@ -343,7 +343,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
struct SConfig *taosGetCfg() { return tsCfg; }
|
||||
struct SConfig *taosGetCfg() {
|
||||
return tsCfg;
|
||||
}
|
||||
|
||||
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
|
||||
char *apolloUrl) {
|
||||
|
@ -453,8 +455,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddBool(pCfg, "enableScience", tsEnableScience, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) !=
|
||||
0)
|
||||
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, CFG_SCOPE_CLIENT,
|
||||
CFG_DYN_CLIENT) != 0)
|
||||
return -1;
|
||||
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
||||
return -1;
|
||||
|
@ -470,7 +472,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
// if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0)
|
||||
// return -1;
|
||||
if (cfgAddInt32(pCfg, "maxShellConns", tsMaxShellConns, 10, 50000000, CFG_SCOPE_CLIENT, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxInsertBatchRows", tsMaxInsertBatchRows, 1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
||||
if (cfgAddInt32(pCfg, "maxInsertBatchRows", tsMaxInsertBatchRows, 1, INT32_MAX, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) !=
|
||||
0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, CFG_SCOPE_BOTH, CFG_DYN_CLIENT) != 0)
|
||||
return -1;
|
||||
|
@ -557,7 +560,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
||||
return -1;
|
||||
|
||||
|
@ -674,7 +678,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
|
||||
if (cfgAddInt32(pCfg, "rsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "snodeAddress", tsSnodeAddress, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
|
@ -700,7 +705,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
if (cfgAddInt32(pCfg, "timeseriesThreshold", tsTimeSeriesThreshold, 0, 2000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
|
||||
0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX,
|
||||
|
@ -743,7 +749,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "s3BucketName", tsS3BucketName, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -100, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
|
||||
if (cfgAddInt32(pCfg, "s3BlockSize", tsS3BlockSize, -1, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "s3BlockCacheSize", tsS3BlockCacheSize, 4, 1024 * 1024, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
|
||||
0)
|
||||
|
@ -1687,6 +1693,7 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
|
|||
{"ttlBatchDropNum", &tsTtlBatchDropNum},
|
||||
{"ttlFlushThreshold", &tsTtlFlushThreshold},
|
||||
{"ttlPushInterval", &tsTtlPushIntervalSec},
|
||||
//{"s3BlockSize", &tsS3BlockSize},
|
||||
{"s3BlockCacheSize", &tsS3BlockCacheSize},
|
||||
{"s3PageCacheSize", &tsS3PageCacheSize},
|
||||
{"s3UploadDelaySec", &tsS3UploadDelaySec},
|
||||
|
@ -1710,8 +1717,8 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
|
|||
|
||||
switch (pItem->dtype) {
|
||||
case CFG_DTYPE_BOOL: {
|
||||
int32_t flag = atoi(value);
|
||||
bool *pVar = options[d].optionVar;
|
||||
int32_t flag = atoi(value);
|
||||
bool *pVar = options[d].optionVar;
|
||||
uInfo("%s set from %d to %d", optName, *pVar, flag);
|
||||
*pVar = flag;
|
||||
} break;
|
||||
|
|
|
@ -3,6 +3,17 @@ add_library(dnode STATIC ${IMPLEMENT_SRC})
|
|||
target_link_libraries(
|
||||
dnode mgmt_mnode mgmt_qnode mgmt_snode mgmt_vnode mgmt_dnode
|
||||
)
|
||||
|
||||
IF (TD_STORAGE)
|
||||
|
||||
IF(${BUILD_WITH_S3})
|
||||
add_definitions(-DUSE_S3)
|
||||
ELSEIF(${BUILD_WITH_COS})
|
||||
add_definitions(-DUSE_COS)
|
||||
ENDIF()
|
||||
|
||||
ENDIF ()
|
||||
|
||||
target_include_directories(
|
||||
dnode
|
||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||
|
|
|
@ -18,17 +18,17 @@
|
|||
#include "audit.h"
|
||||
#include "libs/function/tudf.h"
|
||||
|
||||
#define DM_INIT_AUDIT() \
|
||||
do { \
|
||||
auditCfg.port = tsMonitorPort; \
|
||||
auditCfg.server = tsMonitorFqdn; \
|
||||
auditCfg.comp = tsMonitorComp; \
|
||||
if (auditInit(&auditCfg) != 0) { \
|
||||
return -1; \
|
||||
} \
|
||||
#define DM_INIT_AUDIT() \
|
||||
do { \
|
||||
auditCfg.port = tsMonitorPort; \
|
||||
auditCfg.server = tsMonitorFqdn; \
|
||||
auditCfg.comp = tsMonitorComp; \
|
||||
if (auditInit(&auditCfg) != 0) { \
|
||||
return -1; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
static SDnode globalDnode = {0};
|
||||
static SDnode globalDnode = {0};
|
||||
|
||||
SDnode *dmInstance() { return &globalDnode; }
|
||||
|
||||
|
@ -146,6 +146,13 @@ static bool dmCheckDataDirVersion() {
|
|||
return true;
|
||||
}
|
||||
|
||||
#if defined(USE_S3)
|
||||
|
||||
extern int32_t s3Begin();
|
||||
extern void s3End();
|
||||
|
||||
#endif
|
||||
|
||||
int32_t dmInit() {
|
||||
dInfo("start to init dnode env");
|
||||
if (dmDiskInit() != 0) return -1;
|
||||
|
@ -156,6 +163,9 @@ int32_t dmInit() {
|
|||
if (dmInitMonitor() != 0) return -1;
|
||||
if (dmInitAudit() != 0) return -1;
|
||||
if (dmInitDnode(dmInstance()) != 0) return -1;
|
||||
#if defined(USE_S3)
|
||||
if (s3Begin() != 0) return -1;
|
||||
#endif
|
||||
|
||||
dInfo("dnode env is initialized");
|
||||
return 0;
|
||||
|
@ -181,6 +191,9 @@ void dmCleanup() {
|
|||
udfStopUdfd();
|
||||
taosStopCacheRefreshWorker();
|
||||
dmDiskClose();
|
||||
#if defined(USE_S3)
|
||||
s3End();
|
||||
#endif
|
||||
dInfo("dnode env is cleaned up");
|
||||
|
||||
taosCleanupCfg();
|
||||
|
@ -265,19 +278,19 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
|
||||
pWrapper = &pDnode->wrappers[ntype];
|
||||
|
||||
if(pWrapper->func.nodeRoleFp != NULL){
|
||||
if (pWrapper->func.nodeRoleFp != NULL) {
|
||||
ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
|
||||
dInfo("node:%s, checking node role:%d", pWrapper->name, role);
|
||||
if(role == TAOS_SYNC_ROLE_VOTER){
|
||||
if (role == TAOS_SYNC_ROLE_VOTER) {
|
||||
dError("node:%s, failed to alter node type since node already is role:%d", pWrapper->name, role);
|
||||
terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if(pWrapper->func.isCatchUpFp != NULL){
|
||||
if (pWrapper->func.isCatchUpFp != NULL) {
|
||||
dInfo("node:%s, checking node catch up", pWrapper->name);
|
||||
if((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1){
|
||||
if ((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1) {
|
||||
terrno = TSDB_CODE_MNODE_NOT_CATCH_UP;
|
||||
return -1;
|
||||
}
|
||||
|
@ -394,7 +407,4 @@ void dmReportStartup(const char *pName, const char *pDesc) {
|
|||
dDebug("step:%s, %s", pStartup->name, pStartup->desc);
|
||||
}
|
||||
|
||||
int64_t dmGetClusterId() {
|
||||
return globalDnode.data.clusterId;
|
||||
}
|
||||
|
||||
int64_t dmGetClusterId() { return globalDnode.data.clusterId; }
|
||||
|
|
|
@ -1310,6 +1310,22 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
|
|||
}
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return 0;
|
||||
} else if (strncasecmp(cfgReq.config, "s3blocksize", 11) == 0) {
|
||||
int32_t optLen = strlen("s3blocksize");
|
||||
int32_t flag = -1;
|
||||
int32_t code = mndMCfgGetValInt32(&cfgReq, optLen, &flag);
|
||||
if (code < 0) return code;
|
||||
|
||||
if (flag > 1024 * 1024 || (flag > -1 && flag < 4) || flag < -1) {
|
||||
mError("dnode:%d, failed to config s3blocksize since value:%d. Valid range: -1 or [4, 1024 * 1024]",
|
||||
cfgReq.dnodeId, flag);
|
||||
terrno = TSDB_CODE_INVALID_CFG;
|
||||
tFreeSMCfgDnodeReq(&cfgReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
strcpy(dcfgReq.config, "s3blocksize");
|
||||
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
|
||||
#endif
|
||||
} else {
|
||||
mndMCfg2DCfg(&cfgReq, &dcfgReq);
|
||||
|
|
|
@ -3099,7 +3099,7 @@ static int32_t tsdbCacheLoadBlockS3(STsdbFD *pFD, uint8_t **ppBlock) {
|
|||
}
|
||||
*/
|
||||
int64_t block_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
||||
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, ppBlock);
|
||||
code = s3GetObjectBlock(pFD->objName, block_offset, tsS3BlockSize * pFD->szPage, 0, ppBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// taosMemoryFree(pBlock);
|
||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
|
||||
extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
|
||||
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
|
||||
extern void remove_file(const char *fname);
|
||||
extern void remove_file(const char *fname, bool last_level);
|
||||
|
||||
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
|
||||
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
|
||||
|
@ -532,7 +532,8 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
|
|||
if (taosIsDir(file->aname)) continue;
|
||||
|
||||
if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
|
||||
remove_file(file->aname);
|
||||
int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs);
|
||||
remove_file(file->aname, nlevel > 1 && file->did.level == nlevel - 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1282,4 +1283,4 @@ int32_t tsdbFSEnableBgTask(STFileSystem *fs) {
|
|||
fs->stop = false;
|
||||
taosThreadMutexUnlock(&fs->tsdb->mutex);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "tsdbFile2.h"
|
||||
#include "cos.h"
|
||||
|
||||
// to_json
|
||||
static int32_t head_to_json(const STFile *file, cJSON *json);
|
||||
|
@ -41,10 +42,20 @@ static const struct {
|
|||
[TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json},
|
||||
};
|
||||
|
||||
void remove_file(const char *fname) {
|
||||
void remove_file(const char *fname, bool last_level) {
|
||||
int32_t code = taosRemoveFile(fname);
|
||||
if (code) {
|
||||
tsdbError("file:%s remove failed", fname);
|
||||
if (tsS3Enabled && last_level) {
|
||||
const char *object_name = taosDirEntryBaseName((char *)fname);
|
||||
long s3_size = tsS3Enabled ? s3Size(object_name) : 0;
|
||||
if (!strncmp(fname + strlen(fname) - 5, ".data", 5) && s3_size > 0) {
|
||||
s3DeleteObjects(&object_name, 1);
|
||||
} else {
|
||||
tsdbError("file:%s remove failed", fname);
|
||||
}
|
||||
} else {
|
||||
tsdbError("file:%s remove failed", fname);
|
||||
}
|
||||
} else {
|
||||
tsdbInfo("file:%s is removed", fname);
|
||||
}
|
||||
|
@ -224,6 +235,7 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) {
|
|||
fobj[0]->state = TSDB_FSTATE_LIVE;
|
||||
fobj[0]->ref = 1;
|
||||
tsdbTFileName(pTsdb, f, fobj[0]->fname);
|
||||
fobj[0]->nlevel = tfsGetLevel(pTsdb->pVnode->pTfs);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -245,7 +257,7 @@ int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
|||
tsdbTrace("unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
|
||||
if (nRef == 0) {
|
||||
if (fobj->state == TSDB_FSTATE_DEAD) {
|
||||
remove_file(fobj->fname);
|
||||
remove_file(fobj->fname, fobj->nlevel > 1 && fobj->f->did.level == fobj->nlevel - 1);
|
||||
}
|
||||
taosMemoryFree(fobj);
|
||||
}
|
||||
|
@ -261,7 +273,7 @@ int32_t tsdbTFileObjRemove(STFileObj *fobj) {
|
|||
taosThreadMutexUnlock(&fobj->mutex);
|
||||
tsdbTrace("remove unref file %s, fobj:%p ref %d", fobj->fname, fobj, nRef);
|
||||
if (nRef == 0) {
|
||||
remove_file(fobj->fname);
|
||||
remove_file(fobj->fname, fobj->nlevel > 1 && fobj->f->did.level == fobj->nlevel - 1);
|
||||
taosMemoryFree(fobj);
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -76,6 +76,7 @@ struct STFileObj {
|
|||
STFile f[1];
|
||||
int32_t state;
|
||||
int32_t ref;
|
||||
int32_t nlevel;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
};
|
||||
|
||||
|
|
|
@ -340,7 +340,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
|
|||
int64_t retrieve_offset = PAGE_OFFSET(pgno, pFD->szPage);
|
||||
int64_t pgnoEnd = pgno - 1 + (size - n + szPgCont - 1) / szPgCont;
|
||||
int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
|
||||
code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, &pBlock);
|
||||
code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, 1, &pBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _exit;
|
||||
}
|
||||
|
|
|
@ -13,9 +13,9 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "cos.h"
|
||||
#include "tsdb.h"
|
||||
#include "tsdbFS2.h"
|
||||
#include "cos.h"
|
||||
#include "vnd.h"
|
||||
|
||||
typedef struct {
|
||||
|
@ -292,15 +292,15 @@ static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
|
|||
if (expLevel < 0) { // remove the fileset
|
||||
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) {
|
||||
if (fobj == NULL) continue;
|
||||
|
||||
/*
|
||||
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
|
||||
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) {
|
||||
code = tsdbRemoveFileObjectS3(rtner, fobj);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
code = tsdbDoRemoveFileObject(rtner, fobj);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
} else {*/
|
||||
code = tsdbDoRemoveFileObject(rtner, fobj);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
//}
|
||||
}
|
||||
|
||||
SSttLvl *lvl;
|
||||
|
|
Loading…
Reference in New Issue