From 8cda12bc18d1569bb6a4e2179d9e2140aa3820f7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Jun 2024 11:42:54 +0800 Subject: [PATCH 1/3] fix(stream): update the remote checkpoint directory in snode. --- source/common/src/rsync.c | 54 +++++++++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index 84e9615ddd..d9f110d647 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -170,7 +170,7 @@ int32_t uploadByRsync(const char* id, const char* path) { if (path[strlen(path) - 1] != '/') { #endif snprintf(command, PATH_MAX, - "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ " + "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 --exclude=\"*\" %s/ " "rsync://%s/checkpoint/%s/", tsLogDir, #ifdef WINDOWS @@ -182,7 +182,7 @@ int32_t uploadByRsync(const char* id, const char* path) { tsSnodeAddress, id); } else { snprintf(command, PATH_MAX, - "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s " + "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 --exclude=\"*\" %s " "rsync://%s/checkpoint/%s/", tsLogDir, #ifdef WINDOWS @@ -194,7 +194,51 @@ int32_t uploadByRsync(const char* id, const char* path) { tsSnodeAddress, id); } + // prepare the data directory int32_t code = execCommand(command); + if (code != 0) { + uError("[rsync] s-task:%s prepare checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path, + tsSnodeAddress, code, ERRNO_ERR_DATA); + } else { + int64_t el = (taosGetTimestampMs() - st); + uDebug("[rsync] s-task:%s prepare checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path, + tsSnodeAddress, el); + } + +#ifdef WINDOWS + char pathTransform[PATH_MAX] = {0}; + changeDirFromWindowsToLinux(path, pathTransform); + + if (pathTransform[strlen(pathTransform) - 1] != '/') { +#else + if (path[strlen(path) - 1] != '/') { +#endif + snprintf(command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ " + "rsync://%s/checkpoint/%s/data/", + tsLogDir, +#ifdef WINDOWS + pathTransform +#else + path +#endif + , + tsSnodeAddress, id); + } else { + snprintf(command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s " + "rsync://%s/checkpoint/%s/data/", + tsLogDir, +#ifdef WINDOWS + pathTransform +#else + path +#endif + , + tsSnodeAddress, id); + } + + code = execCommand(command); if (code != 0) { uError("[rsync] s-task:%s upload checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path, tsSnodeAddress, code, ERRNO_ERR_DATA); @@ -210,7 +254,7 @@ int32_t uploadByRsync(const char* id, const char* path) { // abort from retry if quit int32_t downloadRsync(const char* id, const char* path) { int64_t st = taosGetTimestampMs(); - int32_t MAX_RETRY = 60; + int32_t MAX_RETRY = 10; int32_t times = 0; int32_t code = 0; @@ -221,7 +265,7 @@ int32_t downloadRsync(const char* id, const char* path) { char command[PATH_MAX] = {0}; snprintf(command, PATH_MAX, - "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", + "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s", tsLogDir, tsSnodeAddress, id, #ifdef WINDOWS pathTransform @@ -258,7 +302,7 @@ int32_t deleteRsync(const char* id) { char command[PATH_MAX] = {0}; snprintf(command, PATH_MAX, - "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tsLogDir, + "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/data/", tsLogDir, tmp, tsSnodeAddress, id); code = execCommand(command); From 195469f1dd2f6cda5fc881a8419a9d240d59c3db Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Jun 2024 12:57:50 +0800 Subject: [PATCH 2/3] fix(stream): disable the checkpoint-info-update for tasks in non-checkpoint status --- source/libs/stream/src/streamCheckpoint.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e4729bee7d..ec8510d9f0 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -443,13 +443,13 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV SStreamTaskState* pStatus = streamTaskGetStatus(pTask); - if (restored && (pStatus->state != TASK_STATUS__CK)) { - stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 - " failed", - pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId); - taosThreadMutexUnlock(&pTask->lock); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } +// if (restored && (pStatus->state != TASK_STATUS__CK)) { +// stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 +// " failed", +// pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId); +// taosThreadMutexUnlock(&pTask->lock); +// return TSDB_CODE_STREAM_TASK_IVLD_STATUS; +// } if (!restored) { // during restore procedure, do update checkpoint-info stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64 From b66853271327e76bf3fd7bd8d9ccf5f9bce22b2d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Jun 2024 14:25:59 +0800 Subject: [PATCH 3/3] fix(stream): fix compiler error. --- source/common/src/rsync.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index d9f110d647..7364ca3d6c 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -206,7 +206,7 @@ int32_t uploadByRsync(const char* id, const char* path) { } #ifdef WINDOWS - char pathTransform[PATH_MAX] = {0}; + memset(pathTransform, 0, PATH_MAX); changeDirFromWindowsToLinux(path, pathTransform); if (pathTransform[strlen(pathTransform) - 1] != '/') {