Merge pull request #26278 from taosdata/fix/3_liaohj

fix(stream): update the remote checkpoint directory in snode.
This commit is contained in:
Haojun Liao 2024-06-25 15:52:01 +08:00 committed by GitHub
commit e6f376f5cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 56 additions and 12 deletions

View File

@ -170,7 +170,7 @@ int32_t uploadByRsync(const char* id, const char* path) {
if (path[strlen(path) - 1] != '/') { if (path[strlen(path) - 1] != '/') {
#endif #endif
snprintf(command, PATH_MAX, snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ " "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 --exclude=\"*\" %s/ "
"rsync://%s/checkpoint/%s/", "rsync://%s/checkpoint/%s/",
tsLogDir, tsLogDir,
#ifdef WINDOWS #ifdef WINDOWS
@ -182,7 +182,7 @@ int32_t uploadByRsync(const char* id, const char* path) {
tsSnodeAddress, id); tsSnodeAddress, id);
} else { } else {
snprintf(command, PATH_MAX, snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s " "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 --exclude=\"*\" %s "
"rsync://%s/checkpoint/%s/", "rsync://%s/checkpoint/%s/",
tsLogDir, tsLogDir,
#ifdef WINDOWS #ifdef WINDOWS
@ -194,7 +194,51 @@ int32_t uploadByRsync(const char* id, const char* path) {
tsSnodeAddress, id); tsSnodeAddress, id);
} }
// prepare the data directory
int32_t code = execCommand(command); int32_t code = execCommand(command);
if (code != 0) {
uError("[rsync] s-task:%s prepare checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
tsSnodeAddress, code, ERRNO_ERR_DATA);
} else {
int64_t el = (taosGetTimestampMs() - st);
uDebug("[rsync] s-task:%s prepare checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
tsSnodeAddress, el);
}
#ifdef WINDOWS
memset(pathTransform, 0, PATH_MAX);
changeDirFromWindowsToLinux(path, pathTransform);
if (pathTransform[strlen(pathTransform) - 1] != '/') {
#else
if (path[strlen(path) - 1] != '/') {
#endif
snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ "
"rsync://%s/checkpoint/%s/data/",
tsLogDir,
#ifdef WINDOWS
pathTransform
#else
path
#endif
,
tsSnodeAddress, id);
} else {
snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s "
"rsync://%s/checkpoint/%s/data/",
tsLogDir,
#ifdef WINDOWS
pathTransform
#else
path
#endif
,
tsSnodeAddress, id);
}
code = execCommand(command);
if (code != 0) { if (code != 0) {
uError("[rsync] s-task:%s upload checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path, uError("[rsync] s-task:%s upload checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
tsSnodeAddress, code, ERRNO_ERR_DATA); tsSnodeAddress, code, ERRNO_ERR_DATA);
@ -210,7 +254,7 @@ int32_t uploadByRsync(const char* id, const char* path) {
// abort from retry if quit // abort from retry if quit
int32_t downloadRsync(const char* id, const char* path) { int32_t downloadRsync(const char* id, const char* path) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t MAX_RETRY = 60; int32_t MAX_RETRY = 10;
int32_t times = 0; int32_t times = 0;
int32_t code = 0; int32_t code = 0;
@ -221,7 +265,7 @@ int32_t downloadRsync(const char* id, const char* path) {
char command[PATH_MAX] = {0}; char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s",
tsLogDir, tsSnodeAddress, id, tsLogDir, tsSnodeAddress, id,
#ifdef WINDOWS #ifdef WINDOWS
pathTransform pathTransform
@ -258,7 +302,7 @@ int32_t deleteRsync(const char* id) {
char command[PATH_MAX] = {0}; char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tsLogDir, "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/data/", tsLogDir,
tmp, tsSnodeAddress, id); tmp, tsSnodeAddress, id);
code = execCommand(command); code = execCommand(command);

View File

@ -443,13 +443,13 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
SStreamTaskState* pStatus = streamTaskGetStatus(pTask); SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (restored && (pStatus->state != TASK_STATUS__CK)) { // if (restored && (pStatus->state != TASK_STATUS__CK)) {
stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 // stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" failed", // " failed",
pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId); // pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
taosThreadMutexUnlock(&pTask->lock); // taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS; // return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
} // }
if (!restored) { // during restore procedure, do update checkpoint-info if (!restored) { // during restore procedure, do update checkpoint-info
stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64 stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64