fix(stream): update checkpoint into different dir.
This commit is contained in:
parent
c4cde6f268
commit
a886351298
|
@ -13,8 +13,8 @@ extern "C" {
|
||||||
|
|
||||||
void stopRsync();
|
void stopRsync();
|
||||||
void startRsync();
|
void startRsync();
|
||||||
int32_t uploadByRsync(const char* id, const char* path);
|
int32_t uploadByRsync(const char* id, const char* path, int64_t checkpointId);
|
||||||
int32_t downloadRsync(const char* id, const char* path);
|
int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId);
|
||||||
int32_t deleteRsync(const char* id);
|
int32_t deleteRsync(const char* id);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -157,7 +157,7 @@ void startRsync() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t uploadByRsync(const char* id, const char* path) {
|
int32_t uploadByRsync(const char* id, const char* path, int64_t checkpointId) {
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
char command[PATH_MAX] = {0};
|
char command[PATH_MAX] = {0};
|
||||||
|
|
||||||
|
@ -197,11 +197,11 @@ int32_t uploadByRsync(const char* id, const char* path) {
|
||||||
// prepare the data directory
|
// prepare the data directory
|
||||||
int32_t code = execCommand(command);
|
int32_t code = execCommand(command);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
uError("[rsync] s-task:%s prepare checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
|
uError("[rsync] s-task:%s prepare checkpoint dir in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
|
||||||
tsSnodeAddress, code, ERRNO_ERR_DATA);
|
tsSnodeAddress, code, ERRNO_ERR_DATA);
|
||||||
} else {
|
} else {
|
||||||
int64_t el = (taosGetTimestampMs() - st);
|
int64_t el = (taosGetTimestampMs() - st);
|
||||||
uDebug("[rsync] s-task:%s prepare checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
|
uDebug("[rsync] s-task:%s prepare checkpoint dir in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
|
||||||
tsSnodeAddress, el);
|
tsSnodeAddress, el);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,7 +215,7 @@ int32_t uploadByRsync(const char* id, const char* path) {
|
||||||
#endif
|
#endif
|
||||||
snprintf(command, PATH_MAX,
|
snprintf(command, PATH_MAX,
|
||||||
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ "
|
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ "
|
||||||
"rsync://%s/checkpoint/%s/data/",
|
"rsync://%s/checkpoint/%s/%" PRId64 "/",
|
||||||
tsLogDir,
|
tsLogDir,
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
pathTransform
|
pathTransform
|
||||||
|
@ -223,11 +223,11 @@ int32_t uploadByRsync(const char* id, const char* path) {
|
||||||
path
|
path
|
||||||
#endif
|
#endif
|
||||||
,
|
,
|
||||||
tsSnodeAddress, id);
|
tsSnodeAddress, id, checkpointId);
|
||||||
} else {
|
} else {
|
||||||
snprintf(command, PATH_MAX,
|
snprintf(command, PATH_MAX,
|
||||||
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s "
|
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s "
|
||||||
"rsync://%s/checkpoint/%s/data/",
|
"rsync://%s/checkpoint/%s/%" PRId64 "/",
|
||||||
tsLogDir,
|
tsLogDir,
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
pathTransform
|
pathTransform
|
||||||
|
@ -235,7 +235,7 @@ int32_t uploadByRsync(const char* id, const char* path) {
|
||||||
path
|
path
|
||||||
#endif
|
#endif
|
||||||
,
|
,
|
||||||
tsSnodeAddress, id);
|
tsSnodeAddress, id, checkpointId);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = execCommand(command);
|
code = execCommand(command);
|
||||||
|
@ -252,7 +252,7 @@ int32_t uploadByRsync(const char* id, const char* path) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// abort from retry if quit
|
// abort from retry if quit
|
||||||
int32_t downloadRsync(const char* id, const char* path) {
|
int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId) {
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
int32_t MAX_RETRY = 10;
|
int32_t MAX_RETRY = 10;
|
||||||
int32_t times = 0;
|
int32_t times = 0;
|
||||||
|
@ -264,6 +264,42 @@ int32_t downloadRsync(const char* id, const char* path) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
char command[PATH_MAX] = {0};
|
char command[PATH_MAX] = {0};
|
||||||
|
snprintf(
|
||||||
|
command, PATH_MAX,
|
||||||
|
"rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/%" PRId64
|
||||||
|
"/ %s",
|
||||||
|
tsLogDir, tsSnodeAddress, id, checkpointId,
|
||||||
|
#ifdef WINDOWS
|
||||||
|
pathTransform
|
||||||
|
#else
|
||||||
|
path
|
||||||
|
#endif
|
||||||
|
);
|
||||||
|
|
||||||
|
uDebug("[rsync] %s start to sync data from remote to:%s, cmd:%s", id, path, command);
|
||||||
|
|
||||||
|
// while (times++ < MAX_RETRY) {
|
||||||
|
code = execCommand(command);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("[rsync] %s download checkpointId:%" PRId64
|
||||||
|
" data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
|
||||||
|
id, checkpointId, path, times, code, ERRNO_ERR_DATA);
|
||||||
|
// taosSsleep(1);
|
||||||
|
} else {
|
||||||
|
int32_t el = taosGetTimestampMs() - st;
|
||||||
|
uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
|
||||||
|
path, el);
|
||||||
|
// break;
|
||||||
|
}
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if failed, try to load it from data directory
|
||||||
|
#ifdef WINDOWS
|
||||||
|
memset(pathTransform, 0, PATH_MAX);
|
||||||
|
changeDirFromWindowsToLinux(path, pathTransform);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
memset(command, 0, PATH_MAX);
|
||||||
snprintf(
|
snprintf(
|
||||||
command, PATH_MAX,
|
command, PATH_MAX,
|
||||||
"rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s",
|
"rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s",
|
||||||
|
@ -275,19 +311,17 @@ int32_t downloadRsync(const char* id, const char* path) {
|
||||||
#endif
|
#endif
|
||||||
);
|
);
|
||||||
|
|
||||||
uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command);
|
uDebug("[rsync] %s start to sync data from remote data dir to:%s, cmd:%s", id, path, command);
|
||||||
|
|
||||||
while (times++ < MAX_RETRY) {
|
code = execCommand(command);
|
||||||
code = execCommand(command);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
uError("[rsync] %s download checkpointId:%" PRId64
|
||||||
uError("[rsync] %s download checkpoint data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, id,
|
" data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
|
||||||
path, times, code, ERRNO_ERR_DATA);
|
id, checkpointId, path, times, code, ERRNO_ERR_DATA);
|
||||||
taosSsleep(1);
|
} else {
|
||||||
} else {
|
int32_t el = taosGetTimestampMs() - st;
|
||||||
int32_t el = taosGetTimestampMs() - st;
|
uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
|
||||||
uDebug("[rsync] %s download checkpoint data:%s successfully, elapsed time:%dms", id, path, el);
|
path, el);
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -447,7 +447,7 @@ int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64
|
||||||
cleanDir(defaultPath, key);
|
cleanDir(defaultPath, key);
|
||||||
stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);
|
stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);
|
||||||
|
|
||||||
code = streamTaskDownloadCheckpointData(key, checkpointPath);
|
code = streamTaskDownloadCheckpointData(key, checkpointPath, checkpointId);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
stError("failed to download checkpoint data:%s", key);
|
stError("failed to download checkpoint data:%s", key);
|
||||||
return code;
|
return code;
|
||||||
|
@ -482,7 +482,7 @@ int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) {
|
||||||
|
|
||||||
int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
|
int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
|
||||||
int8_t rename = 0;
|
int8_t rename = 0;
|
||||||
int32_t code = streamTaskDownloadCheckpointData(key, chkpPath);
|
int32_t code = streamTaskDownloadCheckpointData(key, chkpPath, chkpId);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
||||||
static int32_t deleteCheckpointFile(const char* id, const char* name);
|
static int32_t deleteCheckpointFile(const char* id, const char* name);
|
||||||
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path);
|
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId);
|
||||||
static int32_t deleteCheckpoint(const char* id);
|
static int32_t deleteCheckpoint(const char* id);
|
||||||
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
|
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
|
||||||
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
|
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
|
||||||
|
@ -601,7 +601,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = streamTaskUploadCheckpoint(idStr, path);
|
code = streamTaskUploadCheckpoint(idStr, path, checkpointId);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
|
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1082,7 +1082,7 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
|
int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||||
stError("invalid parameters in upload checkpoint, %s", id);
|
stError("invalid parameters in upload checkpoint, %s", id);
|
||||||
|
@ -1090,7 +1090,7 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strlen(tsSnodeAddress) != 0) {
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
code = uploadByRsync(id, path);
|
code = uploadByRsync(id, path, checkpointId);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return TAOS_SYSTEM_ERROR(errno);
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
|
@ -1117,14 +1117,14 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskDownloadCheckpointData(const char* id, char* path) {
|
int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId) {
|
||||||
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||||
stError("down checkpoint data parameters invalid");
|
stError("down checkpoint data parameters invalid");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strlen(tsSnodeAddress) != 0) {
|
if (strlen(tsSnodeAddress) != 0) {
|
||||||
return downloadRsync(id, path);
|
return downloadByRsync(id, path, checkpointId);
|
||||||
} else if (tsS3StreamEnabled) {
|
} else if (tsS3StreamEnabled) {
|
||||||
return s3GetObjectsByPrefix(id, path);
|
return s3GetObjectsByPrefix(id, path);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue