From e0b55cc61fc8a4f4208edccbb127ff45b1c210b4 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 7 Nov 2023 11:06:50 +0800 Subject: [PATCH] fix:test s3 interface in checkpoint --- include/common/tglobal.h | 3 +- source/common/src/rsync.c | 10 ++-- source/common/src/tglobal.c | 12 +++- source/libs/stream/inc/streamInt.h | 8 ++- source/libs/stream/src/streamCheckpoint.c | 70 ++++++++++++++++++---- source/libs/stream/test/checkpointTest.cpp | 24 ++++++-- 6 files changed, 100 insertions(+), 27 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index b9bc046504..cc45e5ec68 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -83,7 +83,8 @@ extern int32_t tsHeartbeatTimeout; extern int64_t tsVndCommitMaxIntervalMs; // snode -extern char tsSnodeIp[]; +extern char tsSnodeAddress[]; //127.0.0.1:873 +extern int32_t tsRsyncPort; extern char tsCheckpointBackupDir[]; // mnode diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index cbb447a33e..ffab85761e 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -141,7 +141,7 @@ void startRsync(){ } char cmd[PATH_MAX] = {0}; - snprintf(cmd, PATH_MAX, "rsync --daemon --config=%s", confDir); + snprintf(cmd, PATH_MAX, "rsync --daemon --port=%d --config=%s", tsRsyncPort, confDir); // start rsync service to backup checkpoint code = system(cmd); if(code != 0){ @@ -168,7 +168,7 @@ int uploadRsync(char* id, char* path){ #else path #endif - , tsSnodeIp, id); + , tsSnodeAddress, id); }else{ snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", #ifdef WINDOWS @@ -176,7 +176,7 @@ int uploadRsync(char* id, char* path){ #else path #endif - , tsSnodeIp, id); + , tsSnodeAddress, id); } int code = execCommand(command); @@ -195,7 +195,7 @@ int downloadRsync(char* id, char* path){ #endif char command[PATH_MAX] = {0}; snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", - tsSnodeIp, id, + tsSnodeAddress, id, #ifdef WINDOWS pathTransform #else @@ -221,7 +221,7 @@ int deleteRsync(char* id){ } char command[PATH_MAX] = {0}; snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", - tmp, tsSnodeIp, id); + tmp, tsSnodeAddress, id); code = execCommand(command); taosRemoveDir(tmp); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3593665de9..14036b3e74 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -134,7 +134,8 @@ char tsSmlAutoChildTableNameDelimiter[TSDB_TABLE_NAME_LEN] = ""; // int32_t tsSmlBatchSize = 10000; // checkpoint backup -char tsSnodeIp[TSDB_FQDN_LEN] = {0}; +char tsSnodeAddress[TSDB_FQDN_LEN] = {0}; +int32_t tsRsyncPort = 873; #ifdef WINDOWS char tsCheckpointBackupDir[PATH_MAX] = "C:\\TDengine\\data\\backup\\checkpoint\\"; #else @@ -661,7 +662,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddString(pCfg, "snodeIp", tsSnodeIp, CFG_SCOPE_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "snodeRsyncPort", tsRsyncPort, 1, 65535, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddString(pCfg, "snodeRsyncAddress", tsSnodeAddress, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "checkpointBackupDir", tsCheckpointBackupDir, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1; @@ -1087,8 +1089,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval; tsTtlFlushThreshold = cfgGetItem(pCfg, "ttlFlushThreshold")->i32; tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32; + tsRsyncPort = cfgGetItem(pCfg, "rsyncPort")->i32; tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); - tstrncpy(tsSnodeIp, cfgGetItem(pCfg, "snodeIp")->str, TSDB_FQDN_LEN); + tstrncpy(tsSnodeAddress, cfgGetItem(pCfg, "snodeAddress")->str, TSDB_FQDN_LEN); tstrncpy(tsCheckpointBackupDir, cfgGetItem(pCfg, "checkpointBackupDir")->str, PATH_MAX); tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; @@ -1676,6 +1679,9 @@ int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile return -1; } + if (taosSetS3Cfg(tsCfg) != 0) return -1; + return 0; + if (tsc) { if (taosSetClientCfg(tsCfg)) return -1; } else { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 86dd8ed542..fb47bf135b 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -148,10 +148,16 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); // char name[CHECKPOINT_PATH_LEN]; // char id[CHECKPOINT_PATH_LEN]; //} SChekpointDataHeader; - +typedef enum UPLOAD_TYPE{ + UPLOAD_DISABLE = -1, + UPLOAD_S3 = 0, + UPLOAD_RSYNC = 1, +} UPLOAD_TYPE; +UPLOAD_TYPE getUploadType(); int uploadCheckpoint(char* id, char* path); int downloadCheckpoint(char* id, char* path); int deleteCheckpoint(char* id); +int deleteCheckpointFile(char* id, char* name); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b30a3bdc50..57f56f93f8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -15,6 +15,7 @@ #include "streamInt.h" #include "rsync.h" +#include "cos.h" int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -466,15 +467,55 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { //} +static int uploadCheckpointToS3(char* id, char* path){ + TdDirPtr pDir = taosOpenDir(path); + if (pDir == NULL) return -1; + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || + taosDirEntryIsDir(de)) continue; + + char filename[PATH_MAX] = {0}; + if(path[strlen(path - 1)] == '/'){ + snprintf(filename, sizeof(filename), "%s%s", path, name); + }else{ + snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name); + } + + char object[PATH_MAX] = {0}; + snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); + + if(s3PutObjectFromFile2(filename, object) != 0){ + taosCloseDir(&pDir); + return -1; + } + } + taosCloseDir(&pDir); + + return 0; +} + +UPLOAD_TYPE getUploadType(){ + if(strlen(tsSnodeAddress) != 0){ + return UPLOAD_RSYNC; + }else if(tsS3StreamEnabled){ + return UPLOAD_S3; + }else{ + return UPLOAD_DISABLE; + } +} + int uploadCheckpoint(char* id, char* path){ if(id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX){ stError("uploadCheckpoint parameters invalid"); return -1; } - if(strlen(tsSnodeIp) != 0){ - uploadRsync(id, path); -// }else if(tsS3StreamEnabled){ - + if(strlen(tsSnodeAddress) != 0){ + return uploadRsync(id, path); + }else if(tsS3StreamEnabled){ + return uploadCheckpointToS3(id, path); } return 0; } @@ -484,9 +525,9 @@ int downloadCheckpoint(char* id, char* path){ stError("downloadCheckpoint parameters invalid"); return -1; } - if(strlen(tsSnodeIp) != 0){ - downloadRsync(id, path); -// }else if(tsS3StreamEnabled){ + if(strlen(tsSnodeAddress) != 0){ + return downloadRsync(id, path); + }else if(tsS3StreamEnabled){ } return 0; @@ -497,10 +538,17 @@ int deleteCheckpoint(char* id){ stError("deleteCheckpoint parameters invalid"); return -1; } - if(strlen(tsSnodeIp) != 0){ - deleteRsync(id); -// }else if(tsS3StreamEnabled){ - + if(strlen(tsSnodeAddress) != 0){ + return deleteRsync(id); + }else if(tsS3StreamEnabled){ + s3DeleteObjectsByPrefix(id); } return 0; } + +int deleteCheckpointFile(char* id, char* name){ + char object[128] = {0}; + snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); + s3DeleteObjects((const char**)&object, 1); + return 0; +} diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp index 6b53f13c71..3da30121a7 100644 --- a/source/libs/stream/test/checkpointTest.cpp +++ b/source/libs/stream/test/checkpointTest.cpp @@ -27,30 +27,42 @@ #include "rsync.h" #include "streamInt.h" +#include "cos.h" int main(int argc, char **argv) { testing::InitGoogleTest(&argc, argv); - strcpy(tsSnodeIp, "127.0.0.1"); + if (taosInitCfg("/etc/taos/", NULL, NULL, NULL, NULL, 0) != 0) { + printf("error"); + } + if (s3Init() < 0) { + return -1; + } +// strcpy(tsSnodeIp, "127.0.0.1"); return RUN_ALL_TESTS(); } TEST(testCase, checkpointUpload_Test) { - stopRsync(); - startRsync(); +// stopRsync(); +// startRsync(); taosSsleep(5); char* id = "2013892036"; - uploadCheckpoint(id, "/Users/mingmingwanng/rsync/"); + uploadCheckpoint(id, "/root/offset/"); } TEST(testCase, checkpointDownload_Test) { char* id = "2013892036"; - downloadRsync(id, "/Users/mingmingwanng/rsync/tmp"); + downloadCheckpoint(id, "/Users/mingmingwanng/rsync/tmp"); } TEST(testCase, checkpointDelete_Test) { char* id = "2013892036"; - deleteRsync(id); + deleteCheckpoint(id); +} + +TEST(testCase, checkpointDeleteFile_Test) { + char* id = "2013892036"; + deleteCheckpointFile(id, "offset-ver0"); }