From 3ac11eba0eceeae8e2a9785c806c8e66aa9f472f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 3 Nov 2023 11:33:18 +0800 Subject: [PATCH] fix:modify upload checkpoint interface --- include/util/rsync.h | 2 +- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamCheckpoint.c | 6 +++--- source/libs/stream/test/checkpointTest.cpp | 7 +------ source/util/src/rsync.c | 24 ++++++++++++---------- 5 files changed, 19 insertions(+), 22 deletions(-) diff --git a/include/util/rsync.h b/include/util/rsync.h index 50b27b95e0..6cce645d1e 100644 --- a/include/util/rsync.h +++ b/include/util/rsync.h @@ -13,7 +13,7 @@ extern "C" { void stopRsync(); void startRsync(); -int uploadRsync(char* id, SArray* fileList); +int uploadRsync(char* id, char* path); int downloadRsync(char* id, char* path); int deleteRsync(char* id); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 806124bc58..85e4e923e5 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -144,7 +144,7 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); // char id[CHECKPOINT_PATH_LEN]; //} SChekpointDataHeader; -int uploadCheckpoint(char* id, SArray* fileList); +int uploadCheckpoint(char* id, char* path); int downloadCheckpoint(char* id, char* path); int deleteCheckpoint(char* id); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 238c7c2329..e02a591cff 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -451,13 +451,13 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { //} -int uploadCheckpoint(char* id, SArray* fileList){ - if(id == NULL || fileList == NULL || strlen(id) == 0 || taosArrayGetSize(fileList) == 0){ +int uploadCheckpoint(char* id, char* path){ + if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0){ stError("uploadCheckpoint parameters invalid"); return -1; } if(strlen(tsSnodeIp) != 0){ - uploadRsync(id, fileList); + uploadRsync(id, path); // }else if(tsS3StreamEnabled){ } diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp index 56614cc537..6b53f13c71 100644 --- a/source/libs/stream/test/checkpointTest.cpp +++ b/source/libs/stream/test/checkpointTest.cpp @@ -40,14 +40,9 @@ TEST(testCase, checkpointUpload_Test) { startRsync(); taosSsleep(5); - SArray* fileList = taosArrayInit(0, POINTER_BYTES); char* id = "2013892036"; - char* file1 = "/Users/mingmingwanng/wal1"; - char* file2 = "/Users/mingmingwanng/java_error_in_clion.hprof"; - taosArrayPush(fileList, &file1); - taosArrayPush(fileList, &file2); - uploadCheckpoint(id, fileList); + uploadCheckpoint(id, "/Users/mingmingwanng/rsync/"); } TEST(testCase, checkpointDownload_Test) { diff --git a/source/util/src/rsync.c b/source/util/src/rsync.c index 940c631c65..b31526dc48 100644 --- a/source/util/src/rsync.c +++ b/source/util/src/rsync.c @@ -118,24 +118,26 @@ void startRsync(){ uDebug("[rsync] start server successful"); } -int uploadRsync(char* id, SArray* fileList){ - for(int i = 0; i < taosArrayGetSize(fileList); i++) { - char* fullName = (char*)taosArrayGetP(fileList, i); - char command[PATH_MAX] = {0}; +int uploadRsync(char* id, char* path){ + char command[PATH_MAX] = {0}; // char* name = strrchr(fullName, '/'); // if(name == NULL){ // uError("[rsync] file name invalid, name:%s", name); // return -1; // } // name = name + 1; - snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", - fullName, tsSnodeIp, id); + if(path[strlen(path) - 1] != '/'){ + snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/", + path, tsSnodeIp, id); + }else{ + snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", + path, tsSnodeIp, id); + } - int code = execCommand(command); - if(code != 0){ - uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); - return -1; - } + int code = execCommand(command); + if(code != 0){ + uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; } uDebug("[rsync] upload data:%s successful", id); return 0;