fix(stream): add some logs.
This commit is contained in:
parent
fb3fe03c1f
commit
0cca12ab52
|
@ -278,41 +278,6 @@ int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId)
|
||||||
|
|
||||||
uDebug("[rsync] %s start to sync data from remote to:%s, cmd:%s", id, path, command);
|
uDebug("[rsync] %s start to sync data from remote to:%s, cmd:%s", id, path, command);
|
||||||
|
|
||||||
// while (times++ < MAX_RETRY) {
|
|
||||||
code = execCommand(command);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
uError("[rsync] %s download checkpointId:%" PRId64
|
|
||||||
" data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
|
|
||||||
id, checkpointId, path, times, code, ERRNO_ERR_DATA);
|
|
||||||
// taosSsleep(1);
|
|
||||||
} else {
|
|
||||||
int32_t el = taosGetTimestampMs() - st;
|
|
||||||
uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
|
|
||||||
path, el);
|
|
||||||
// break;
|
|
||||||
}
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if failed, try to load it from data directory
|
|
||||||
#ifdef WINDOWS
|
|
||||||
memset(pathTransform, 0, PATH_MAX);
|
|
||||||
changeDirFromWindowsToLinux(path, pathTransform);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
memset(command, 0, PATH_MAX);
|
|
||||||
snprintf(
|
|
||||||
command, PATH_MAX,
|
|
||||||
"rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s",
|
|
||||||
tsLogDir, tsSnodeAddress, id,
|
|
||||||
#ifdef WINDOWS
|
|
||||||
pathTransform
|
|
||||||
#else
|
|
||||||
path
|
|
||||||
#endif
|
|
||||||
);
|
|
||||||
|
|
||||||
uDebug("[rsync] %s start to sync data from remote data dir to:%s, cmd:%s", id, path, command);
|
|
||||||
|
|
||||||
code = execCommand(command);
|
code = execCommand(command);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
uError("[rsync] %s download checkpointId:%" PRId64
|
uError("[rsync] %s download checkpointId:%" PRId64
|
||||||
|
@ -324,6 +289,38 @@ int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId)
|
||||||
path, el);
|
path, el);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) { // if failed, try to load it from data directory
|
||||||
|
#ifdef WINDOWS
|
||||||
|
memset(pathTransform, 0, PATH_MAX);
|
||||||
|
changeDirFromWindowsToLinux(path, pathTransform);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
memset(command, 0, PATH_MAX);
|
||||||
|
snprintf(
|
||||||
|
command, PATH_MAX,
|
||||||
|
"rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s",
|
||||||
|
tsLogDir, tsSnodeAddress, id,
|
||||||
|
#ifdef WINDOWS
|
||||||
|
pathTransform
|
||||||
|
#else
|
||||||
|
path
|
||||||
|
#endif
|
||||||
|
);
|
||||||
|
|
||||||
|
uDebug("[rsync] %s start to sync data from remote data dir to:%s, cmd:%s", id, path, command);
|
||||||
|
|
||||||
|
code = execCommand(command);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("[rsync] %s download checkpointId:%" PRId64
|
||||||
|
" data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
|
||||||
|
id, checkpointId, path, times, code, ERRNO_ERR_DATA);
|
||||||
|
} else {
|
||||||
|
int32_t el = taosGetTimestampMs() - st;
|
||||||
|
uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
|
||||||
|
path, el);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -273,7 +273,19 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
|
||||||
pBackend->pTask = pTask;
|
pBackend->pTask = pTask;
|
||||||
pBackend->pMeta = pMeta;
|
pBackend->pMeta = pMeta;
|
||||||
|
|
||||||
if (processVer != -1) pTask->chkInfo.processedVer = processVer;
|
if (processVer != -1) {
|
||||||
|
if (pTask->chkInfo.processedVer != processVer) {
|
||||||
|
stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64,
|
||||||
|
pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId);
|
||||||
|
pTask->chkInfo.processedVer = processVer;
|
||||||
|
pTask->chkInfo.checkpointVer = processVer;
|
||||||
|
pTask->chkInfo.nextProcessVer = processVer + 1;
|
||||||
|
} else {
|
||||||
|
stInfo("s-task:%s vgId:%d processedVer:%" PRId64
|
||||||
|
" in task meta equals to data in checkpoint data for checkpointId:%" PRId64,
|
||||||
|
pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
|
taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
|
||||||
taosThreadMutexUnlock(&pMeta->backendMutex);
|
taosThreadMutexUnlock(&pMeta->backendMutex);
|
||||||
|
|
Loading…
Reference in New Issue