diff --git a/source/dnode/vnode/src/inc/vndCos.h b/include/common/cos.h similarity index 92% rename from source/dnode/vnode/src/inc/vndCos.h rename to include/common/cos.h index 8581b039f8..0610c63f9e 100644 --- a/source/dnode/vnode/src/inc/vndCos.h +++ b/include/common/cos.h @@ -16,7 +16,7 @@ #ifndef _TD_VND_COS_H_ #define _TD_VND_COS_H_ -#include "vnd.h" +#include "os.h" #ifdef __cplusplus extern "C" { @@ -24,6 +24,7 @@ extern "C" { #define S3_BLOCK_CACHE +extern int8_t tsS3StreamEnabled; extern int8_t tsS3Enabled; extern int32_t tsS3BlockSize; extern int32_t tsS3BlockCacheSize; @@ -38,6 +39,7 @@ void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); +int32_t s3GetObjectsByPrefix(const char *prefix, const char* path); int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); diff --git a/include/common/rsync.h b/include/common/rsync.h new file mode 100644 index 0000000000..6cce645d1e --- /dev/null +++ b/include/common/rsync.h @@ -0,0 +1,24 @@ +// +// Created by mingming wanng on 2023/11/2. +// + +#ifndef TDENGINE_RSYNC_H +#define TDENGINE_RSYNC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tarray.h" + +void stopRsync(); +void startRsync(); +int uploadRsync(char* id, char* path); +int downloadRsync(char* id, char* path); +int deleteRsync(char* id); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_RSYNC_H diff --git a/include/common/tglobal.h b/include/common/tglobal.h index f2ffde7b15..8688e07932 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -75,6 +75,13 @@ extern int32_t tsElectInterval; extern int32_t tsHeartbeatInterval; extern int32_t tsHeartbeatTimeout; +// snode +extern int32_t tsRsyncPort; +extern char tsCheckpointBackupDir[]; + +// vnode checkpoint +extern char tsSnodeAddress[]; //127.0.0.1:873 + // mnode extern int64_t tsMndSdbWriteDelta; extern int64_t tsMndLogRetention; diff --git a/include/os/osDef.h b/include/os/osDef.h index bad4536fa6..1a831f2e86 100644 --- a/include/os/osDef.h +++ b/include/os/osDef.h @@ -237,6 +237,12 @@ void syslog(int unused, const char *format, ...); #define TD_DIRSEP "/" #endif +#if defined(_WIN32) +#define TD_DIRSEP_CHAR '\\' +#else +#define TD_DIRSEP_CHAR '/' +#endif + #define TD_LOCALE_LEN 64 #define TD_CHARSET_LEN 64 #define TD_TIMEZONE_LEN 96 diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index c35845f9df..c7831b2b6d 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -46,6 +46,75 @@ target_link_libraries( INTERFACE api ) +if(${BUILD_S3}) + + if(${BUILD_WITH_S3}) + target_include_directories( + common + + PUBLIC "$ENV{HOME}/.cos-local.2/include" + ) + + set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") + set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2) + find_library(S3_LIBRARY s3) + find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) + find_library(XML2_LIBRARY xml2) + find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH) + find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH) + target_link_libraries( + common + + # s3 + PUBLIC ${S3_LIBRARY} + PUBLIC ${CURL_LIBRARY} + PUBLIC ${SSL_LIBRARY} + PUBLIC ${CRYPTO_LIBRARY} + PUBLIC ${XML2_LIBRARY} + ) + + add_definitions(-DUSE_S3) + endif() + + if(${BUILD_WITH_COS}) + + set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") + find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/) + find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/) + find_library(MINIXML_LIBRARY mxml) + find_library(CURL_LIBRARY curl) + target_link_libraries( + common + + # s3 + PUBLIC cos_c_sdk_static + PUBLIC ${APR_UTIL_LIBRARY} + PUBLIC ${APR_LIBRARY} + PUBLIC ${MINIXML_LIBRARY} + PUBLIC ${CURL_LIBRARY} + ) + + # s3 + FIND_PROGRAM(APR_CONFIG_BIN NAMES apr-config apr-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/) + IF (APR_CONFIG_BIN) + EXECUTE_PROCESS( + COMMAND ${APR_CONFIG_BIN} --includedir + OUTPUT_VARIABLE APR_INCLUDE_DIR + OUTPUT_STRIP_TRAILING_WHITESPACE + ) + ENDIF() + include_directories (${APR_INCLUDE_DIR}) + target_include_directories( + common + PUBLIC "${TD_SOURCE_DIR}/contrib/cos-c-sdk-v5/cos_c_sdk" + PUBLIC "$ENV{HOME}/.cos-local.1/include" + ) + + add_definitions(-DUSE_COS) + endif(${BUILD_WITH_COS}) + +endif() + if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) endif(${BUILD_TEST}) diff --git a/source/dnode/vnode/src/vnd/vnodeCos.c b/source/common/src/cos.c similarity index 90% rename from source/dnode/vnode/src/vnd/vnodeCos.c rename to source/common/src/cos.c index 6e36739f5a..daeb81b8a3 100644 --- a/source/dnode/vnode/src/vnd/vnodeCos.c +++ b/source/common/src/cos.c @@ -1,6 +1,6 @@ #define ALLOW_FORBID_FUNC -#include "vndCos.h" +#include "cos.h" extern char tsS3Endpoint[]; extern char tsS3AccessKeyId[]; @@ -13,6 +13,7 @@ extern int8_t tsS3Https; #if defined(USE_S3) #include "libs3.h" +#include "tarray.h" static int verifyPeerG = 0; static const char *awsRegionG = NULL; @@ -34,7 +35,7 @@ static int32_t s3Begin() { } if ((status = S3_initialize("s3", verifyPeerG | S3_INIT_ALL, hostname)) != S3StatusOK) { - vError("Failed to initialize libs3: %s\n", S3_get_status_name(status)); + uError("Failed to initialize libs3: %s\n", S3_get_status_name(status)); return -1; } @@ -65,12 +66,18 @@ static int should_retry() { static void s3PrintError(const char *func, S3Status status, char error_details[]) { if (status < S3StatusErrorAccessDenied) { - vError("%s: %s", __func__, S3_get_status_name(status)); + uError("%s: %s", __func__, S3_get_status_name(status)); } else { - vError("%s: %s, %s", __func__, S3_get_status_name(status), error_details); + uError("%s: %s, %s", __func__, S3_get_status_name(status), error_details); } } +typedef struct { + char err_msg[128]; + S3Status status; + TdFilePtr file; +} TS3GetData; + typedef struct { char err_msg[128]; S3Status status; @@ -78,6 +85,11 @@ typedef struct { char *buf; } TS3SizeCBD; +static S3Status responsePropertiesCallbackNull(const S3ResponseProperties *properties, void *callbackData) { +// (void)callbackData; + return S3StatusOK; +} + static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) { //(void)callbackData; TS3SizeCBD *cbd = callbackData; @@ -291,7 +303,7 @@ S3Status initial_multipart_callback(const char *upload_id, void *callbackData) { } S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properties, void *callbackData) { - responsePropertiesCallback(properties, callbackData); + responsePropertiesCallbackNull(properties, callbackData); MultipartPartData *data = (MultipartPartData *)callbackData; int seq = data->seq; @@ -390,7 +402,7 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, 0, awsRegionG}; - S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &listPartsCallback}; + S3ListPartsHandler listPartsHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listPartsCallback}; list_parts_callback_data data; @@ -445,13 +457,13 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { data.noStatus = noStatus; if (taosStatFile(file, &contentLength, NULL, NULL) < 0) { - vError("ERROR: %s Failed to stat file %s: ", __func__, file); + uError("ERROR: %s Failed to stat file %s: ", __func__, file); code = TAOS_SYSTEM_ERROR(errno); return code; } if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) { - vError("ERROR: %s Failed to open file %s: ", __func__, file); + uError("ERROR: %s Failed to open file %s: ", __func__, file); code = TAOS_SYSTEM_ERROR(errno); return code; } @@ -469,7 +481,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { metaProperties, useServerSideEncryption}; if (contentLength <= MULTIPART_CHUNK_SIZE) { - S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, + S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &putObjectDataCallback}; do { @@ -486,7 +498,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { s3PrintError(__func__, data.status, data.err_msg); code = TAOS_SYSTEM_ERROR(EIO); } else if (data.contentLength) { - vError("ERROR: %s Failed to read remaining %llu bytes from input", __func__, + uError("ERROR: %s Failed to read remaining %llu bytes from input", __func__, (unsigned long long)data.contentLength); code = TAOS_SYSTEM_ERROR(EIO); } @@ -506,14 +518,14 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { memset(&partData, 0, sizeof(MultipartPartData)); int partContentLength = 0; - S3MultipartInitialHandler handler = {{&responsePropertiesCallback, &responseCompleteCallback}, + S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &initial_multipart_callback}; S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback}, &putObjectDataCallback}; S3MultipartCommitHandler commit_handler = { - {&responsePropertiesCallback, &responseCompleteCallback}, &multipartPutXmlCallback, 0}; + {&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0}; manager.etags = (char **)taosMemoryMalloc(sizeof(char *) * totalSeq); manager.next_etags_pos = 0; @@ -658,19 +670,19 @@ static void s3FreeObjectKey(void *pItem) { taosMemoryFree(key); } -void s3DeleteObjectsByPrefix(const char *prefix) { +static SArray* getListByPrefix(const char *prefix){ S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, 0, awsRegionG}; - S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, + S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listBucketCallback}; const char *marker = 0, *delimiter = 0; int maxkeys = 0, allDetails = 0; list_bucket_callback_data data; - data.objectArray = taosArrayInit(32, POINTER_BYTES); + data.objectArray = taosArrayInit(32, sizeof(void*)); if (!data.objectArray) { - vError("%s: %s", __func__, "out of memoty"); - return; + uError("%s: %s", __func__, "out of memoty"); + return NULL; } if (marker) { snprintf(data.nextMarker, sizeof(data.nextMarker), "%s", marker); @@ -693,18 +705,15 @@ void s3DeleteObjectsByPrefix(const char *prefix) { if (data.status == S3StatusOK) { if (data.keyCount > 0) { - // printListBucketHeader(allDetails); - s3DeleteObjects(TARRAY_DATA(data.objectArray), TARRAY_SIZE(data.objectArray)); + return data.objectArray; } } else { s3PrintError(__func__, data.status, data.err_msg); } - - taosArrayDestroyEx(data.objectArray, s3FreeObjectKey); + return NULL; } void s3DeleteObjects(const char *object_name[], int nobject) { - int status = 0; S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, 0, awsRegionG}; S3ResponseHandler responseHandler = {0, &responseCompleteCallback}; @@ -721,6 +730,13 @@ void s3DeleteObjects(const char *object_name[], int nobject) { } } +void s3DeleteObjectsByPrefix(const char *prefix) { + SArray* objectArray = getListByPrefix(prefix); + if(objectArray == NULL)return; + s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray)); + taosArrayDestroyEx(objectArray, s3FreeObjectKey); +} + static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) { TS3SizeCBD *cbd = callbackData; if (cbd->content_length != bufferSize) { @@ -758,7 +774,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, } while (S3_status_is_retryable(cbd.status) && should_retry()); if (cbd.status != S3StatusOK) { - vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); return TAOS_SYSTEM_ERROR(EIO); } @@ -767,6 +783,67 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, return 0; } +static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) { + TS3GetData *cbd = (TS3GetData *) callbackData; + size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize); + return ((wrote < (size_t) bufferSize) ? + S3StatusAbortedByCallback : S3StatusOK); +} + +int32_t s3GetObjectToFile(const char *object_name, char* fileName) { + int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; + const char *ifMatch = 0, *ifNotMatch = 0; + + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; + S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, + &getObjectCallback}; + + TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + uError("[s3] open file error, errno:%d, fileName:%s", errno, fileName); + return -1; + } + TS3GetData cbd = {0}; + cbd.file = pFile; + do { + S3_get_object(&bucketContext, object_name, &getConditions, 0, 0, 0, 0, &getObjectHandler, &cbd); + } while (S3_status_is_retryable(cbd.status) && should_retry()); + + if (cbd.status != S3StatusOK) { + uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + taosCloseFile(&pFile); + return TAOS_SYSTEM_ERROR(EIO); + } + + taosCloseFile(&pFile); + return 0; +} + +int32_t s3GetObjectsByPrefix(const char *prefix, const char* path){ + SArray* objectArray = getListByPrefix(prefix); + if(objectArray == NULL) return -1; + + for(size_t i = 0; i < taosArrayGetSize(objectArray); i++){ + char* object = taosArrayGetP(objectArray, i); + const char* tmp = strchr(object, '/'); + tmp = (tmp == NULL) ? object : tmp + 1; + char fileName[PATH_MAX] = {0}; + if(path[strlen(path) - 1] != TD_DIRSEP_CHAR){ + snprintf(fileName, PATH_MAX, "%s%s%s", path, TD_DIRSEP, tmp); + }else{ + snprintf(fileName, PATH_MAX, "%s%s", path, tmp); + } + if(s3GetObjectToFile(object, fileName) != 0){ + taosArrayDestroyEx(objectArray, s3FreeObjectKey); + return -1; + } + } + taosArrayDestroyEx(objectArray, s3FreeObjectKey); + return 0; +} + long s3Size(const char *object_name) { long size = 0; int status = 0; @@ -782,7 +859,7 @@ long s3Size(const char *object_name) { } while (S3_status_is_retryable(cbd.status) && should_retry()); if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) { - vError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); } size = cbd.content_length; @@ -1237,6 +1314,7 @@ void s3DeleteObjectsByPrefix(const char *prefix) {} void s3DeleteObjects(const char *object_name[], int nobject) {} bool s3Exists(const char *object_name) { return false; } bool s3Get(const char *object_name, const char *path) { return false; } +int32_t s3GetObjectsByPrefix(const char *prefix, const char* path) { return 0; } int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock) { return 0; } void s3EvictCache(const char *path, long object_size) {} long s3Size(const char *object_name) { return 0; } diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c new file mode 100644 index 0000000000..ffab85761e --- /dev/null +++ b/source/common/src/rsync.c @@ -0,0 +1,235 @@ +// +// Created by mingming wanng on 2023/11/2. +// +#include "rsync.h" +#include +#include "tglobal.h" + +#define ERRNO_ERR_FORMAT "errno:%d,msg:%s" +#define ERRNO_ERR_DATA errno,strerror(errno) + +// deleteRsync function produce empty directories, traverse base directory to remove them +static void removeEmptyDir(){ + TdDirPtr pDir = taosOpenDir(tsCheckpointBackupDir); + if (pDir == NULL) return; + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + if (!taosDirEntryIsDir(de)) { + continue; + } + + if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue; + + char filename[PATH_MAX] = {0}; + snprintf(filename, sizeof(filename), "%s%s", tsCheckpointBackupDir, taosGetDirEntryName(de)); + + TdDirPtr pDirTmp = taosOpenDir(filename); + TdDirEntryPtr deTmp = NULL; + bool empty = true; + while ((deTmp = taosReadDir(pDirTmp)) != NULL){ + if (strcmp(taosGetDirEntryName(deTmp), ".") == 0 || strcmp(taosGetDirEntryName(deTmp), "..") == 0) continue; + empty = false; + } + if(empty) taosRemoveDir(filename); + taosCloseDir(&pDirTmp); + } + + taosCloseDir(&pDir); +} + +#ifdef WINDOWS +// C:\TDengine\data\backup\checkpoint\ -> /c/TDengine/data/backup/checkpoint/ +static void changeDirFromWindowsToLinux(char* from, char* to){ + to[0] = '/'; + to[1] = from[0]; + for(int i = 2; i < strlen(from); i++) { + if (from[i] == '\\') { + to[i] = '/'; + } else { + to[i] = from[i]; + } + } +} +#endif + +static int generateConfigFile(char* confDir){ + TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + uError("[rsync] open conf file error, dir:%s,"ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA); + return -1; + } + +#ifdef WINDOWS + char path[PATH_MAX] = {0}; + changeDirFromWindowsToLinux(tsCheckpointBackupDir, path); +#endif + + char confContent[PATH_MAX*4] = {0}; + snprintf(confContent, PATH_MAX*4, +#ifndef WINDOWS + "uid = root\n" + "gid = root\n" +#endif + "use chroot = false\n" + "max connections = 200\n" + "timeout = 100\n" + "lock file = %srsync.lock\n" + "log file = %srsync.log\n" + "ignore errors = true\n" + "read only = false\n" + "list = false\n" + "[checkpoint]\n" + "path = %s", tsCheckpointBackupDir, tsCheckpointBackupDir, +#ifdef WINDOWS + path +#else + tsCheckpointBackupDir +#endif + ); + uDebug("[rsync] conf:%s", confContent); + if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0){ + uError("[rsync] write conf file error,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA); + taosCloseFile(&pFile); + return -1; + } + + taosCloseFile(&pFile); + return 0; +} + +static int execCommand(char* command){ + int try = 3; + int32_t code = 0; + while(try-- > 0) { + code = system(command); + if (code == 0) { + break; + } + taosMsleep(10); + } + return code; +} + +void stopRsync(){ + int code = +#ifdef WINDOWS + system("taskkill /f /im rsync.exe"); +#else + system("pkill rsync"); +#endif + if(code != 0){ + uError("[rsync] stop rsync server failed,"ERRNO_ERR_FORMAT, ERRNO_ERR_DATA); + return; + } + uDebug("[rsync] stop rsync server successful"); +} + +void startRsync(){ + if(taosMulMkDir(tsCheckpointBackupDir) != 0){ + uError("[rsync] build checkpoint backup dir failed, dir:%s,"ERRNO_ERR_FORMAT, tsCheckpointBackupDir, ERRNO_ERR_DATA); + return; + } + removeEmptyDir(); + + char confDir[PATH_MAX] = {0}; + snprintf(confDir, PATH_MAX, "%srsync.conf", tsCheckpointBackupDir); + + int code = generateConfigFile(confDir); + if(code != 0){ + return; + } + + char cmd[PATH_MAX] = {0}; + snprintf(cmd, PATH_MAX, "rsync --daemon --port=%d --config=%s", tsRsyncPort, confDir); + // start rsync service to backup checkpoint + code = system(cmd); + if(code != 0){ + uError("[rsync] start server failed, code:%d,"ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return; + } + uDebug("[rsync] start server successful"); +} + +int uploadRsync(char* id, char* path){ +#ifdef WINDOWS + char pathTransform[PATH_MAX] = {0}; + changeDirFromWindowsToLinux(path, pathTransform); +#endif + char command[PATH_MAX] = {0}; +#ifdef WINDOWS + if(pathTransform[strlen(pathTransform) - 1] != '/'){ +#else + if(path[strlen(path) - 1] != '/'){ +#endif + snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/", +#ifdef WINDOWS + pathTransform +#else + path +#endif + , tsSnodeAddress, id); + }else{ + snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", +#ifdef WINDOWS + pathTransform +#else + path +#endif + , tsSnodeAddress, id); + } + + int code = execCommand(command); + if(code != 0){ + uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + uDebug("[rsync] upload data:%s successful", id); + return 0; +} + +int downloadRsync(char* id, char* path){ +#ifdef WINDOWS + char pathTransform[PATH_MAX] = {0}; + changeDirFromWindowsToLinux(path, pathTransform); +#endif + char command[PATH_MAX] = {0}; + snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", + tsSnodeAddress, id, +#ifdef WINDOWS + pathTransform +#else + path +#endif + ); + + int code = execCommand(command); + if(code != 0){ + uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + uDebug("[rsync] down data:%s successful", id); + return 0; +} + +int deleteRsync(char* id){ + char* tmp = "./tmp_empty/"; + int code = taosMkDir(tmp); + if(code != 0){ + uError("[rsync] make tmp dir failed. code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + char command[PATH_MAX] = {0}; + snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", + tmp, tsSnodeAddress, id); + + code = execCommand(command); + taosRemoveDir(tmp); + if(code != 0){ + uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + return -1; + } + uDebug("[rsync] delete data:%s successful", id); + + return 0; +} \ No newline at end of file diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index e3ccf93940..d441b22aa3 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -127,6 +127,15 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; // bool tsSmlDataFormat = false; // int32_t tsSmlBatchSize = 10000; +// checkpoint backup +char tsSnodeAddress[TSDB_FQDN_LEN] = {0}; +int32_t tsRsyncPort = 873; +#ifdef WINDOWS +char tsCheckpointBackupDir[PATH_MAX] = "C:\\TDengine\\data\\backup\\checkpoint\\"; +#else +char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/"; +#endif + // tmq int32_t tmqMaxTopicNum = 20; // query @@ -260,6 +269,7 @@ char tsS3AccessKeySecret[TSDB_FQDN_LEN] = ""; char tsS3BucketName[TSDB_FQDN_LEN] = ""; char tsS3AppId[TSDB_FQDN_LEN] = ""; int8_t tsS3Enabled = false; +int8_t tsS3StreamEnabled = false; int8_t tsS3Https = true; char tsS3Hostname[TSDB_FQDN_LEN] = ""; @@ -323,9 +333,10 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { tstrncpy(tsS3AppId, appid + 1, TSDB_FQDN_LEN); } } - if (tsS3BucketName[0] != '<' && tsDiskCfgNum > 1) { + if (tsS3BucketName[0] != '<') { #if defined(USE_COS) || defined(USE_S3) - tsS3Enabled = true; + if(tsDiskCfgNum > 1) tsS3Enabled = true; + tsS3StreamEnabled = true; #endif } @@ -661,6 +672,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1; if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "rsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "snodeAddress", tsSnodeAddress, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -1099,7 +1114,10 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval; tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32; tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32; + tsRsyncPort = cfgGetItem(pCfg, "rsyncPort")->i32; tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); + tstrncpy(tsSnodeAddress, cfgGetItem(pCfg, "snodeAddress")->str, TSDB_FQDN_LEN); + tstrncpy(tsCheckpointBackupDir, cfgGetItem(pCfg, "checkpointBackupDir")->str, PATH_MAX); tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index f2ef00c534..6f5b370826 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "rsync.h" #include "executor.h" #include "sndInt.h" #include "tstream.h" @@ -122,6 +123,9 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { goto FAIL; } + stopRsync(); + startRsync(); + // todo fix it: send msg to mnode to rollback to an existed checkpoint streamMetaInitForSnode(pSnode->pMeta); return pSnode; diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 2a29a10822..dc43da7fe7 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -8,7 +8,6 @@ set( "src/vnd/vnodeCommit.c" "src/vnd/vnodeQuery.c" "src/vnd/vnodeModule.c" - "src/vnd/vnodeCos.c" "src/vnd/vnodeSvr.c" "src/vnd/vnodeSync.c" "src/vnd/vnodeSnapshot.c" @@ -161,75 +160,6 @@ target_link_libraries( PUBLIC index ) -if(${BUILD_S3}) - -if(${BUILD_WITH_S3}) - target_include_directories( - vnode - - PUBLIC "$ENV{HOME}/.cos-local.2/include" - ) - - set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") - set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2) - find_library(S3_LIBRARY s3) - find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH) - find_library(XML2_LIBRARY xml2) - find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH) - find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib $ENV{HOME}/.cos-local.2/lib64 NO_DEFAULT_PATH) - target_link_libraries( - vnode - - # s3 - PUBLIC ${S3_LIBRARY} - PUBLIC ${CURL_LIBRARY} - PUBLIC ${SSL_LIBRARY} - PUBLIC ${CRYPTO_LIBRARY} - PUBLIC ${XML2_LIBRARY} - ) - - add_definitions(-DUSE_S3) -endif() - -if(${BUILD_WITH_COS}) - -set(CMAKE_FIND_LIBRARY_SUFFIXES ".a") -find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/) -find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/) -find_library(MINIXML_LIBRARY mxml) -find_library(CURL_LIBRARY curl) -target_link_libraries( - vnode - - # s3 - PUBLIC cos_c_sdk_static - PUBLIC ${APR_UTIL_LIBRARY} - PUBLIC ${APR_LIBRARY} - PUBLIC ${MINIXML_LIBRARY} - PUBLIC ${CURL_LIBRARY} -) - -# s3 -FIND_PROGRAM(APR_CONFIG_BIN NAMES apr-config apr-1-config PATHS /usr/bin /usr/local/bin /usr/local/apr/bin/) -IF (APR_CONFIG_BIN) - EXECUTE_PROCESS( - COMMAND ${APR_CONFIG_BIN} --includedir - OUTPUT_VARIABLE APR_INCLUDE_DIR - OUTPUT_STRIP_TRAILING_WHITESPACE - ) -ENDIF() -include_directories (${APR_INCLUDE_DIR}) -target_include_directories( - vnode - PUBLIC "${TD_SOURCE_DIR}/contrib/cos-c-sdk-v5/cos_c_sdk" - PUBLIC "$ENV{HOME}/.cos-local.1/include" - ) - - add_definitions(-DUSE_COS) -endif(${BUILD_WITH_COS}) - -endif() - IF (TD_GRANT) TARGET_LINK_LIBRARIES(vnode PUBLIC grant) ENDIF () diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index bf6f0cf4d6..aa83478105 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -12,11 +12,11 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include "cos.h" #include "tsdb.h" #include "tsdbDataFileRW.h" #include "tsdbReadUtil.h" #include "vnd.h" -#include "vndCos.h" #define ROCKS_BATCH_SIZE (4096) diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 38d221d978..348397272d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -14,9 +14,9 @@ */ #include "tsdbFS2.h" +#include "cos.h" #include "tsdbUpgrade.h" #include "vnd.h" -#include "vndCos.h" #define BLOCK_COMMIT_FACTOR 3 diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index def9a73d10..151c6419cc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ +#include "cos.h" #include "tsdb.h" -#include "vndCos.h" static int32_t tsdbOpenFileImpl(STsdbFD *pFD) { int32_t code = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 0fc1e1b64b..1908f16529 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -15,7 +15,8 @@ #include "tsdb.h" #include "tsdbFS2.h" -#include "vndCos.h" +#include "cos.h" +#include "vnd.h" typedef struct { STsdb *tsdb; diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 1e6526da48..43025cbe91 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ +#include "cos.h" #include "tsdb.h" -#include "vndCos.h" /** * @brief max key by precision diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 6ccce5c9d7..df08fb8a2b 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ +#include "cos.h" #include "vnd.h" -#include "vndCos.h" typedef struct SVnodeTask SVnodeTask; struct SVnodeTask { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 3bdecee79b..ff79e83d72 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -13,10 +13,10 @@ * along with this program. If not, see . */ +#include "cos.h" #include "sync.h" #include "tsdb.h" #include "vnd.h" -#include "vndCos.h" int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) { if (pTfs) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b31463ac00..3f58de773a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -18,7 +18,7 @@ #include "tmsg.h" #include "tstrbuild.h" #include "vnd.h" -#include "vndCos.h" +#include "cos.h" #include "vnode.h" #include "vnodeInt.h" diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 4a7d3a2a05..4efba478f3 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -140,6 +140,18 @@ void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); +typedef enum UPLOAD_TYPE{ + UPLOAD_DISABLE = -1, + UPLOAD_S3 = 0, + UPLOAD_RSYNC = 1, +} UPLOAD_TYPE; + +UPLOAD_TYPE getUploadType(); +int uploadCheckpoint(char* id, char* path); +int downloadCheckpoint(char* id, char* path); +int deleteCheckpoint(char* id); +int deleteCheckpointFile(char* id, char* name); + int32_t onNormalTaskReady(SStreamTask* pTask); int32_t onScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 81840aaeb7..5479a2dab2 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -14,6 +14,8 @@ */ #include "streamInt.h" +#include "rsync.h" +#include "cos.h" int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -372,3 +374,91 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { return code; } + +static int uploadCheckpointToS3(char* id, char* path){ + TdDirPtr pDir = taosOpenDir(path); + if (pDir == NULL) return -1; + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || + taosDirEntryIsDir(de)) continue; + + char filename[PATH_MAX] = {0}; + if(path[strlen(path) - 1] == TD_DIRSEP_CHAR){ + snprintf(filename, sizeof(filename), "%s%s", path, name); + }else{ + snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name); + } + + char object[PATH_MAX] = {0}; + snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); + + if(s3PutObjectFromFile2(filename, object) != 0){ + taosCloseDir(&pDir); + return -1; + } + stDebug("[s3] upload checkpoint:%s", filename); + } + taosCloseDir(&pDir); + + return 0; +} + +UPLOAD_TYPE getUploadType(){ + if(strlen(tsSnodeAddress) != 0){ + return UPLOAD_RSYNC; + }else if(tsS3StreamEnabled){ + return UPLOAD_S3; + }else{ + return UPLOAD_DISABLE; + } +} + +int uploadCheckpoint(char* id, char* path){ + if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){ + stError("uploadCheckpoint parameters invalid"); + return -1; + } + if(strlen(tsSnodeAddress) != 0){ + return uploadRsync(id, path); + }else if(tsS3StreamEnabled){ + return uploadCheckpointToS3(id, path); + } + return 0; +} + +int downloadCheckpoint(char* id, char* path){ + if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){ + stError("downloadCheckpoint parameters invalid"); + return -1; + } + if(strlen(tsSnodeAddress) != 0){ + return downloadRsync(id, path); + }else if(tsS3StreamEnabled){ + return s3GetObjectsByPrefix(id, path); + } + return 0; +} + +int deleteCheckpoint(char* id){ + if(id == NULL || strlen(id) == 0){ + stError("deleteCheckpoint parameters invalid"); + return -1; + } + if(strlen(tsSnodeAddress) != 0){ + return deleteRsync(id); + }else if(tsS3StreamEnabled){ + s3DeleteObjectsByPrefix(id); + } + return 0; +} + +int deleteCheckpointFile(char* id, char* name){ + char object[128] = {0}; + snprintf(object, sizeof(object), "%s/%s", id, name); + char *tmp = object; + s3DeleteObjects((const char**)&tmp, 1); + return 0; +} diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 3de5de9967..757164739a 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -19,7 +19,6 @@ #include "streamBackendRocksdb.h" #include "streamInt.h" #include "tcommon.h" -#include "streamInt.h" enum SBackendFileType { ROCKSDB_OPTIONS_TYPE = 1, @@ -52,6 +51,7 @@ struct SStreamSnapHandle { int8_t filetype; SArray* pFileList; int32_t currFileIdx; + int8_t delFlag; // 0 : not del, 1: del }; struct SStreamSnapBlockHdr { int8_t type; @@ -148,6 +148,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk taosMemoryFree(tdir); return code; } + pHandle->delFlag = 1; chkpId = 0; } @@ -274,7 +275,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { if (handle->checkpointId == 0) { // del tmp dir if (pFile && taosIsDir(pFile->path)) { - taosRemoveDir(pFile->path); + if (handle->delFlag) taosRemoveDir(pFile->path); } } else { streamBackendDelInUseChkp(handle->handle, handle->checkpointId); @@ -344,10 +345,10 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); - if(buf == NULL){ + if (buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); + int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { taosMemoryFree(buf); code = TAOS_SYSTEM_ERROR(terrno); @@ -423,6 +424,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path pHandle->pFileList = list; pHandle->currFileIdx = 0; pHandle->offset = 0; + pHandle->delFlag = 0; *ppWriter = pWriter; return 0; diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt index 629b04ae51..d756b71e29 100644 --- a/source/libs/stream/test/CMakeLists.txt +++ b/source/libs/stream/test/CMakeLists.txt @@ -18,6 +18,17 @@ TARGET_INCLUDE_DIRECTORIES( PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" ) +ADD_EXECUTABLE(checkpointTest checkpointTest.cpp) +TARGET_LINK_LIBRARIES( + checkpointTest + PUBLIC os common gtest stream executor qcom index transport util +) + +TARGET_INCLUDE_DIRECTORIES( + checkpointTest + PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +) + add_test( NAME streamUpdateTest COMMAND streamUpdateTest diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp new file mode 100644 index 0000000000..dca2e97c28 --- /dev/null +++ b/source/libs/stream/test/checkpointTest.cpp @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +#include "rsync.h" +#include "streamInt.h" +#include "cos.h" + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + + if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) { + printf("error"); + } + if (s3Init() < 0) { + return -1; + } + strcpy(tsSnodeAddress, "127.0.0.1"); + return RUN_ALL_TESTS(); +} + +TEST(testCase, checkpointUpload_Test) { + stopRsync(); + startRsync(); + + taosSsleep(5); + char* id = "2013892036"; + + uploadCheckpoint(id, "/root/offset/"); +} + +TEST(testCase, checkpointDownload_Test) { + char* id = "2013892036"; + downloadCheckpoint(id, "/root/offset/download/"); +} + +TEST(testCase, checkpointDelete_Test) { + char* id = "2013892036"; + deleteCheckpoint(id); +} + +TEST(testCase, checkpointDeleteFile_Test) { + char* id = "2013892036"; + deleteCheckpointFile(id, "offset-ver0"); +}