fix(stream): fix error in downloading remote backup checkpoint data.
This commit is contained in:
parent
f8d8061260
commit
d214dd5cdd
|
@ -196,10 +196,14 @@ int32_t uploadRsync(const char* id, const char* path) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t downloadRsync(const char* id, const char* path) {
|
int32_t downloadRsync(const char* id, const char* path) {
|
||||||
|
int64_t st = taosGetTimestampMs();
|
||||||
|
uDebug("[rsync] %s start to sync data from remote to local:%s", id, path);
|
||||||
|
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
char pathTransform[PATH_MAX] = {0};
|
char pathTransform[PATH_MAX] = {0};
|
||||||
changeDirFromWindowsToLinux(path, pathTransform);
|
changeDirFromWindowsToLinux(path, pathTransform);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
char command[PATH_MAX] = {0};
|
char command[PATH_MAX] = {0};
|
||||||
snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
|
snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
|
||||||
tsSnodeAddress, id,
|
tsSnodeAddress, id,
|
||||||
|
@ -211,13 +215,15 @@ int32_t downloadRsync(const char* id, const char* path) {
|
||||||
);
|
);
|
||||||
|
|
||||||
int32_t code = execCommand(command);
|
int32_t code = execCommand(command);
|
||||||
|
|
||||||
|
int32_t el = taosGetTimestampMs() - st;
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
uError("[rsync] download checkpoint data failed, code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
|
uError("[rsync] %s download checkpoint data:%s failed, code:%d," ERRNO_ERR_FORMAT, id, path, code, ERRNO_ERR_DATA);
|
||||||
return -1;
|
} else {
|
||||||
|
uDebug("[rsync] %s download checkpoint data:%s successfully, elapsed time:%dms", id, path, el);
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("[rsync] download checkpoint data:%s successfully", id);
|
return code;
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t deleteRsync(const char* id) {
|
int32_t deleteRsync(const char* id) {
|
||||||
|
|
|
@ -325,28 +325,27 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) {
|
||||||
return complete == 1 ? 0 : -1;
|
return complete == 1 ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) {
|
int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) {
|
||||||
// impl later
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (taosIsDir(chkpPath)) {
|
if (taosIsDir(chkptPath)) {
|
||||||
taosRemoveDir(chkpPath);
|
taosRemoveDir(chkptPath);
|
||||||
|
stDebug("remove local checkpoint data dir:%s succ", chkptPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosIsDir(defaultPath)) {
|
if (taosIsDir(defaultPath)) {
|
||||||
taosRemoveDir(defaultPath);
|
taosRemoveDir(defaultPath);
|
||||||
|
taosMulMkDir(defaultPath);
|
||||||
|
stDebug("clear local backend dir:%s succ", defaultPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = streamTaskDownloadCheckpointData(key, chkpPath);
|
code = streamTaskDownloadCheckpointData(key, chkptPath);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
stError("failed to download checkpoint data:%s", key);
|
stError("failed to download checkpoint data:%s", key);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("download backup checkpoint data into:%s, checkpointId:%" PRId64 ", %s", chkpPath, checkpointId, key);
|
stDebug("download remote checkpoint data for checkpointId:%" PRId64 ", %s", checkpointId, key);
|
||||||
|
return backendCopyFiles(chkptPath, defaultPath);
|
||||||
code = backendCopyFiles(chkpPath, defaultPath);
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
|
int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
|
||||||
|
@ -456,7 +455,7 @@ int32_t backendFileCopyFilesImpl(const char* src, const char* dst) {
|
||||||
code = copyFiles_hardlink(srcName, dstName, 0);
|
code = copyFiles_hardlink(srcName, dstName, 0);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(code);
|
code = TAOS_SYSTEM_ERROR(code);
|
||||||
stError("failed to hardlink file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code));
|
stError("failed to hard link file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code));
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
} else {
|
} else {
|
||||||
stDebug("succ hard link file:%s to %s", srcName, dstName);
|
stDebug("succ hard link file:%s to %s", srcName, dstName);
|
||||||
|
@ -485,14 +484,13 @@ int32_t backendCopyFiles(const char* src, const char* dst) {
|
||||||
return backendFileCopyFilesImpl(src, dst);
|
return backendFileCopyFilesImpl(src, dst);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpointPath, int64_t chkpId, const char* defaultPath) {
|
static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpointPath, int64_t chkptId, const char* defaultPath) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (taosIsDir(defaultPath)) {
|
if (taosIsDir(defaultPath)) {
|
||||||
taosRemoveDir(defaultPath);
|
taosRemoveDir(defaultPath);
|
||||||
taosMkDir(defaultPath);
|
taosMkDir(defaultPath);
|
||||||
|
stInfo("clear task backend dir:%s, done", defaultPath);
|
||||||
stInfo("clear task backend path:%s, done", defaultPath);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) {
|
if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) {
|
||||||
|
@ -506,11 +504,12 @@ static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpoi
|
||||||
pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
|
pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
stInfo("%s start to restart stream backend at checkpoint path: %s", pTaskIdStr, checkpointPath);
|
stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath,
|
||||||
|
defaultPath);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
code = TSDB_CODE_FAILED;
|
code = TSDB_CODE_FAILED;
|
||||||
stError("%s not valid checkpoint path/data in:%s", pTaskIdStr, checkpointPath);
|
stError("%s no valid checkpoint data for checkpointId:%" PRId64 " in %s", pTaskIdStr, chkptId, checkpointPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -522,7 +521,6 @@ int32_t rebuildFromlocalDefault(char* key, char* chkpPath, int64_t chkpId, char*
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) {
|
int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) {
|
||||||
// impl later
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128);
|
char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128);
|
||||||
|
@ -538,8 +536,7 @@ int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, c
|
||||||
if (!taosIsDir(defaultPath)) {
|
if (!taosIsDir(defaultPath)) {
|
||||||
taosMulMkDir(defaultPath);
|
taosMulMkDir(defaultPath);
|
||||||
}
|
}
|
||||||
|
stDebug("local default dir:%s, checkpointId:%" PRId64 ", key:%s succ", defaultPath, chkptId, key);
|
||||||
stDebug("prepare local dir:%s, checkpointId:%" PRId64 ", key:%s succ", defaultPath, chkptId, key);
|
|
||||||
|
|
||||||
char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256);
|
char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256);
|
||||||
if (chkptId != 0) {
|
if (chkptId != 0) {
|
||||||
|
|
Loading…
Reference in New Issue