Merge pull request #26224 from taosdata/fix/3_liaohj

fix(stream): not clear check downstream info.
This commit is contained in:
Haojun Liao 2024-06-21 08:41:11 +08:00 committed by GitHub
commit d866e6dc66
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 10 deletions

View File

@ -169,15 +169,22 @@ int32_t uploadByRsync(const char* id, const char* path) {
#else #else
if (path[strlen(path) - 1] != '/') { if (path[strlen(path) - 1] != '/') {
#endif #endif
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s/ rsync://%s/checkpoint/%s/", snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ "
"rsync://%s/checkpoint/%s/",
tsLogDir,
#ifdef WINDOWS #ifdef WINDOWS
pathTransform pathTransform
#else #else
path path
#endif #endif
, tsSnodeAddress, id); ,
tsSnodeAddress, id);
} else { } else {
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/", snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s "
"rsync://%s/checkpoint/%s/",
tsLogDir,
#ifdef WINDOWS #ifdef WINDOWS
pathTransform pathTransform
#else #else
@ -213,14 +220,15 @@ int32_t downloadRsync(const char* id, const char* path) {
#endif #endif
char command[PATH_MAX] = {0}; char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --debug=all --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s", snprintf(command, PATH_MAX,
tsSnodeAddress, id, "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
tsLogDir, tsSnodeAddress, id,
#ifdef WINDOWS #ifdef WINDOWS
pathTransform pathTransform
#else #else
path path
#endif #endif
); );
uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command); uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command);
@ -249,7 +257,9 @@ int32_t deleteRsync(const char* id) {
} }
char command[PATH_MAX] = {0}; char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tmp, tsSnodeAddress, id); snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tsLogDir,
tmp, tsSnodeAddress, id);
code = execCommand(command); code = execCommand(command);
taosRemoveDir(tmp); taosRemoveDir(tmp);

View File

@ -299,12 +299,10 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) {
taosThreadMutexLock(&pInfo->checkInfoLock); taosThreadMutexLock(&pInfo->checkInfoLock);
streamTaskCompleteCheckRsp(pInfo, false, id);
pInfo->stopCheckProcess = 1; pInfo->stopCheckProcess = 1;
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s set stop check-rsp monit", id); stDebug("s-task:%s set stop check-rsp monitor flag", id);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -438,6 +436,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
ASSERT(pInfo->startTs > 0); ASSERT(pInfo->startTs > 0);
stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id, stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id,
pInfo->startTs); pInfo->startTs);
pInfo->stopCheckProcess = 0; // disable auto stop of check process
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }