From 8e065ab47e267c59def9cb637d71b242423cdfda Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jun 2024 21:28:54 +0800 Subject: [PATCH 1/3] fix(stream): add some logs. --- source/common/src/rsync.c | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index 2ed21616dc..5d056cdade 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -169,15 +169,22 @@ int32_t uploadByRsync(const char* id, const char* path) { #else if (path[strlen(path) - 1] != '/') { #endif - snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/", + snprintf(command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ " + "rsync://%s/checkpoint/%s/", + tsLogDir, #ifdef WINDOWS pathTransform #else path #endif - , tsSnodeAddress, id); + , + tsSnodeAddress, id); } else { - snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", + snprintf(command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s " + "rsync://%s/checkpoint/%s/", + tsLogDir, #ifdef WINDOWS pathTransform #else @@ -213,14 +220,15 @@ int32_t downloadRsync(const char* id, const char* path) { #endif char command[PATH_MAX] = {0}; - snprintf(command, PATH_MAX, "rsync -av --debug=all --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", - tsSnodeAddress, id, + snprintf(command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", + tsLogDir, tsSnodeAddress, id, #ifdef WINDOWS pathTransform #else path #endif - ); + ); uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command); @@ -249,7 +257,7 @@ int32_t deleteRsync(const char* id) { } char command[PATH_MAX] = {0}; - snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tmp, tsSnodeAddress, id); + snprintf(command, PATH_MAX, "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tmp, tsSnodeAddress, id); code = execCommand(command); taosRemoveDir(tmp); From 2ecc725b1a646cdb38d216b00651b4e058f8fed5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jun 2024 21:31:23 +0800 Subject: [PATCH 2/3] fix(stream): not clear check downstream info. --- source/libs/stream/src/streamCheckStatus.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index b577147171..b64e0bb6d2 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -299,12 +299,10 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { taosThreadMutexLock(&pInfo->checkInfoLock); - streamTaskCompleteCheckRsp(pInfo, false, id); - pInfo->stopCheckProcess = 1; taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s set stop check-rsp monit", id); + stDebug("s-task:%s set stop check-rsp monitor flag", id); return TSDB_CODE_SUCCESS; } @@ -438,6 +436,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { ASSERT(pInfo->startTs > 0); stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id, pInfo->startTs); + pInfo->stopCheckProcess = 0; // disable auto stop of check process return TSDB_CODE_FAILED; } From 28b0148df97f85a1b8766adf43e4ecd073f6d5e1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Jun 2024 22:41:33 +0800 Subject: [PATCH 3/3] fix(stream): fix syntax error. --- source/common/src/rsync.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index 5d056cdade..84e9615ddd 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -257,7 +257,9 @@ int32_t deleteRsync(const char* id) { } char command[PATH_MAX] = {0}; - snprintf(command, PATH_MAX, "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tmp, tsSnodeAddress, id); + snprintf(command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tsLogDir, + tmp, tsSnodeAddress, id); code = execCommand(command); taosRemoveDir(tmp);