From d214dd5cdd7103a3a31a8f4a09c9998a556e43b4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 11 May 2024 16:19:11 +0800 Subject: [PATCH] fix(stream): fix error in downloading remote backup checkpoint data. --- source/common/src/rsync.c | 14 +++++--- source/libs/stream/src/streamBackendRocksdb.c | 35 +++++++++---------- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index 35ae9450f1..302f17942f 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -196,10 +196,14 @@ int32_t uploadRsync(const char* id, const char* path) { } int32_t downloadRsync(const char* id, const char* path) { + int64_t st = taosGetTimestampMs(); + uDebug("[rsync] %s start to sync data from remote to local:%s", id, path); + #ifdef WINDOWS char pathTransform[PATH_MAX] = {0}; changeDirFromWindowsToLinux(path, pathTransform); #endif + char command[PATH_MAX] = {0}; snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", tsSnodeAddress, id, @@ -211,13 +215,15 @@ int32_t downloadRsync(const char* id, const char* path) { ); int32_t code = execCommand(command); + + int32_t el = taosGetTimestampMs() - st; if (code != 0) { - uError("[rsync] download checkpoint data failed, code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); - return -1; + uError("[rsync] %s download checkpoint data:%s failed, code:%d," ERRNO_ERR_FORMAT, id, path, code, ERRNO_ERR_DATA); + } else { + uDebug("[rsync] %s download checkpoint data:%s successfully, elapsed time:%dms", id, path, el); } - uDebug("[rsync] download checkpoint data:%s successfully", id); - return 0; + return code; } int32_t deleteRsync(const char* id) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 455466f64c..1a05acf3eb 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -325,28 +325,27 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { return complete == 1 ? 0 : -1; } -int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) { - // impl later +int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) { int32_t code = 0; - if (taosIsDir(chkpPath)) { - taosRemoveDir(chkpPath); + if (taosIsDir(chkptPath)) { + taosRemoveDir(chkptPath); + stDebug("remove local checkpoint data dir:%s succ", chkptPath); } if (taosIsDir(defaultPath)) { taosRemoveDir(defaultPath); + taosMulMkDir(defaultPath); + stDebug("clear local backend dir:%s succ", defaultPath); } - code = streamTaskDownloadCheckpointData(key, chkpPath); + code = streamTaskDownloadCheckpointData(key, chkptPath); if (code != 0) { stError("failed to download checkpoint data:%s", key); return code; } - stDebug("download backup checkpoint data into:%s, checkpointId:%" PRId64 ", %s", chkpPath, checkpointId, key); - - code = backendCopyFiles(chkpPath, defaultPath); - - return code; + stDebug("download remote checkpoint data for checkpointId:%" PRId64 ", %s", checkpointId, key); + return backendCopyFiles(chkptPath, defaultPath); } int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { @@ -456,7 +455,7 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) { code = copyFiles_hardlink(srcName, dstName, 0); if (code != 0) { code = TAOS_SYSTEM_ERROR(code); - stError("failed to hardlink file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code)); + stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code)); goto _ERROR; } else { stDebug("succ hard link file:%s to %s", srcName, dstName); @@ -485,14 +484,13 @@ int32_t backendCopyFiles(const char* src, const char* dst) { return backendFileCopyFilesImpl(src, dst); } -static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpointPath, int64_t chkpId, const char* defaultPath) { +static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpointPath, int64_t chkptId, const char* defaultPath) { int32_t code = 0; if (taosIsDir(defaultPath)) { taosRemoveDir(defaultPath); taosMkDir(defaultPath); - - stInfo("clear task backend path:%s, done", defaultPath); + stInfo("clear task backend dir:%s, done", defaultPath); } if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) { @@ -506,11 +504,12 @@ static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpoi pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); code = TSDB_CODE_SUCCESS; } else { - stInfo("%s start to restart stream backend at checkpoint path: %s", pTaskIdStr, checkpointPath); + stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath, + defaultPath); } } else { code = TSDB_CODE_FAILED; - stError("%s not valid checkpoint path/data in:%s", pTaskIdStr, checkpointPath); + stError("%s no valid checkpoint data for checkpointId:%" PRId64 " in %s", pTaskIdStr, chkptId, checkpointPath); } return code; @@ -522,7 +521,6 @@ int32_t rebuildFromlocalDefault(char* key, char* chkpPath, int64_t chkpId, char* } int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) { - // impl later int32_t code = 0; char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128); @@ -538,8 +536,7 @@ int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, c if (!taosIsDir(defaultPath)) { taosMulMkDir(defaultPath); } - - stDebug("prepare local dir:%s, checkpointId:%" PRId64 ", key:%s succ", defaultPath, chkptId, key); + stDebug("local default dir:%s, checkpointId:%" PRId64 ", key:%s succ", defaultPath, chkptId, key); char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256); if (chkptId != 0) {