From f0b2ed567c50afb189c245e0f6a565f747675b68 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 7 Nov 2023 18:49:12 +0800 Subject: [PATCH] fix:upload checkpoint to s3 --- include/common/cos.h | 1 + include/common/tglobal.h | 3 +- source/common/src/cos.c | 76 ++++++++++++++++++++-- source/common/src/rsync.c | 10 +-- source/common/src/tglobal.c | 12 +++- source/libs/stream/inc/streamInt.h | 8 ++- source/libs/stream/src/streamCheckpoint.c | 73 +++++++++++++++++---- source/libs/stream/test/checkpointTest.cpp | 24 +++++-- 8 files changed, 172 insertions(+), 35 deletions(-) diff --git a/include/common/cos.h b/include/common/cos.h index 4d676d9ca7..0610c63f9e 100644 --- a/include/common/cos.h +++ b/include/common/cos.h @@ -39,6 +39,7 @@ void s3DeleteObjectsByPrefix(const char *prefix); void s3DeleteObjects(const char *object_name[], int nobject); bool s3Exists(const char *object_name); bool s3Get(const char *object_name, const char *path); +int32_t s3GetObjectsByPrefix(const char *prefix, const char* path); int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, uint8_t **ppBlock); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); diff --git a/include/common/tglobal.h b/include/common/tglobal.h index b9bc046504..cc45e5ec68 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -83,7 +83,8 @@ extern int32_t tsHeartbeatTimeout; extern int64_t tsVndCommitMaxIntervalMs; // snode -extern char tsSnodeIp[]; +extern char tsSnodeAddress[]; //127.0.0.1:873 +extern int32_t tsRsyncPort; extern char tsCheckpointBackupDir[]; // mnode diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 67ddb4a748..c209f1a063 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -72,6 +72,12 @@ static void s3PrintError(const char *func, S3Status status, char error_details[] } } +typedef struct { + char err_msg[128]; + S3Status status; + TdFilePtr file; +} TS3GetData; + typedef struct { char err_msg[128]; S3Status status; @@ -659,7 +665,7 @@ static void s3FreeObjectKey(void *pItem) { taosMemoryFree(key); } -void s3DeleteObjectsByPrefix(const char *prefix) { +static SArray* getListByPrefix(const char *prefix){ S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, 0, awsRegionG}; S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, @@ -671,7 +677,7 @@ void s3DeleteObjectsByPrefix(const char *prefix) { data.objectArray = taosArrayInit(32, sizeof(void*)); if (!data.objectArray) { uError("%s: %s", __func__, "out of memoty"); - return; + return NULL; } if (marker) { snprintf(data.nextMarker, sizeof(data.nextMarker), "%s", marker); @@ -694,18 +700,15 @@ void s3DeleteObjectsByPrefix(const char *prefix) { if (data.status == S3StatusOK) { if (data.keyCount > 0) { - // printListBucketHeader(allDetails); - s3DeleteObjects(TARRAY_DATA(data.objectArray), TARRAY_SIZE(data.objectArray)); + return data.objectArray; } } else { s3PrintError(__func__, data.status, data.err_msg); } - - taosArrayDestroyEx(data.objectArray, s3FreeObjectKey); + return NULL; } void s3DeleteObjects(const char *object_name[], int nobject) { - int status = 0; S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, 0, awsRegionG}; S3ResponseHandler responseHandler = {0, &responseCompleteCallback}; @@ -722,6 +725,13 @@ void s3DeleteObjects(const char *object_name[], int nobject) { } } +void s3DeleteObjectsByPrefix(const char *prefix) { + SArray* objectArray = getListByPrefix(prefix); + if(objectArray == NULL)return; + s3DeleteObjects(TARRAY_DATA(objectArray), TARRAY_SIZE(objectArray)); + taosArrayDestroyEx(objectArray, s3FreeObjectKey); +} + static S3Status getObjectDataCallback(int bufferSize, const char *buffer, void *callbackData) { TS3SizeCBD *cbd = callbackData; if (cbd->content_length != bufferSize) { @@ -768,6 +778,58 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, return 0; } +static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) { + TS3GetData *cbd = (TS3GetData *) callbackData; + size_t wrote = taosWriteFile(cbd->file, buffer, bufferSize); + return ((wrote < (size_t) bufferSize) ? + S3StatusAbortedByCallback : S3StatusOK); +} + +int32_t s3GetObjectToFile(const char *object_name, char* fileName) { + int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; + const char *ifMatch = 0, *ifNotMatch = 0; + + S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, + 0, awsRegionG}; + S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; + S3GetObjectHandler getObjectHandler = {{NULL, &responseCompleteCallback}, + &getObjectCallback}; + + TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + uError("[s3] open file error, errno:%d, fileName:%s", errno, fileName); + return -1; + } + TS3GetData cbd = {0}; + cbd.file = pFile; + do { + S3_get_object(&bucketContext, object_name, &getConditions, 0, 0, 0, 0, &getObjectHandler, &cbd); + } while (S3_status_is_retryable(cbd.status) && should_retry()); + + if (cbd.status != S3StatusOK) { + uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg); + taosCloseFile(&pFile); + return TAOS_SYSTEM_ERROR(EIO); + } + + taosCloseFile(&pFile); + return 0; +} + +int32_t s3GetObjectsByPrefix(const char *prefix, const char* path){ + SArray* objectArray = getListByPrefix(prefix); + if(objectArray == NULL) return -1; + + for(size_t i = 0; i < taosArrayGetSize(objectArray); i++){ + char* object = taosArrayGetP(objectArray, i); + char fileName[PATH_MAX] = {0}; + snprintf(fileName, PATH_MAX, "%s/%s", path, object); + s3GetObjectToFile(object, fileName); + } + taosArrayDestroyEx(objectArray, s3FreeObjectKey); + return 0; +} + long s3Size(const char *object_name) { long size = 0; int status = 0; diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index cbb447a33e..ffab85761e 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -141,7 +141,7 @@ void startRsync(){ } char cmd[PATH_MAX] = {0}; - snprintf(cmd, PATH_MAX, "rsync --daemon --config=%s", confDir); + snprintf(cmd, PATH_MAX, "rsync --daemon --port=%d --config=%s", tsRsyncPort, confDir); // start rsync service to backup checkpoint code = system(cmd); if(code != 0){ @@ -168,7 +168,7 @@ int uploadRsync(char* id, char* path){ #else path #endif - , tsSnodeIp, id); + , tsSnodeAddress, id); }else{ snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", #ifdef WINDOWS @@ -176,7 +176,7 @@ int uploadRsync(char* id, char* path){ #else path #endif - , tsSnodeIp, id); + , tsSnodeAddress, id); } int code = execCommand(command); @@ -195,7 +195,7 @@ int downloadRsync(char* id, char* path){ #endif char command[PATH_MAX] = {0}; snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", - tsSnodeIp, id, + tsSnodeAddress, id, #ifdef WINDOWS pathTransform #else @@ -221,7 +221,7 @@ int deleteRsync(char* id){ } char command[PATH_MAX] = {0}; snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", - tmp, tsSnodeIp, id); + tmp, tsSnodeAddress, id); code = execCommand(command); taosRemoveDir(tmp); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3593665de9..14036b3e74 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -134,7 +134,8 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; // int32_t tsSmlBatchSize = 10000; // checkpoint backup -char tsSnodeIp[TSDB_FQDN_LEN] = {0}; +char tsSnodeAddress[TSDB_FQDN_LEN] = {0}; +int32_t tsRsyncPort = 873; #ifdef WINDOWS char tsCheckpointBackupDir[PATH_MAX] = "C:\\TDengine\\data\\backup\\checkpoint\\"; #else @@ -661,7 +662,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddString(pCfg, "snodeIp", tsSnodeIp, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "snodeRsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddString(pCfg, "snodeRsyncAddress", tsSnodeAddress, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1; @@ -1087,8 +1089,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval; tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32; tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32; + tsRsyncPort = cfgGetItem(pCfg, "rsyncPort")->i32; tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); - tstrncpy(tsSnodeIp, cfgGetItem(pCfg, "snodeIp")->str, TSDB_FQDN_LEN); + tstrncpy(tsSnodeAddress, cfgGetItem(pCfg, "snodeAddress")->str, TSDB_FQDN_LEN); tstrncpy(tsCheckpointBackupDir, cfgGetItem(pCfg, "checkpointBackupDir")->str, PATH_MAX); tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; @@ -1676,6 +1679,9 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile return -1; } + if (taosSetS3Cfg(tsCfg) != 0) return -1; + return 0; + if (tsc) { if (taosSetClientCfg(tsCfg)) return -1; } else { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 86dd8ed542..fb47bf135b 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -148,10 +148,16 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); // char name[CHECKPOINT_PATH_LEN]; // char id[CHECKPOINT_PATH_LEN]; //} SChekpointDataHeader; - +typedef enum UPLOAD_TYPE{ + UPLOAD_DISABLE = -1, + UPLOAD_S3 = 0, + UPLOAD_RSYNC = 1, +} UPLOAD_TYPE; +UPLOAD_TYPE getUploadType(); int uploadCheckpoint(char* id, char* path); int downloadCheckpoint(char* id, char* path); int deleteCheckpoint(char* id); +int deleteCheckpointFile(char* id, char* name); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b30a3bdc50..c3152aebb7 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -15,6 +15,7 @@ #include "streamInt.h" #include "rsync.h" +#include "cos.h" int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -466,15 +467,55 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { //} +static int uploadCheckpointToS3(char* id, char* path){ + TdDirPtr pDir = taosOpenDir(path); + if (pDir == NULL) return -1; + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || + taosDirEntryIsDir(de)) continue; + + char filename[PATH_MAX] = {0}; + if(path[strlen(path - 1)] == '/'){ + snprintf(filename, sizeof(filename), "%s%s", path, name); + }else{ + snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name); + } + + char object[PATH_MAX] = {0}; + snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); + + if(s3PutObjectFromFile2(filename, object) != 0){ + taosCloseDir(&pDir); + return -1; + } + } + taosCloseDir(&pDir); + + return 0; +} + +UPLOAD_TYPE getUploadType(){ + if(strlen(tsSnodeAddress) != 0){ + return UPLOAD_RSYNC; + }else if(tsS3StreamEnabled){ + return UPLOAD_S3; + }else{ + return UPLOAD_DISABLE; + } +} + int uploadCheckpoint(char* id, char* path){ if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){ stError("uploadCheckpoint parameters invalid"); return -1; } - if(strlen(tsSnodeIp) != 0){ - uploadRsync(id, path); -// }else if(tsS3StreamEnabled){ - + if(strlen(tsSnodeAddress) != 0){ + return uploadRsync(id, path); + }else if(tsS3StreamEnabled){ + return uploadCheckpointToS3(id, path); } return 0; } @@ -484,10 +525,10 @@ int downloadCheckpoint(char* id, char* path){ stError("downloadCheckpoint parameters invalid"); return -1; } - if(strlen(tsSnodeIp) != 0){ - downloadRsync(id, path); -// }else if(tsS3StreamEnabled){ - + if(strlen(tsSnodeAddress) != 0){ + return downloadRsync(id, path); + }else if(tsS3StreamEnabled){ + return s3GetObjectsByPrefix(id, path); } return 0; } @@ -497,10 +538,18 @@ int deleteCheckpoint(char* id){ stError("deleteCheckpoint parameters invalid"); return -1; } - if(strlen(tsSnodeIp) != 0){ - deleteRsync(id); -// }else if(tsS3StreamEnabled){ - + if(strlen(tsSnodeAddress) != 0){ + return deleteRsync(id); + }else if(tsS3StreamEnabled){ + s3DeleteObjectsByPrefix(id); } return 0; } + +int deleteCheckpointFile(char* id, char* name){ + char object[128] = {0}; + snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); + char *tmp = object; + s3DeleteObjects((const char**)&tmp, 1); + return 0; +} diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp index 6b53f13c71..135431cfb7 100644 --- a/source/libs/stream/test/checkpointTest.cpp +++ b/source/libs/stream/test/checkpointTest.cpp @@ -27,30 +27,42 @@ #include "rsync.h" #include "streamInt.h" +#include "cos.h" int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); - strcpy(tsSnodeIp, "127.0.0.1"); + if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) { + printf("error"); + } + if (s3Init() < 0) { + return -1; + } +// strcpy(tsSnodeIp, "127.0.0.1"); return RUN_ALL_TESTS(); } TEST(testCase, checkpointUpload_Test) { - stopRsync(); - startRsync(); +// stopRsync(); +// startRsync(); taosSsleep(5); char* id = "2013892036"; - uploadCheckpoint(id, "/Users/mingmingwanng/rsync/"); + uploadCheckpoint(id, "/root/offset/"); } TEST(testCase, checkpointDownload_Test) { char* id = "2013892036"; - downloadRsync(id, "/Users/mingmingwanng/rsync/tmp"); + downloadCheckpoint(id, "/root/offset/download/"); } TEST(testCase, checkpointDelete_Test) { char* id = "2013892036"; - deleteRsync(id); + deleteCheckpoint(id); +} + +TEST(testCase, checkpointDeleteFile_Test) { + char* id = "2013892036"; + deleteCheckpointFile(id, "offset-ver0"); }