From 9ce8f6953d8a5ecb1aa98f14513c2ee286187006 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 13:52:20 +0800 Subject: [PATCH 1/2] refactor: do some internal refactor. --- include/common/rsync.h | 10 ++--- source/common/src/cos.c | 4 +- source/common/src/rsync.c | 44 ++++++++++--------- source/libs/stream/src/streamBackendRocksdb.c | 6 ++- source/libs/stream/src/streamCheckpoint.c | 18 ++++++-- source/libs/stream/src/streamMeta.c | 3 ++ 6 files changed, 54 insertions(+), 31 deletions(-) diff --git a/include/common/rsync.h b/include/common/rsync.h index d570311694..f613a35f48 100644 --- a/include/common/rsync.h +++ b/include/common/rsync.h @@ -11,11 +11,11 @@ extern "C" { #include "tarray.h" -void stopRsync(); -void startRsync(); -int uploadRsync(const char* id, const char* path); -int downloadRsync(const char* id, const char* path); -int deleteRsync(const char* id); +void stopRsync(); +void startRsync(); +int32_t uploadRsync(const char* id, const char* path); +int32_t downloadRsync(const char* id, const char* path); +int32_t deleteRsync(const char* id); #ifdef __cplusplus } diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 0db6664ab9..b249d3eff2 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -1208,9 +1208,11 @@ int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { - uError("[s3] open file error, errno:%d, fileName:%s", errno, fileName); + terrno = TAOS_SYSTEM_ERROR(errno); + uError("[s3] open file error, errno:%d, fileName:%s", terrno, fileName); return -1; } + TS3GetData cbd = {0}; cbd.file = pFile; do { diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index 7aec0077e7..e448aec5e0 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -43,7 +43,7 @@ static void removeEmptyDir() { static void changeDirFromWindowsToLinux(char* from, char* to){ to[0] = '/'; to[1] = from[0]; - for(int i = 2; i < strlen(from); i++) { + for(int32_t i = 2; i < strlen(from); i++) { if (from[i] == '\\') { to[i] = '/'; } else { @@ -53,7 +53,7 @@ static void changeDirFromWindowsToLinux(char* from, char* to){ } #endif -static int generateConfigFile(char* confDir) { +static int32_t generateConfigFile(char* confDir) { TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { uError("[rsync] open conf file error, dir:%s,"ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA); @@ -98,8 +98,8 @@ static int generateConfigFile(char* confDir) { return 0; } -static int execCommand(char* command){ - int try = 3; +static int32_t execCommand(char* command){ + int32_t try = 3; int32_t code = 0; while(try-- > 0) { code = system(command); @@ -112,7 +112,7 @@ static int execCommand(char* command){ } void stopRsync() { - int code = + int32_t code = #ifdef WINDOWS system("taskkill /f /im rsync.exe"); #else @@ -135,7 +135,7 @@ void startRsync() { char confDir[PATH_MAX] = {0}; snprintf(confDir, PATH_MAX, "%srsync.conf", tsCheckpointBackupDir); - int code = generateConfigFile(confDir); + int32_t code = generateConfigFile(confDir); if(code != 0){ return; } @@ -148,14 +148,16 @@ void startRsync() { uError("[rsync] start server failed, code:%d,"ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); return; } + uDebug("[rsync] start server successful"); } -int uploadRsync(const char* id, const char* path) { +int32_t uploadRsync(const char* id, const char* path) { #ifdef WINDOWS char pathTransform[PATH_MAX] = {0}; changeDirFromWindowsToLinux(path, pathTransform); #endif + char command[PATH_MAX] = {0}; #ifdef WINDOWS if(pathTransform[strlen(pathTransform) - 1] != '/'){ @@ -169,7 +171,7 @@ int uploadRsync(const char* id, const char* path) { path #endif , tsSnodeAddress, id); - }else{ + } else { snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", #ifdef WINDOWS pathTransform @@ -179,16 +181,17 @@ int uploadRsync(const char* id, const char* path) { , tsSnodeAddress, id); } - int code = execCommand(command); + int32_t code = execCommand(command); if(code != 0){ uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); return -1; } + uDebug("[rsync] upload data:%s successful", id); return 0; } -int downloadRsync(const char* id, const char* path) { +int32_t downloadRsync(const char* id, const char* path) { #ifdef WINDOWS char pathTransform[PATH_MAX] = {0}; changeDirFromWindowsToLinux(path, pathTransform); @@ -203,33 +206,34 @@ int downloadRsync(const char* id, const char* path) { #endif ); - int code = execCommand(command); - if(code != 0){ + int32_t code = execCommand(command); + if (code != 0) { uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); return -1; } + uDebug("[rsync] down data:%s successful", id); return 0; } -int deleteRsync(const char* id) { - char* tmp = "./tmp_empty/"; - int code = taosMkDir(tmp); - if(code != 0){ +int32_t deleteRsync(const char* id) { + char* tmp = "./tmp_empty/"; + int32_t code = taosMkDir(tmp); + if (code != 0) { uError("[rsync] make tmp dir failed. code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); return -1; } + char command[PATH_MAX] = {0}; - snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", - tmp, tsSnodeAddress, id); + snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tmp, tsSnodeAddress, id); code = execCommand(command); taosRemoveDir(tmp); - if(code != 0){ + if (code != 0) { uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); return -1; } - uDebug("[rsync] delete data:%s successful", id); + uDebug("[rsync] delete data:%s successful", id); return 0; } \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d97214b700..b5294a3fb7 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -329,6 +329,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c if (taosIsDir(chkpPath)) { taosRemoveDir(chkpPath); } + if (taosIsDir(defaultPath)) { taosRemoveDir(defaultPath); } @@ -337,10 +338,11 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c if (code != 0) { return code; } - code = backendCopyFiles(chkpPath, defaultPath); + code = backendCopyFiles(chkpPath, defaultPath); return code; } + int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int32_t code = streamTaskDownloadCheckpointData(key, chkpPath); if (code != 0) { @@ -375,6 +377,7 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char taosMemoryFree(tmp); return code; } + int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_S3) { @@ -1993,6 +1996,7 @@ _EXIT: if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf); return NULL; } + STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) { char* statePath = NULL; char* dbPath = NULL; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d2f9a0cbc3..1519382fe4 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -32,6 +32,7 @@ static int32_t downloadCheckpointDataByName(const char* id, const char* fname, c static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t streamTaskBackupCheckpoint(const char* id, const char* path); static int32_t deleteCheckpoint(const char* id); +static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -444,14 +445,15 @@ int32_t uploadCheckpointData(void* param) { (int8_t)(arg->type), &path, toDelFiles)) != 0) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId); } + if (arg->type == DATA_UPLOAD_S3) { if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) { - stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", taskStr, arg->chkpId); + stError("s-task:%s failed to get checkpointId:%" PRId64 " meta", taskStr, arg->chkpId); } } if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) { - stError("s-task:%s failed to upload checkpoint:%" PRId64, taskStr, arg->chkpId); + stError("s-task:%s failed to upload checkpointId:%" PRId64, taskStr, arg->chkpId); } taskReleaseDb(arg->dbRefId); @@ -610,13 +612,19 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) { return 0; } -static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) { +int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) { int32_t code = 0; char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); + if (buf == NULL) { + code = terrno = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + sprintf(buf, "%s/%s", id, fname); if (s3GetObjectToFile(buf, dstName) != 0) { - code = -1; + code = errno; } + taosMemoryFree(buf); return code; } @@ -636,11 +644,13 @@ int32_t streamTaskBackupCheckpoint(const char* id, const char* path) { stError("streamTaskBackupCheckpoint parameters invalid"); return -1; } + if (strlen(tsSnodeAddress) != 0) { return uploadRsync(id, path); } else if (tsS3StreamEnabled) { return uploadCheckpointToS3(id, path); } + return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 877590c4b6..31a109715a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -104,6 +104,7 @@ void metaRefMgtCleanup() { int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { taosThreadMutexLock(&gMetaRefMgt.mutex); + void* p = taosHashGet(gMetaRefMgt.pTable, &vgId, sizeof(vgId)); if (p == NULL) { SArray* list = taosArrayInit(8, sizeof(void*)); @@ -113,6 +114,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { SArray* list = *(SArray**)p; taosArrayPush(list, &rid); } + taosThreadMutexUnlock(&gMetaRefMgt.mutex); return 0; } @@ -284,6 +286,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) { stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend); return 0; } + void streamMetaRemoveDB(void* arg, char* key) { if (arg == NULL || key == NULL) return; From bc04093d111f96fca42721acbfc8c46c5cc30ebf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 8 May 2024 13:54:25 +0800 Subject: [PATCH 2/2] refactor: do some internal refactor. --- source/libs/stream/src/streamMeta.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 31a109715a..c401141821 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -769,7 +769,6 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { return code; } -// todo add error log int32_t streamMetaCommit(SStreamMeta* pMeta) { if (tdbCommit(pMeta->db, pMeta->txn) < 0) { stError("vgId:%d failed to commit stream meta", pMeta->vgId); @@ -787,6 +786,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) { return -1; } + stDebug("vgId:%d stream meta file commit completed", pMeta->vgId); return 0; }