refactor rsync

This commit is contained in:
Yihao Deng 2024-07-18 08:45:58 +00:00
parent 597d239c4a
commit e20a972b7f
3 changed files with 26 additions and 12 deletions

View File

@ -12,7 +12,7 @@ extern "C" {
#include "tarray.h"
void stopRsync();
void startRsync();
int32_t startRsync();
int32_t uploadByRsync(const char* id, const char* path);
int32_t downloadRsync(const char* id, const char* path);
int32_t deleteRsync(const char* id);

View File

@ -54,10 +54,11 @@ static void changeDirFromWindowsToLinux(char* from, char* to) {
#endif
static int32_t generateConfigFile(char* confDir) {
int32_t code = 0;
TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("[rsync] open conf file error, dir:%s," ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA);
return -1;
return TAOS_SYSTEM_ERROR(errno);
}
#ifdef WINDOWS
@ -92,7 +93,8 @@ static int32_t generateConfigFile(char* confDir) {
if (taosWriteFile(pFile, confContent, strlen(confContent)) <= 0) {
uError("[rsync] write conf file error," ERRNO_ERR_FORMAT, ERRNO_ERR_DATA);
taosCloseFile(&pFile);
return -1;
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
taosCloseFile(&pFile);
@ -129,11 +131,13 @@ void stopRsync() {
taosMsleep(500); // sleep 500 ms to wait for the completion of kill operation.
}
void startRsync() {
int32_t startRsync() {
int32_t code = 0;
if (taosMulMkDir(tsCheckpointBackupDir) != 0) {
uError("[rsync] build checkpoint backup dir failed, path:%s," ERRNO_ERR_FORMAT, tsCheckpointBackupDir,
ERRNO_ERR_DATA);
return;
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
removeEmptyDir();
@ -141,9 +145,9 @@ void startRsync() {
char confDir[PATH_MAX] = {0};
snprintf(confDir, PATH_MAX, "%srsync.conf", tsCheckpointBackupDir);
int32_t code = generateConfigFile(confDir);
code = generateConfigFile(confDir);
if (code != 0) {
return;
return code;
}
char cmd[PATH_MAX] = {0};
@ -152,9 +156,11 @@ void startRsync() {
code = system(cmd);
if (code != 0) {
uError("[rsync] cmd:%s start server failed, code:%d," ERRNO_ERR_FORMAT, cmd, code, ERRNO_ERR_DATA);
code = TAOS_SYSTEM_ERROR(errno);
} else {
uInfo("[rsync] cmd:%s start server successful", cmd);
}
return code;
}
int32_t uploadByRsync(const char* id, const char* path) {
@ -199,6 +205,7 @@ int32_t uploadByRsync(const char* id, const char* path) {
if (code != 0) {
uError("[rsync] s-task:%s prepare checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
tsSnodeAddress, code, ERRNO_ERR_DATA);
code = TAOS_SYSTEM_ERROR(errno);
} else {
int64_t el = (taosGetTimestampMs() - st);
uDebug("[rsync] s-task:%s prepare checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
@ -242,6 +249,7 @@ int32_t uploadByRsync(const char* id, const char* path) {
if (code != 0) {
uError("[rsync] s-task:%s upload checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
tsSnodeAddress, code, ERRNO_ERR_DATA);
code = TAOS_SYSTEM_ERROR(errno);
} else {
int64_t el = (taosGetTimestampMs() - st);
uDebug("[rsync] s-task:%s upload checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
@ -283,13 +291,13 @@ int32_t downloadRsync(const char* id, const char* path) {
uError("[rsync] %s download checkpoint data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, id,
path, times, code, ERRNO_ERR_DATA);
taosSsleep(1);
code = TAOS_SYSTEM_ERROR(errno);
} else {
int32_t el = taosGetTimestampMs() - st;
uDebug("[rsync] %s download checkpoint data:%s successfully, elapsed time:%dms", id, path, el);
break;
}
}
return code;
}
@ -298,7 +306,7 @@ int32_t deleteRsync(const char* id) {
int32_t code = taosMkDir(tmp);
if (code != 0) {
uError("[rsync] make tmp dir failed. code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
return TAOS_SYSTEM_ERROR(errno);
}
char command[PATH_MAX] = {0};
@ -310,7 +318,7 @@ int32_t deleteRsync(const char* id) {
taosRemoveDir(tmp);
if (code != 0) {
uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
return TAOS_SYSTEM_ERROR(errno);
}
uDebug("[rsync] delete data:%s successful", id);

View File

@ -61,6 +61,7 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce
}
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
int32_t code = 0;
SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
if (pSnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -68,10 +69,15 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
}
stopRsync();
startRsync();
code = startRsync();
if (code != 0) {
terrno = code;
goto FAIL;
}
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndBuildStreamTask, tqExpandStreamTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndBuildStreamTask, tqExpandStreamTask, SNODE_HANDLE,
taosGetTimestampMs(), tqStartTaskCompleteCallback);
if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;