fix:upload checkpoint to s3
This commit is contained in:
parent
dffaac55ea
commit
f0b2ed567c
|
@ -39,6 +39,7 @@ void s3DeleteObjectsByPrefix(const char *prefix);
|
|||
void s3DeleteObjects(const char *object_name[], int nobject);
|
||||
bool s3Exists(const char *object_name);
|
||||
bool s3Get(const char *object_name, const char *path);
|
||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path);
|
||||
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock);
|
||||
void s3EvictCache(const char *path, long object_size);
|
||||
long s3Size(const char *object_name);
|
||||
|
|
|
@ -83,7 +83,8 @@ extern int32_t tsHeartbeatTimeout;
|
|||
extern int64_t tsVndCommitMaxIntervalMs;
|
||||
|
||||
// snode
|
||||
extern char tsSnodeIp[];
|
||||
extern char tsSnodeAddress[]; //127.0.0.1:873
|
||||
extern int32_t tsRsyncPort;
|
||||
extern char tsCheckpointBackupDir[];
|
||||
|
||||
// mnode
|
||||
|
|
|
@ -72,6 +72,12 @@ static void s3PrintError(const char *func, S3Status status, char error_details[]
|
|||
}
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
char err_msg[128];
|
||||
S3Status status;
|
||||
TdFilePtr file;
|
||||
} TS3GetData;
|
||||
|
||||
typedef struct {
|
||||
char err_msg[128];
|
||||
S3Status status;
|
||||
|
@ -659,7 +665,7 @@ static void s3FreeObjectKey(void *pItem) {
|
|||
taosMemoryFree(key);
|
||||
}
|
||||
|
||||
void s3DeleteObjectsByPrefix(const char *prefix) {
|
||||
static SArray* getListByPrefix(const char *prefix){
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
|
||||
|
@ -671,7 +677,7 @@ void s3DeleteObjectsByPrefix(const char *prefix) {
|
|||
data.objectArray = taosArrayInit(32, sizeof(void*));
|
||||
if (!data.objectArray) {
|
||||
uError("%s: %s", __func__, "out of memoty");
|
||||
return;
|
||||
return NULL;
|
||||
}
|
||||
if (marker) {
|
||||
snprintf(data.nextMarker, sizeof(data.nextMarker), "%s", marker);
|
||||
|
@ -694,18 +700,15 @@ void s3DeleteObjectsByPrefix(const char *prefix) {
|
|||
|
||||
if (data.status == S3StatusOK) {
|
||||
if (data.keyCount > 0) {
|
||||
// printListBucketHeader(allDetails);
|
||||
s3DeleteObjects(TARRAY_DATA(data.objectArray), TARRAY_SIZE(data.objectArray));
|
||||
return data.objectArray;
|
||||
}
|
||||
} else {
|
||||
s3PrintError(__func__, data.status, data.err_msg);
|
||||
}
|
||||
|
||||
taosArrayDestroyEx(data.objectArray, s3FreeObjectKey);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void s3DeleteObjects(const char *object_name[], int nobject) {
|
||||
int status = 0;
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3ResponseHandler responseHandler = {0, &responseCompleteCallback};
|
||||
|
@ -722,6 +725,13 @@ void s3DeleteObjects(const char *object_name[], int nobject) {
|
|||
}
|
||||
}
|
||||
|
||||
void s3DeleteObjectsByPrefix(const char *prefix) {
|
||||
SArray* objectArray = getListByPrefix(prefix);
|
||||
if(objectArray == NULL)return;
|
||||
s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray));
|
||||
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
||||
}
|
||||
|
||||
static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) {
|
||||
TS3SizeCBD *cbd = callbackData;
|
||||
if (cbd->content_length != bufferSize) {
|
||||
|
@ -768,6 +778,58 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
|
|||
return 0;
|
||||
}
|
||||
|
||||
static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) {
|
||||
TS3GetData *cbd = (TS3GetData *) callbackData;
|
||||
size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize);
|
||||
return ((wrote < (size_t) bufferSize) ?
|
||||
S3StatusAbortedByCallback : S3StatusOK);
|
||||
}
|
||||
|
||||
int32_t s3GetObjectToFile(const char *object_name, char* fileName) {
|
||||
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
|
||||
const char *ifMatch = 0, *ifNotMatch = 0;
|
||||
|
||||
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
|
||||
0, awsRegionG};
|
||||
S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
|
||||
S3GetObjectHandler getObjectHandler = {{NULL, &responseCompleteCallback},
|
||||
&getObjectCallback};
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
uError("[s3] open file error, errno:%d, fileName:%s", errno, fileName);
|
||||
return -1;
|
||||
}
|
||||
TS3GetData cbd = {0};
|
||||
cbd.file = pFile;
|
||||
do {
|
||||
S3_get_object(&bucketContext, object_name, &getConditions, 0, 0, 0, 0, &getObjectHandler, &cbd);
|
||||
} while (S3_status_is_retryable(cbd.status) && should_retry());
|
||||
|
||||
if (cbd.status != S3StatusOK) {
|
||||
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
|
||||
taosCloseFile(&pFile);
|
||||
return TAOS_SYSTEM_ERROR(EIO);
|
||||
}
|
||||
|
||||
taosCloseFile(&pFile);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char* path){
|
||||
SArray* objectArray = getListByPrefix(prefix);
|
||||
if(objectArray == NULL) return -1;
|
||||
|
||||
for(size_t i = 0; i < taosArrayGetSize(objectArray); i++){
|
||||
char* object = taosArrayGetP(objectArray, i);
|
||||
char fileName[PATH_MAX] = {0};
|
||||
snprintf(fileName, PATH_MAX, "%s/%s", path, object);
|
||||
s3GetObjectToFile(object, fileName);
|
||||
}
|
||||
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
||||
return 0;
|
||||
}
|
||||
|
||||
long s3Size(const char *object_name) {
|
||||
long size = 0;
|
||||
int status = 0;
|
||||
|
|
|
@ -141,7 +141,7 @@ void startRsync(){
|
|||
}
|
||||
|
||||
char cmd[PATH_MAX] = {0};
|
||||
snprintf(cmd, PATH_MAX, "rsync --daemon --config=%s", confDir);
|
||||
snprintf(cmd, PATH_MAX, "rsync --daemon --port=%d --config=%s", tsRsyncPort, confDir);
|
||||
// start rsync service to backup checkpoint
|
||||
code = system(cmd);
|
||||
if(code != 0){
|
||||
|
@ -168,7 +168,7 @@ int uploadRsync(char* id, char* path){
|
|||
#else
|
||||
path
|
||||
#endif
|
||||
, tsSnodeIp, id);
|
||||
, tsSnodeAddress, id);
|
||||
}else{
|
||||
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/",
|
||||
#ifdef WINDOWS
|
||||
|
@ -176,7 +176,7 @@ int uploadRsync(char* id, char* path){
|
|||
#else
|
||||
path
|
||||
#endif
|
||||
, tsSnodeIp, id);
|
||||
, tsSnodeAddress, id);
|
||||
}
|
||||
|
||||
int code = execCommand(command);
|
||||
|
@ -195,7 +195,7 @@ int downloadRsync(char* id, char* path){
|
|||
#endif
|
||||
char command[PATH_MAX] = {0};
|
||||
snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
|
||||
tsSnodeIp, id,
|
||||
tsSnodeAddress, id,
|
||||
#ifdef WINDOWS
|
||||
pathTransform
|
||||
#else
|
||||
|
@ -221,7 +221,7 @@ int deleteRsync(char* id){
|
|||
}
|
||||
char command[PATH_MAX] = {0};
|
||||
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/",
|
||||
tmp, tsSnodeIp, id);
|
||||
tmp, tsSnodeAddress, id);
|
||||
|
||||
code = execCommand(command);
|
||||
taosRemoveDir(tmp);
|
||||
|
|
|
@ -134,7 +134,8 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = "";
|
|||
// int32_t tsSmlBatchSize = 10000;
|
||||
|
||||
// checkpoint backup
|
||||
char tsSnodeIp[TSDB_FQDN_LEN] = {0};
|
||||
char tsSnodeAddress[TSDB_FQDN_LEN] = {0};
|
||||
int32_t tsRsyncPort = 873;
|
||||
#ifdef WINDOWS
|
||||
char tsCheckpointBackupDir[PATH_MAX] = "C:\\TDengine\\data\\backup\\checkpoint\\";
|
||||
#else
|
||||
|
@ -661,7 +662,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_BOTH) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, CFG_SCOPE_BOTH) != 0) return -1;
|
||||
|
||||
if (cfgAddString(pCfg, "snodeIp", tsSnodeIp, CFG_SCOPE_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "snodeRsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "snodeRsyncAddress", tsSnodeAddress, CFG_SCOPE_SERVER) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1;
|
||||
|
@ -1087,8 +1089,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval;
|
||||
tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32;
|
||||
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
|
||||
tsRsyncPort = cfgGetItem(pCfg, "rsyncPort")->i32;
|
||||
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
|
||||
tstrncpy(tsSnodeIp, cfgGetItem(pCfg, "snodeIp")->str, TSDB_FQDN_LEN);
|
||||
tstrncpy(tsSnodeAddress, cfgGetItem(pCfg, "snodeAddress")->str, TSDB_FQDN_LEN);
|
||||
tstrncpy(tsCheckpointBackupDir, cfgGetItem(pCfg, "checkpointBackupDir")->str, PATH_MAX);
|
||||
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
|
||||
|
||||
|
@ -1676,6 +1679,9 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (taosSetS3Cfg(tsCfg) != 0) return -1;
|
||||
return 0;
|
||||
|
||||
if (tsc) {
|
||||
if (taosSetClientCfg(tsCfg)) return -1;
|
||||
} else {
|
||||
|
|
|
@ -148,10 +148,16 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
|
|||
// char name[CHECKPOINT_PATH_LEN];
|
||||
// char id[CHECKPOINT_PATH_LEN];
|
||||
//} SChekpointDataHeader;
|
||||
|
||||
typedef enum UPLOAD_TYPE{
|
||||
UPLOAD_DISABLE = -1,
|
||||
UPLOAD_S3 = 0,
|
||||
UPLOAD_RSYNC = 1,
|
||||
} UPLOAD_TYPE;
|
||||
UPLOAD_TYPE getUploadType();
|
||||
int uploadCheckpoint(char* id, char* path);
|
||||
int downloadCheckpoint(char* id, char* path);
|
||||
int deleteCheckpoint(char* id);
|
||||
int deleteCheckpointFile(char* id, char* name);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "streamInt.h"
|
||||
#include "rsync.h"
|
||||
#include "cos.h"
|
||||
|
||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
@ -466,15 +467,55 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|||
|
||||
//}
|
||||
|
||||
static int uploadCheckpointToS3(char* id, char* path){
|
||||
TdDirPtr pDir = taosOpenDir(path);
|
||||
if (pDir == NULL) return -1;
|
||||
|
||||
TdDirEntryPtr de = NULL;
|
||||
while ((de = taosReadDir(pDir)) != NULL) {
|
||||
char* name = taosGetDirEntryName(de);
|
||||
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 ||
|
||||
taosDirEntryIsDir(de)) continue;
|
||||
|
||||
char filename[PATH_MAX] = {0};
|
||||
if(path[strlen(path - 1)] == '/'){
|
||||
snprintf(filename, sizeof(filename), "%s%s", path, name);
|
||||
}else{
|
||||
snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
|
||||
}
|
||||
|
||||
char object[PATH_MAX] = {0};
|
||||
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
|
||||
|
||||
if(s3PutObjectFromFile2(filename, object) != 0){
|
||||
taosCloseDir(&pDir);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
taosCloseDir(&pDir);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
UPLOAD_TYPE getUploadType(){
|
||||
if(strlen(tsSnodeAddress) != 0){
|
||||
return UPLOAD_RSYNC;
|
||||
}else if(tsS3StreamEnabled){
|
||||
return UPLOAD_S3;
|
||||
}else{
|
||||
return UPLOAD_DISABLE;
|
||||
}
|
||||
}
|
||||
|
||||
int uploadCheckpoint(char* id, char* path){
|
||||
if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){
|
||||
stError("uploadCheckpoint parameters invalid");
|
||||
return -1;
|
||||
}
|
||||
if(strlen(tsSnodeIp) != 0){
|
||||
uploadRsync(id, path);
|
||||
// }else if(tsS3StreamEnabled){
|
||||
|
||||
if(strlen(tsSnodeAddress) != 0){
|
||||
return uploadRsync(id, path);
|
||||
}else if(tsS3StreamEnabled){
|
||||
return uploadCheckpointToS3(id, path);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -484,10 +525,10 @@ int downloadCheckpoint(char* id, char* path){
|
|||
stError("downloadCheckpoint parameters invalid");
|
||||
return -1;
|
||||
}
|
||||
if(strlen(tsSnodeIp) != 0){
|
||||
downloadRsync(id, path);
|
||||
// }else if(tsS3StreamEnabled){
|
||||
|
||||
if(strlen(tsSnodeAddress) != 0){
|
||||
return downloadRsync(id, path);
|
||||
}else if(tsS3StreamEnabled){
|
||||
return s3GetObjectsByPrefix(id, path);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -497,10 +538,18 @@ int deleteCheckpoint(char* id){
|
|||
stError("deleteCheckpoint parameters invalid");
|
||||
return -1;
|
||||
}
|
||||
if(strlen(tsSnodeIp) != 0){
|
||||
deleteRsync(id);
|
||||
// }else if(tsS3StreamEnabled){
|
||||
|
||||
if(strlen(tsSnodeAddress) != 0){
|
||||
return deleteRsync(id);
|
||||
}else if(tsS3StreamEnabled){
|
||||
s3DeleteObjectsByPrefix(id);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int deleteCheckpointFile(char* id, char* name){
|
||||
char object[128] = {0};
|
||||
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
|
||||
char *tmp = object;
|
||||
s3DeleteObjects((const char**)&tmp, 1);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -27,30 +27,42 @@
|
|||
|
||||
#include "rsync.h"
|
||||
#include "streamInt.h"
|
||||
#include "cos.h"
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
|
||||
strcpy(tsSnodeIp, "127.0.0.1");
|
||||
if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) {
|
||||
printf("error");
|
||||
}
|
||||
if (s3Init() < 0) {
|
||||
return -1;
|
||||
}
|
||||
// strcpy(tsSnodeIp, "127.0.0.1");
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
||||
TEST(testCase, checkpointUpload_Test) {
|
||||
stopRsync();
|
||||
startRsync();
|
||||
// stopRsync();
|
||||
// startRsync();
|
||||
|
||||
taosSsleep(5);
|
||||
char* id = "2013892036";
|
||||
|
||||
uploadCheckpoint(id, "/Users/mingmingwanng/rsync/");
|
||||
uploadCheckpoint(id, "/root/offset/");
|
||||
}
|
||||
|
||||
TEST(testCase, checkpointDownload_Test) {
|
||||
char* id = "2013892036";
|
||||
downloadRsync(id, "/Users/mingmingwanng/rsync/tmp");
|
||||
downloadCheckpoint(id, "/root/offset/download/");
|
||||
}
|
||||
|
||||
TEST(testCase, checkpointDelete_Test) {
|
||||
char* id = "2013892036";
|
||||
deleteRsync(id);
|
||||
deleteCheckpoint(id);
|
||||
}
|
||||
|
||||
TEST(testCase, checkpointDeleteFile_Test) {
|
||||
char* id = "2013892036";
|
||||
deleteCheckpointFile(id, "offset-ver0");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue