From 159873692cfedac7bc855bd1eb3f6fa2575a0c5a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 2 Nov 2023 20:13:29 +0800 Subject: [PATCH] feat:checkpoint bakeup using rsync --- include/common/tglobal.h | 4 + .../src/inc/vndCos.h => include/util/cos.h | 3 +- include/util/rsync.h | 24 +++ source/common/src/tglobal.c | 10 +- source/dnode/snode/src/snode.c | 4 + source/dnode/vnode/CMakeLists.txt | 1 - source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbFS2.c | 2 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- source/dnode/vnode/src/tsdb/tsdbRetention.c | 3 +- source/dnode/vnode/src/tsdb/tsdbWrite.c | 2 +- source/dnode/vnode/src/vnd/vnodeModule.c | 2 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/stream/inc/streamInt.h | 11 ++ source/libs/stream/src/streamCheckpoint.c | 132 +++++++++++++ source/libs/stream/test/CMakeLists.txt | 11 ++ source/libs/stream/test/checkpointTest.cpp | 61 ++++++ .../src/vnd/vnodeCos.c => util/src/cos.c} | 2 +- source/util/src/rsync.c | 178 ++++++++++++++++++ 20 files changed, 445 insertions(+), 13 deletions(-) rename source/dnode/vnode/src/inc/vndCos.h => include/util/cos.h (96%) create mode 100644 include/util/rsync.h create mode 100644 source/libs/stream/test/checkpointTest.cpp rename source/{dnode/vnode/src/vnd/vnodeCos.c => util/src/cos.c} (99%) create mode 100644 source/util/src/rsync.c diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 3e29703070..ccae8f02e2 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -82,6 +82,10 @@ extern int32_t tsHeartbeatTimeout; // vnode extern int64_t tsVndCommitMaxIntervalMs; +// snode +extern char tsSnodeIp[]; +extern char tsCheckpointBackupDir[]; + // mnode extern int64_t tsMndSdbWriteDelta; extern int64_t tsMndLogRetention; diff --git a/source/dnode/vnode/src/inc/vndCos.h b/include/util/cos.h similarity index 96% rename from source/dnode/vnode/src/inc/vndCos.h rename to include/util/cos.h index bb4d284f0e..9b91afea8d 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/include/util/cos.h @@ -16,7 +16,7 @@ #ifndef _TD_VND_COS_H_ #define _TD_VND_COS_H_ -#include "vnd.h" +#include "os.h" #ifdef __cplusplus extern "C" { @@ -24,6 +24,7 @@ extern "C" { #define S3_BLOCK_CACHE +extern int8_t tsS3StreamEnabled; extern int8_t tsS3Enabled; extern int32_t tsS3BlockSize; extern int32_t tsS3BlockCacheSize; diff --git a/include/util/rsync.h b/include/util/rsync.h new file mode 100644 index 0000000000..50b27b95e0 --- /dev/null +++ b/include/util/rsync.h @@ -0,0 +1,24 @@ +// +// Created by mingming wanng on 2023/11/2. +// + +#ifndef TDENGINE_RSYNC_H +#define TDENGINE_RSYNC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tarray.h" + +void stopRsync(); +void startRsync(); +int uploadRsync(char* id, SArray* fileList); +int downloadRsync(char* id, char* path); +int deleteRsync(char* id); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_RSYNC_H diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c6cff27011..241481df6a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -133,6 +133,10 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; // bool tsSmlDataFormat = false; // int32_t tsSmlBatchSize = 10000; +// checkpoint backup +char tsSnodeIp[TSDB_FQDN_LEN] = {0}; +char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; + // tmq int32_t tmqMaxTopicNum = 20; // query @@ -275,6 +279,7 @@ char tsS3AccessKeySecret[TSDB_FQDN_LEN] = ""; char tsS3BucketName[TSDB_FQDN_LEN] = ""; char tsS3AppId[TSDB_FQDN_LEN] = ""; int8_t tsS3Enabled = false; +int8_t tsS3StreamEnabled = false; int8_t tsS3Https = true; char tsS3Hostname[TSDB_FQDN_LEN] = ""; @@ -338,9 +343,10 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { tstrncpy(tsS3AppId, appid + 1, TSDB_FQDN_LEN); } } - if (tsS3BucketName[0] != '<' && tsDiskCfgNum > 1) { + if (tsS3BucketName[0] != '<') { #if defined(USE_COS) || defined(USE_S3) - tsS3Enabled = true; + if(tsDiskCfgNum > 1) tsS3Enabled = true; + tsS3StreamEnabled = true; #endif } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6451dba2da..9a6e51db31 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "rsync.h" #include "executor.h" #include "sndInt.h" #include "tstream.h" @@ -120,6 +121,9 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { goto FAIL; } + stopRsync(); + startRsync(); + // todo fix it: send msg to mnode to rollback to an existed checkpoint streamMetaInitForSnode(pSnode->pMeta); return pSnode; diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index dcc9f9a115..27cb0f93f7 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -8,7 +8,6 @@ set( "src/vnd/vnodeCommit.c" "src/vnd/vnodeQuery.c" "src/vnd/vnodeModule.c" - "src/vnd/vnodeCos.c" "src/vnd/vnodeSvr.c" "src/vnd/vnodeSync.c" "src/vnd/vnodeSnapshot.c" diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ca3fb7027f..1e73fad8f6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -12,11 +12,11 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include "cos.h" #include "tsdb.h" #include "tsdbDataFileRW.h" #include "tsdbReadUtil.h" #include "vnd.h" -#include "vndCos.h" #define ROCKS_BATCH_SIZE (4096) diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 38d221d978..348397272d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -14,9 +14,9 @@ */ #include "tsdbFS2.h" +#include "cos.h" #include "tsdbUpgrade.h" #include "vnd.h" -#include "vndCos.h" #define BLOCK_COMMIT_FACTOR 3 diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index f3bcfef703..cc15fb85ca 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ +#include "cos.h" #include "tsdb.h" -#include "vndCos.h" static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { int32_t code = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 6c41b46c73..2400c41d52 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -15,7 +15,8 @@ #include "tsdb.h" #include "tsdbFS2.h" -#include "vndCos.h" +#include "cos.h" +#include "vnd.h" typedef struct { STsdb *tsdb; diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 5949b103d5..e75079403e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ +#include "cos.h" #include "tsdb.h" -#include "vndCos.h" /** * @brief max key by precision diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 6ccce5c9d7..df08fb8a2b 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ +#include "cos.h" #include "vnd.h" -#include "vndCos.h" typedef struct SVnodeTask SVnodeTask; struct SVnodeTask { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 3bdecee79b..ff79e83d72 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -13,10 +13,10 @@ * along with this program. If not, see . */ +#include "cos.h" #include "sync.h" #include "tsdb.h" #include "vnd.h" -#include "vndCos.h" int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) { if (pTfs) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b6a4aaf388..8f6f5df850 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -16,7 +16,7 @@ #include "tencode.h" #include "tmsg.h" #include "vnd.h" -#include "vndCos.h" +#include "cos.h" #include "vnode.h" #include "vnodeInt.h" #include "audit.h" diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 4cd8319a07..806124bc58 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -137,6 +137,17 @@ void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); +//#define CHECKPOINT_PATH_LEN 128 +//typedef struct SChekpointDataHeader{ +// int64_t size; +// char name[CHECKPOINT_PATH_LEN]; +// char id[CHECKPOINT_PATH_LEN]; +//} SChekpointDataHeader; + +int uploadCheckpoint(char* id, SArray* fileList); +int downloadCheckpoint(char* id, char* path); +int deleteCheckpoint(char* id); + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2cde368195..238c7c2329 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -14,6 +14,7 @@ */ #include "streamInt.h" +#include "rsync.h" int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -357,3 +358,134 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } + + +//static int64_t kBlockSize = 64 * 1024; +//static int sendCheckpointToS3(char* id, SArray* fileList){ +// code = s3PutObjectFromFile2(from->fname, object_name); +// return 0; +//} +//static int sendCheckpointToSnode(char* id, SArray* fileList){ +// if(strlen(id) >= CHECKPOINT_PATH_LEN){ +// tqError("uploadCheckpoint id name too long, name:%s", id); +// return -1; +// } +// uint8_t* buf = taosMemoryCalloc(1, sizeof(SChekpointDataHeader) + kBlockSize); +// if(buf == NULL){ +// tqError("uploadCheckpoint malloc failed"); +// return -1; +// } +// +// SChekpointDataHeader* pHdr = (SChekpointDataHeader*)buf; +// strcpy(pHdr->id, id); +// +// TdFilePtr fd = NULL; +// for(int i = 0; i < taosArrayGetSize(fileList); i++){ +// char* name = (char*)taosArrayGetP(fileList, i); +// if(strlen(name) >= CHECKPOINT_PATH_LEN){ +// tqError("uploadCheckpoint file name too long, name:%s", name); +// return -1; +// } +// int64_t offset = 0; +// +// fd = taosOpenFile(name, TD_FILE_READ); +// tqDebug("uploadCheckpoint open file %s, file index: %d", name, i); +// +// while(1){ +// int64_t nread = taosPReadFile(fd, buf + sizeof(SChekpointDataHeader), kBlockSize, offset); +// if (nread == -1) { +// taosCloseFile(&fd); +// taosMemoryFree(buf); +// tqError("uploadCheckpoint failed to read file name:%s,reason:%d", name, errno); +// return -1; +// } else if (nread == 0){ +// tqDebug("uploadCheckpoint no data read, close file:%s, move to next file, open and read", name); +// taosCloseFile(&fd); +// break; +// } else if (nread == kBlockSize){ +// offset += nread; +// } else { +// taosCloseFile(&fd); +// offset = 0; +// } +// tqDebug("uploadCheckpoint read file %s, size:%" PRId64 ", current offset:%" PRId64, name, nread, offset); +// +// +// pHdr->size = nread; +// strcpy(pHdr->name, name); +// +// SRpcMsg rpcMsg = {0}; +// int32_t bytes = sizeof(SChekpointDataHeader) + nread; +// rpcMsg.pCont = rpcMallocCont(bytes); +// rpcMsg.msgType = TDMT_SYNC_SNAPSHOT_SEND; +// rpcMsg.contLen = bytes; +// if (rpcMsg.pCont == NULL) { +// tqError("uploadCheckpoint malloc failed"); +// taosCloseFile(&fd); +// taosMemoryFree(buf); +// return -1; +// } +// memcpy(rpcMsg.pCont, buf, bytes); +// int try = 3; +// int32_t code = 0; +// while(try-- > 0){ +// code = tmsgSendReq(pEpSet, &rpcMsg); +// if(code == 0) +// break; +// taosMsleep(10); +// } +// if(code != 0){ +// tqError("uploadCheckpoint send request failed code:%d", code); +// taosCloseFile(&fd); +// taosMemoryFree(buf); +// return -1; +// } +// +// if(offset == 0){ +// break; +// } +// } +// } +// +// taosMemoryFree(buf); + +//} + +int uploadCheckpoint(char* id, SArray* fileList){ + if(id == NULL || fileList == NULL || strlen(id) == 0 || taosArrayGetSize(fileList) == 0){ + stError("uploadCheckpoint parameters invalid"); + return -1; + } + if(strlen(tsSnodeIp) != 0){ + uploadRsync(id, fileList); +// }else if(tsS3StreamEnabled){ + + } + return 0; +} + +int downloadCheckpoint(char* id, char* path){ + if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0){ + stError("downloadCheckpoint parameters invalid"); + return -1; + } + if(strlen(tsSnodeIp) != 0){ + downloadRsync(id, path); +// }else if(tsS3StreamEnabled){ + + } + return 0; +} + +int deleteCheckpoint(char* id){ + if(id == NULL || strlen(id) == 0){ + stError("deleteCheckpoint parameters invalid"); + return -1; + } + if(strlen(tsSnodeIp) != 0){ + deleteRsync(id); +// }else if(tsS3StreamEnabled){ + + } + return 0; +} diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt index 629b04ae51..d756b71e29 100644 --- a/source/libs/stream/test/CMakeLists.txt +++ b/source/libs/stream/test/CMakeLists.txt @@ -18,6 +18,17 @@ TARGET_INCLUDE_DIRECTORIES( PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" ) +ADD_EXECUTABLE(checkpointTest checkpointTest.cpp) +TARGET_LINK_LIBRARIES( + checkpointTest + PUBLIC os common gtest stream executor qcom index transport util +) + +TARGET_INCLUDE_DIRECTORIES( + checkpointTest + PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +) + add_test( NAME streamUpdateTest COMMAND streamUpdateTest diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp new file mode 100644 index 0000000000..56614cc537 --- /dev/null +++ b/source/libs/stream/test/checkpointTest.cpp @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +#include "rsync.h" +#include "streamInt.h" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + + strcpy(tsSnodeIp, "127.0.0.1"); + return RUN_ALL_TESTS(); +} + +TEST(testCase, checkpointUpload_Test) { + stopRsync(); + startRsync(); + + taosSsleep(5); + SArray* fileList = taosArrayInit(0, POINTER_BYTES); + char* id = "2013892036"; + char* file1 = "/Users/mingmingwanng/wal1"; + char* file2 = "/Users/mingmingwanng/java_error_in_clion.hprof"; + taosArrayPush(fileList, &file1); + taosArrayPush(fileList, &file2); + + uploadCheckpoint(id, fileList); +} + +TEST(testCase, checkpointDownload_Test) { + char* id = "2013892036"; + downloadRsync(id, "/Users/mingmingwanng/rsync/tmp"); +} + +TEST(testCase, checkpointDelete_Test) { + char* id = "2013892036"; + deleteRsync(id); +} diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/util/src/cos.c similarity index 99% rename from source/dnode/vnode/src/vnd/vnodeCos.c rename to source/util/src/cos.c index 6e36739f5a..23b2d53990 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/util/src/cos.c @@ -1,6 +1,6 @@ #define ALLOW_FORBID_FUNC -#include "vndCos.h" +#include "cos.h" extern char tsS3Endpoint[]; extern char tsS3AccessKeyId[]; diff --git a/source/util/src/rsync.c b/source/util/src/rsync.c new file mode 100644 index 0000000000..940c631c65 --- /dev/null +++ b/source/util/src/rsync.c @@ -0,0 +1,178 @@ +// +// Created by mingming wanng on 2023/11/2. +// +#include "rsync.h" +#include +#include "tglobal.h" + +#define ERRNO_ERR_FORMAT "errno:%d,msg:%s" +#define ERRNO_ERR_DATA errno,strerror(errno) + +// deleteRsync function produce empty directories, traverse base directory to remove them +static void removeEmptyDir(){ + TdDirPtr pDir = taosOpenDir(tsCheckpointBackupDir); + if (pDir == NULL) return; + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + if (!taosDirEntryIsDir(de)) { + continue; + } + + if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue; + + char filename[PATH_MAX] = {0}; + snprintf(filename, sizeof(filename), "%s%s", tsCheckpointBackupDir, taosGetDirEntryName(de)); + + TdDirPtr pDirTmp = taosOpenDir(filename); + TdDirEntryPtr deTmp = NULL; + bool empty = true; + while ((deTmp = taosReadDir(pDirTmp)) != NULL){ + if (strcmp(taosGetDirEntryName(deTmp), ".") == 0 || strcmp(taosGetDirEntryName(deTmp), "..") == 0) continue; + empty = false; + } + if(empty) taosRemoveDir(filename); + taosCloseDir(&pDirTmp); + } + + taosCloseDir(&pDir); +} + +static int generateConfigFile(char* confDir){ + TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + uError("[rsync] open conf file error, dir:%s,"ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA); + return -1; + } + + char confContent[PATH_MAX*4] = {0}; + snprintf(confContent, PATH_MAX*4, + "uid = root\n" + "gid = root\n" + "use chroot = false\n" + "max connections = 200\n" + "timeout = 100\n" + "lock file = %srsync.lock\n" + "log file = %srsync.log\n" + "ignore errors = true\n" + "read only = false\n" + "list = false\n" + "[checkpoint]\n" + "path = %s", tsCheckpointBackupDir, tsCheckpointBackupDir, tsCheckpointBackupDir); + uDebug("[rsync] conf:%s", confContent); + if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0){ + uError("[rsync] write conf file error,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA); + taosCloseFile(&pFile); + return -1; + } + + taosCloseFile(&pFile); + return 0; +} + +static int execCommand(char* command){ + int try = 3; + int32_t code = 0; + while(try-- > 0) { + code = system(command); + if (code == 0) { + break; + } + taosMsleep(10); + } + return code; +} + +void stopRsync(){ + int code = system("pkill rsync"); + if(code != 0){ + uError("[rsync] stop rsync server failed,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA); + return; + } + uDebug("[rsync] stop rsync server successful"); +} + +void startRsync(){ + if(taosMulMkDir(tsCheckpointBackupDir) != 0){ + uError("[rsync] build checkpoint backup dir failed, dir:%s,"ERRNO_ERR_FORMAT, tsCheckpointBackupDir, ERRNO_ERR_DATA); + return; + } + removeEmptyDir(); + + char confDir[PATH_MAX] = {0}; + snprintf(confDir, PATH_MAX, "%srsync.conf", tsCheckpointBackupDir); + + int code = generateConfigFile(confDir); + if(code != 0){ + return; + } + + char cmd[PATH_MAX] = {0}; + snprintf(cmd, PATH_MAX, "rsync --daemon --config=%s", confDir); + // start rsync service to backup checkpoint + code = system(cmd); + if(code != 0){ + uError("[rsync] start server failed, code:%d,"ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return; + } + uDebug("[rsync] start server successful"); +} + +int uploadRsync(char* id, SArray* fileList){ + for(int i = 0; i < taosArrayGetSize(fileList); i++) { + char* fullName = (char*)taosArrayGetP(fileList, i); + char command[PATH_MAX] = {0}; +// char* name = strrchr(fullName, '/'); +// if(name == NULL){ +// uError("[rsync] file name invalid, name:%s", name); +// return -1; +// } +// name = name + 1; + snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", + fullName, tsSnodeIp, id); + + int code = execCommand(command); + if(code != 0){ + uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + } + uDebug("[rsync] upload data:%s successful", id); + return 0; +} + +int downloadRsync(char* id, char* path){ + char command[PATH_MAX] = {0}; + snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", + tsSnodeIp, id, path); + + int code = execCommand(command); + if(code != 0){ + uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + uDebug("[rsync] down data:%s successful", id); + return 0; +} + +int deleteRsync(char* id){ + char* tmp = "./tmp_empty/"; + int code = taosMkDir(tmp); + if(code != 0){ + uError("[rsync] make tmp dir failed. code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + char command[PATH_MAX] = {0}; + snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", + tmp, tsSnodeIp, id); + + code = execCommand(command); + taosRemoveDir(tmp); + if(code != 0){ + uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + uDebug("[rsync] delete data:%s successful", id); + + return 0; +} \ No newline at end of file