fix:modify upload checkpoint interface

This commit is contained in:
wangmm0220 2023-11-03 11:33:18 +08:00
parent 159873692c
commit 3ac11eba0e
5 changed files with 19 additions and 22 deletions

View File

@ -13,7 +13,7 @@ extern "C" {
void stopRsync(); void stopRsync();
void startRsync(); void startRsync();
int uploadRsync(char* id, SArray* fileList); int uploadRsync(char* id, char* path);
int downloadRsync(char* id, char* path); int downloadRsync(char* id, char* path);
int deleteRsync(char* id); int deleteRsync(char* id);

View File

@ -144,7 +144,7 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
// char id[CHECKPOINT_PATH_LEN]; // char id[CHECKPOINT_PATH_LEN];
//} SChekpointDataHeader; //} SChekpointDataHeader;
int uploadCheckpoint(char* id, SArray* fileList); int uploadCheckpoint(char* id, char* path);
int downloadCheckpoint(char* id, char* path); int downloadCheckpoint(char* id, char* path);
int deleteCheckpoint(char* id); int deleteCheckpoint(char* id);

View File

@ -451,13 +451,13 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
//} //}
int uploadCheckpoint(char* id, SArray* fileList){ int uploadCheckpoint(char* id, char* path){
if(id == NULL || fileList == NULL || strlen(id) == 0 || taosArrayGetSize(fileList) == 0){ if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0){
stError("uploadCheckpoint parameters invalid"); stError("uploadCheckpoint parameters invalid");
return -1; return -1;
} }
if(strlen(tsSnodeIp) != 0){ if(strlen(tsSnodeIp) != 0){
uploadRsync(id, fileList); uploadRsync(id, path);
// }else if(tsS3StreamEnabled){ // }else if(tsS3StreamEnabled){
} }

View File

@ -40,14 +40,9 @@ TEST(testCase, checkpointUpload_Test) {
startRsync(); startRsync();
taosSsleep(5); taosSsleep(5);
SArray* fileList = taosArrayInit(0, POINTER_BYTES);
char* id = "2013892036"; char* id = "2013892036";
char* file1 = "/Users/mingmingwanng/wal1";
char* file2 = "/Users/mingmingwanng/java_error_in_clion.hprof";
taosArrayPush(fileList, &file1);
taosArrayPush(fileList, &file2);
uploadCheckpoint(id, fileList); uploadCheckpoint(id, "/Users/mingmingwanng/rsync/");
} }
TEST(testCase, checkpointDownload_Test) { TEST(testCase, checkpointDownload_Test) {

View File

@ -118,9 +118,7 @@ void startRsync(){
uDebug("[rsync] start server successful"); uDebug("[rsync] start server successful");
} }
int uploadRsync(char* id, SArray* fileList){ int uploadRsync(char* id, char* path){
for(int i = 0; i < taosArrayGetSize(fileList); i++) {
char* fullName = (char*)taosArrayGetP(fileList, i);
char command[PATH_MAX] = {0}; char command[PATH_MAX] = {0};
// char* name = strrchr(fullName, '/'); // char* name = strrchr(fullName, '/');
// if(name == NULL){ // if(name == NULL){
@ -128,15 +126,19 @@ int uploadRsync(char* id, SArray* fileList){
// return -1; // return -1;
// } // }
// name = name + 1; // name = name + 1;
snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", if(path[strlen(path) - 1] != '/'){
fullName, tsSnodeIp, id); snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/",
path, tsSnodeIp, id);
}else{
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/",
path, tsSnodeIp, id);
}
int code = execCommand(command); int code = execCommand(command);
if(code != 0){ if(code != 0){
uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1; return -1;
} }
}
uDebug("[rsync] upload data:%s successful", id); uDebug("[rsync] upload data:%s successful", id);
return 0; return 0;
} }