From 0cca12ab52aa1506b80154d8784ed3b6b1da3daa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 18 Jul 2024 15:49:49 +0800 Subject: [PATCH] fix(stream): add some logs. --- source/common/src/rsync.c | 67 ++++++++++++++--------------- source/libs/stream/src/streamMeta.c | 14 +++++- 2 files changed, 45 insertions(+), 36 deletions(-) diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index 36d634c305..f2f6796fb0 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -278,41 +278,6 @@ int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId) uDebug("[rsync] %s start to sync data from remote to:%s, cmd:%s", id, path, command); -// while (times++ < MAX_RETRY) { - code = execCommand(command); - if (code != TSDB_CODE_SUCCESS) { - uError("[rsync] %s download checkpointId:%" PRId64 - " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, - id, checkpointId, path, times, code, ERRNO_ERR_DATA); -// taosSsleep(1); - } else { - int32_t el = taosGetTimestampMs() - st; - uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId, - path, el); -// break; - } -// } - - // if failed, try to load it from data directory -#ifdef WINDOWS - memset(pathTransform, 0, PATH_MAX); - changeDirFromWindowsToLinux(path, pathTransform); -#endif - - memset(command, 0, PATH_MAX); - snprintf( - command, PATH_MAX, - "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s", - tsLogDir, tsSnodeAddress, id, -#ifdef WINDOWS - pathTransform -#else - path -#endif - ); - - uDebug("[rsync] %s start to sync data from remote data dir to:%s, cmd:%s", id, path, command); - code = execCommand(command); if (code != TSDB_CODE_SUCCESS) { uError("[rsync] %s download checkpointId:%" PRId64 @@ -324,6 +289,38 @@ int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId) path, el); } + if (code != TSDB_CODE_SUCCESS) { // if failed, try to load it from data directory +#ifdef WINDOWS + memset(pathTransform, 0, PATH_MAX); + changeDirFromWindowsToLinux(path, pathTransform); +#endif + + memset(command, 0, PATH_MAX); + snprintf( + command, PATH_MAX, + "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s", + tsLogDir, tsSnodeAddress, id, +#ifdef WINDOWS + pathTransform +#else + path +#endif + ); + + uDebug("[rsync] %s start to sync data from remote data dir to:%s, cmd:%s", id, path, command); + + code = execCommand(command); + if (code != TSDB_CODE_SUCCESS) { + uError("[rsync] %s download checkpointId:%" PRId64 + " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, + id, checkpointId, path, times, code, ERRNO_ERR_DATA); + } else { + int32_t el = taosGetTimestampMs() - st; + uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId, + path, el); + } + } + return code; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d2c957422b..ebc0a864fc 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -273,7 +273,19 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) pBackend->pTask = pTask; pBackend->pMeta = pMeta; - if (processVer != -1) pTask->chkInfo.processedVer = processVer; + if (processVer != -1) { + if (pTask->chkInfo.processedVer != processVer) { + stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64, + pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId); + pTask->chkInfo.processedVer = processVer; + pTask->chkInfo.checkpointVer = processVer; + pTask->chkInfo.nextProcessVer = processVer + 1; + } else { + stInfo("s-task:%s vgId:%d processedVer:%" PRId64 + " in task meta equals to data in checkpoint data for checkpointId:%" PRId64, + pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId); + } + } taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); taosThreadMutexUnlock(&pMeta->backendMutex);