fix:test s3 interface in checkpoint

This commit is contained in:
wangmm0220 2023-11-07 11:06:50 +08:00
parent dffaac55ea
commit e0b55cc61f
6 changed files with 100 additions and 27 deletions

View File

@ -83,7 +83,8 @@ extern int32_t tsHeartbeatTimeout;
extern int64_t tsVndCommitMaxIntervalMs;
// snode
extern char tsSnodeIp[];
extern char tsSnodeAddress[]; //127.0.0.1:873
extern int32_t tsRsyncPort;
extern char tsCheckpointBackupDir[];
// mnode

View File

@ -141,7 +141,7 @@ void startRsync(){
}
char cmd[PATH_MAX] = {0};
snprintf(cmd, PATH_MAX, "rsync --daemon --config=%s", confDir);
snprintf(cmd, PATH_MAX, "rsync --daemon --port=%d --config=%s", tsRsyncPort, confDir);
// start rsync service to backup checkpoint
code = system(cmd);
if(code != 0){
@ -168,7 +168,7 @@ int uploadRsync(char* id, char* path){
#else
path
#endif
, tsSnodeIp, id);
, tsSnodeAddress, id);
}else{
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/",
#ifdef WINDOWS
@ -176,7 +176,7 @@ int uploadRsync(char* id, char* path){
#else
path
#endif
, tsSnodeIp, id);
, tsSnodeAddress, id);
}
int code = execCommand(command);
@ -195,7 +195,7 @@ int downloadRsync(char* id, char* path){
#endif
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
tsSnodeIp, id,
tsSnodeAddress, id,
#ifdef WINDOWS
pathTransform
#else
@ -221,7 +221,7 @@ int deleteRsync(char* id){
}
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/",
tmp, tsSnodeIp, id);
tmp, tsSnodeAddress, id);
code = execCommand(command);
taosRemoveDir(tmp);

View File

@ -134,7 +134,8 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = "";
// int32_t tsSmlBatchSize = 10000;
// checkpoint backup
char tsSnodeIp[TSDB_FQDN_LEN] = {0};
char tsSnodeAddress[TSDB_FQDN_LEN] = {0};
int32_t tsRsyncPort = 873;
#ifdef WINDOWS
char tsCheckpointBackupDir[PATH_MAX] = "C:\\TDengine\\data\\backup\\checkpoint\\";
#else
@ -661,7 +662,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "snodeIp", tsSnodeIp, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeRsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "snodeRsyncAddress", tsSnodeAddress, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1;
@ -1087,8 +1089,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval;
tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32;
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
tsRsyncPort = cfgGetItem(pCfg, "rsyncPort")->i32;
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tstrncpy(tsSnodeIp, cfgGetItem(pCfg, "snodeIp")->str, TSDB_FQDN_LEN);
tstrncpy(tsSnodeAddress, cfgGetItem(pCfg, "snodeAddress")->str, TSDB_FQDN_LEN);
tstrncpy(tsCheckpointBackupDir, cfgGetItem(pCfg, "checkpointBackupDir")->str, PATH_MAX);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
@ -1676,6 +1679,9 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile
return -1;
}
if (taosSetS3Cfg(tsCfg) != 0) return -1;
return 0;
if (tsc) {
if (taosSetClientCfg(tsCfg)) return -1;
} else {

View File

@ -148,10 +148,16 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
// char name[CHECKPOINT_PATH_LEN];
// char id[CHECKPOINT_PATH_LEN];
//} SChekpointDataHeader;
typedef enum UPLOAD_TYPE{
UPLOAD_DISABLE = -1,
UPLOAD_S3 = 0,
UPLOAD_RSYNC = 1,
} UPLOAD_TYPE;
UPLOAD_TYPE getUploadType();
int uploadCheckpoint(char* id, char* path);
int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id);
int deleteCheckpointFile(char* id, char* name);
#ifdef __cplusplus
}

View File

@ -15,6 +15,7 @@
#include "streamInt.h"
#include "rsync.h"
#include "cos.h"
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
@ -466,15 +467,55 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
//}
static int uploadCheckpointToS3(char* id, char* path){
TdDirPtr pDir = taosOpenDir(path);
if (pDir == NULL) return -1;
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 ||
taosDirEntryIsDir(de)) continue;
char filename[PATH_MAX] = {0};
if(path[strlen(path - 1)] == '/'){
snprintf(filename, sizeof(filename), "%s%s", path, name);
}else{
snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
}
char object[PATH_MAX] = {0};
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
if(s3PutObjectFromFile2(filename, object) != 0){
taosCloseDir(&pDir);
return -1;
}
}
taosCloseDir(&pDir);
return 0;
}
UPLOAD_TYPE getUploadType(){
if(strlen(tsSnodeAddress) != 0){
return UPLOAD_RSYNC;
}else if(tsS3StreamEnabled){
return UPLOAD_S3;
}else{
return UPLOAD_DISABLE;
}
}
int uploadCheckpoint(char* id, char* path){
if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){
stError("uploadCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeIp) != 0){
uploadRsync(id, path);
// }else if(tsS3StreamEnabled){
if(strlen(tsSnodeAddress) != 0){
return uploadRsync(id, path);
}else if(tsS3StreamEnabled){
return uploadCheckpointToS3(id, path);
}
return 0;
}
@ -484,9 +525,9 @@ int downloadCheckpoint(char* id, char* path){
stError("downloadCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeIp) != 0){
downloadRsync(id, path);
// }else if(tsS3StreamEnabled){
if(strlen(tsSnodeAddress) != 0){
return downloadRsync(id, path);
}else if(tsS3StreamEnabled){
}
return 0;
@ -497,10 +538,17 @@ int deleteCheckpoint(char* id){
stError("deleteCheckpoint parameters invalid");
return -1;
}
if(strlen(tsSnodeIp) != 0){
deleteRsync(id);
// }else if(tsS3StreamEnabled){
if(strlen(tsSnodeAddress) != 0){
return deleteRsync(id);
}else if(tsS3StreamEnabled){
s3DeleteObjectsByPrefix(id);
}
return 0;
}
int deleteCheckpointFile(char* id, char* name){
char object[128] = {0};
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
s3DeleteObjects((const char**)&object, 1);
return 0;
}

View File

@ -27,30 +27,42 @@
#include "rsync.h"
#include "streamInt.h"
#include "cos.h"
int main(int argc, char **argv) {
testing::InitGoogleTest(&argc, argv);
strcpy(tsSnodeIp, "127.0.0.1");
if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) {
printf("error");
}
if (s3Init() < 0) {
return -1;
}
// strcpy(tsSnodeIp, "127.0.0.1");
return RUN_ALL_TESTS();
}
TEST(testCase, checkpointUpload_Test) {
stopRsync();
startRsync();
// stopRsync();
// startRsync();
taosSsleep(5);
char* id = "2013892036";
uploadCheckpoint(id, "/Users/mingmingwanng/rsync/");
uploadCheckpoint(id, "/root/offset/");
}
TEST(testCase, checkpointDownload_Test) {
char* id = "2013892036";
downloadRsync(id, "/Users/mingmingwanng/rsync/tmp");
downloadCheckpoint(id, "/Users/mingmingwanng/rsync/tmp");
}
TEST(testCase, checkpointDelete_Test) {
char* id = "2013892036";
deleteRsync(id);
deleteCheckpoint(id);
}
TEST(testCase, checkpointDeleteFile_Test) {
char* id = "2013892036";
deleteCheckpointFile(id, "offset-ver0");
}