From a88635129841692ac9cd4be75d058215c33901bb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Jul 2024 14:52:28 +0800 Subject: [PATCH] fix(stream): update checkpoint into different dir. --- include/common/rsync.h | 4 +- source/common/src/rsync.c | 74 ++++++++++++++----- source/libs/stream/src/streamBackendRocksdb.c | 4 +- source/libs/stream/src/streamCheckpoint.c | 12 +-- 4 files changed, 64 insertions(+), 30 deletions(-) diff --git a/include/common/rsync.h b/include/common/rsync.h index 0840b51793..4221fb432f 100644 --- a/include/common/rsync.h +++ b/include/common/rsync.h @@ -13,8 +13,8 @@ extern "C" { void stopRsync(); void startRsync(); -int32_t uploadByRsync(const char* id, const char* path); -int32_t downloadRsync(const char* id, const char* path); +int32_t uploadByRsync(const char* id, const char* path, int64_t checkpointId); +int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId); int32_t deleteRsync(const char* id); #ifdef __cplusplus diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index d0b10b7f41..36d634c305 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -157,7 +157,7 @@ void startRsync() { } } -int32_t uploadByRsync(const char* id, const char* path) { +int32_t uploadByRsync(const char* id, const char* path, int64_t checkpointId) { int64_t st = taosGetTimestampMs(); char command[PATH_MAX] = {0}; @@ -197,11 +197,11 @@ int32_t uploadByRsync(const char* id, const char* path) { // prepare the data directory int32_t code = execCommand(command); if (code != 0) { - uError("[rsync] s-task:%s prepare checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path, + uError("[rsync] s-task:%s prepare checkpoint dir in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path, tsSnodeAddress, code, ERRNO_ERR_DATA); } else { int64_t el = (taosGetTimestampMs() - st); - uDebug("[rsync] s-task:%s prepare checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path, + uDebug("[rsync] s-task:%s prepare checkpoint dir in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path, tsSnodeAddress, el); } @@ -215,7 +215,7 @@ int32_t uploadByRsync(const char* id, const char* path) { #endif snprintf(command, PATH_MAX, "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ " - "rsync://%s/checkpoint/%s/data/", + "rsync://%s/checkpoint/%s/%" PRId64 "/", tsLogDir, #ifdef WINDOWS pathTransform @@ -223,11 +223,11 @@ int32_t uploadByRsync(const char* id, const char* path) { path #endif , - tsSnodeAddress, id); + tsSnodeAddress, id, checkpointId); } else { snprintf(command, PATH_MAX, "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s " - "rsync://%s/checkpoint/%s/data/", + "rsync://%s/checkpoint/%s/%" PRId64 "/", tsLogDir, #ifdef WINDOWS pathTransform @@ -235,7 +235,7 @@ int32_t uploadByRsync(const char* id, const char* path) { path #endif , - tsSnodeAddress, id); + tsSnodeAddress, id, checkpointId); } code = execCommand(command); @@ -252,7 +252,7 @@ int32_t uploadByRsync(const char* id, const char* path) { } // abort from retry if quit -int32_t downloadRsync(const char* id, const char* path) { +int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId) { int64_t st = taosGetTimestampMs(); int32_t MAX_RETRY = 10; int32_t times = 0; @@ -264,6 +264,42 @@ int32_t downloadRsync(const char* id, const char* path) { #endif char command[PATH_MAX] = {0}; + snprintf( + command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/%" PRId64 + "/ %s", + tsLogDir, tsSnodeAddress, id, checkpointId, +#ifdef WINDOWS + pathTransform +#else + path +#endif + ); + + uDebug("[rsync] %s start to sync data from remote to:%s, cmd:%s", id, path, command); + +// while (times++ < MAX_RETRY) { + code = execCommand(command); + if (code != TSDB_CODE_SUCCESS) { + uError("[rsync] %s download checkpointId:%" PRId64 + " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, + id, checkpointId, path, times, code, ERRNO_ERR_DATA); +// taosSsleep(1); + } else { + int32_t el = taosGetTimestampMs() - st; + uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId, + path, el); +// break; + } +// } + + // if failed, try to load it from data directory +#ifdef WINDOWS + memset(pathTransform, 0, PATH_MAX); + changeDirFromWindowsToLinux(path, pathTransform); +#endif + + memset(command, 0, PATH_MAX); snprintf( command, PATH_MAX, "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s", @@ -275,19 +311,17 @@ int32_t downloadRsync(const char* id, const char* path) { #endif ); - uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command); + uDebug("[rsync] %s start to sync data from remote data dir to:%s, cmd:%s", id, path, command); - while (times++ < MAX_RETRY) { - code = execCommand(command); - if (code != TSDB_CODE_SUCCESS) { - uError("[rsync] %s download checkpoint data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, id, - path, times, code, ERRNO_ERR_DATA); - taosSsleep(1); - } else { - int32_t el = taosGetTimestampMs() - st; - uDebug("[rsync] %s download checkpoint data:%s successfully, elapsed time:%dms", id, path, el); - break; - } + code = execCommand(command); + if (code != TSDB_CODE_SUCCESS) { + uError("[rsync] %s download checkpointId:%" PRId64 + " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, + id, checkpointId, path, times, code, ERRNO_ERR_DATA); + } else { + int32_t el = taosGetTimestampMs() - st; + uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId, + path, el); } return code; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 8b87019ee0..15a8be6eaa 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -447,7 +447,7 @@ int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64 cleanDir(defaultPath, key); stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath); - code = streamTaskDownloadCheckpointData(key, checkpointPath); + code = streamTaskDownloadCheckpointData(key, checkpointPath, checkpointId); if (code != 0) { stError("failed to download checkpoint data:%s", key); return code; @@ -482,7 +482,7 @@ int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) { int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int8_t rename = 0; - int32_t code = streamTaskDownloadCheckpointData(key, chkpPath); + int32_t code = streamTaskDownloadCheckpointData(key, chkpPath, chkpId); if (code != 0) { return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index bcdd1a047c..87c2af5207 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -20,7 +20,7 @@ static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t deleteCheckpointFile(const char* id, const char* name); -static int32_t streamTaskUploadCheckpoint(const char* id, const char* path); +static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId); static int32_t deleteCheckpoint(const char* id); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); @@ -601,7 +601,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d } if (code == TSDB_CODE_SUCCESS) { - code = streamTaskUploadCheckpoint(idStr, path); + code = streamTaskUploadCheckpoint(idStr, path, checkpointId); if (code == TSDB_CODE_SUCCESS) { stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId); } else { @@ -1082,7 +1082,7 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() { } } -int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { +int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId) { int32_t code = 0; if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("invalid parameters in upload checkpoint, %s", id); @@ -1090,7 +1090,7 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { } if (strlen(tsSnodeAddress) != 0) { - code = uploadByRsync(id, path); + code = uploadByRsync(id, path, checkpointId); if (code != 0) { return TAOS_SYSTEM_ERROR(errno); } @@ -1117,14 +1117,14 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch return 0; } -int32_t streamTaskDownloadCheckpointData(const char* id, char* path) { +int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { stError("down checkpoint data parameters invalid"); return -1; } if (strlen(tsSnodeAddress) != 0) { - return downloadRsync(id, path); + return downloadByRsync(id, path, checkpointId); } else if (tsS3StreamEnabled) { return s3GetObjectsByPrefix(id, path); }