refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-08 13:52:20 +08:00
parent c43fbf6fee
commit 9ce8f6953d
6 changed files with 54 additions and 31 deletions

View File

@ -11,11 +11,11 @@ extern "C" {
#include "tarray.h"
void stopRsync();
void startRsync();
int uploadRsync(const char* id, const char* path);
int downloadRsync(const char* id, const char* path);
int deleteRsync(const char* id);
void stopRsync();
void startRsync();
int32_t uploadRsync(const char* id, const char* path);
int32_t downloadRsync(const char* id, const char* path);
int32_t deleteRsync(const char* id);
#ifdef __cplusplus
}

View File

@ -1208,9 +1208,11 @@ int32_t s3GetObjectToFile(const char *object_name, const char *fileName) {
TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("[s3] open file error, errno:%d, fileName:%s", errno, fileName);
terrno = TAOS_SYSTEM_ERROR(errno);
uError("[s3] open file error, errno:%d, fileName:%s", terrno, fileName);
return -1;
}
TS3GetData cbd = {0};
cbd.file = pFile;
do {

View File

@ -43,7 +43,7 @@ static void removeEmptyDir() {
static void changeDirFromWindowsToLinux(char* from, char* to){
to[0] = '/';
to[1] = from[0];
for(int i = 2; i < strlen(from); i++) {
for(int32_t i = 2; i < strlen(from); i++) {
if (from[i] == '\\') {
to[i] = '/';
} else {
@ -53,7 +53,7 @@ static void changeDirFromWindowsToLinux(char* from, char* to){
}
#endif
static int generateConfigFile(char* confDir) {
static int32_t generateConfigFile(char* confDir) {
TdFilePtr pFile = taosOpenFile(confDir, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
uError("[rsync] open conf file error, dir:%s,"ERRNO_ERR_FORMAT, confDir, ERRNO_ERR_DATA);
@ -98,8 +98,8 @@ static int generateConfigFile(char* confDir) {
return 0;
}
static int execCommand(char* command){
int try = 3;
static int32_t execCommand(char* command){
int32_t try = 3;
int32_t code = 0;
while(try-- > 0) {
code = system(command);
@ -112,7 +112,7 @@ static int execCommand(char* command){
}
void stopRsync() {
int code =
int32_t code =
#ifdef WINDOWS
system("taskkill /f /im rsync.exe");
#else
@ -135,7 +135,7 @@ void startRsync() {
char confDir[PATH_MAX] = {0};
snprintf(confDir, PATH_MAX, "%srsync.conf", tsCheckpointBackupDir);
int code = generateConfigFile(confDir);
int32_t code = generateConfigFile(confDir);
if(code != 0){
return;
}
@ -148,14 +148,16 @@ void startRsync() {
uError("[rsync] start server failed, code:%d,"ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return;
}
uDebug("[rsync] start server successful");
}
int uploadRsync(const char* id, const char* path) {
int32_t uploadRsync(const char* id, const char* path) {
#ifdef WINDOWS
char pathTransform[PATH_MAX] = {0};
changeDirFromWindowsToLinux(path, pathTransform);
#endif
char command[PATH_MAX] = {0};
#ifdef WINDOWS
if(pathTransform[strlen(pathTransform) - 1] != '/'){
@ -169,7 +171,7 @@ int uploadRsync(const char* id, const char* path) {
path
#endif
, tsSnodeAddress, id);
}else{
} else {
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 --bwlimit=100000 %s rsync://%s/checkpoint/%s/",
#ifdef WINDOWS
pathTransform
@ -179,16 +181,17 @@ int uploadRsync(const char* id, const char* path) {
, tsSnodeAddress, id);
}
int code = execCommand(command);
int32_t code = execCommand(command);
if(code != 0){
uError("[rsync] send failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
uDebug("[rsync] upload data:%s successful", id);
return 0;
}
int downloadRsync(const char* id, const char* path) {
int32_t downloadRsync(const char* id, const char* path) {
#ifdef WINDOWS
char pathTransform[PATH_MAX] = {0};
changeDirFromWindowsToLinux(path, pathTransform);
@ -203,33 +206,34 @@ int downloadRsync(const char* id, const char* path) {
#endif
);
int code = execCommand(command);
if(code != 0){
int32_t code = execCommand(command);
if (code != 0) {
uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
uDebug("[rsync] down data:%s successful", id);
return 0;
}
int deleteRsync(const char* id) {
char* tmp = "./tmp_empty/";
int code = taosMkDir(tmp);
if(code != 0){
int32_t deleteRsync(const char* id) {
char* tmp = "./tmp_empty/";
int32_t code = taosMkDir(tmp);
if (code != 0) {
uError("[rsync] make tmp dir failed. code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
char command[PATH_MAX] = {0};
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/",
tmp, tsSnodeAddress, id);
snprintf(command, PATH_MAX, "rsync -av --delete --timeout=10 %s rsync://%s/checkpoint/%s/", tmp, tsSnodeAddress, id);
code = execCommand(command);
taosRemoveDir(tmp);
if(code != 0){
if (code != 0) {
uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
uDebug("[rsync] delete data:%s successful", id);
uDebug("[rsync] delete data:%s successful", id);
return 0;
}

View File

@ -329,6 +329,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c
if (taosIsDir(chkpPath)) {
taosRemoveDir(chkpPath);
}
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
}
@ -337,10 +338,11 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c
if (code != 0) {
return code;
}
code = backendCopyFiles(chkpPath, defaultPath);
code = backendCopyFiles(chkpPath, defaultPath);
return code;
}
int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t code = streamTaskDownloadCheckpointData(key, chkpPath);
if (code != 0) {
@ -375,6 +377,7 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char
taosMemoryFree(tmp);
return code;
}
int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_S3) {
@ -1993,6 +1996,7 @@ _EXIT:
if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);
return NULL;
}
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) {
char* statePath = NULL;
char* dbPath = NULL;

View File

@ -32,6 +32,7 @@ static int32_t downloadCheckpointDataByName(const char* id, const char* fname, c
static int32_t deleteCheckpointFile(const char* id, const char* name);
static int32_t streamTaskBackupCheckpoint(const char* id, const char* path);
static int32_t deleteCheckpoint(const char* id);
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
@ -444,14 +445,15 @@ int32_t uploadCheckpointData(void* param) {
(int8_t)(arg->type), &path, toDelFiles)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId);
}
if (arg->type == DATA_UPLOAD_S3) {
if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", taskStr, arg->chkpId);
stError("s-task:%s failed to get checkpointId:%" PRId64 " meta", taskStr, arg->chkpId);
}
}
if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) {
stError("s-task:%s failed to upload checkpoint:%" PRId64, taskStr, arg->chkpId);
stError("s-task:%s failed to upload checkpointId:%" PRId64, taskStr, arg->chkpId);
}
taskReleaseDb(arg->dbRefId);
@ -610,13 +612,19 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) {
return 0;
}
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
int32_t code = 0;
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
if (buf == NULL) {
code = terrno = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
sprintf(buf, "%s/%s", id, fname);
if (s3GetObjectToFile(buf, dstName) != 0) {
code = -1;
code = errno;
}
taosMemoryFree(buf);
return code;
}
@ -636,11 +644,13 @@ int32_t streamTaskBackupCheckpoint(const char* id, const char* path) {
stError("streamTaskBackupCheckpoint parameters invalid");
return -1;
}
if (strlen(tsSnodeAddress) != 0) {
return uploadRsync(id, path);
} else if (tsS3StreamEnabled) {
return uploadCheckpointToS3(id, path);
}
return 0;
}

View File

@ -104,6 +104,7 @@ void metaRefMgtCleanup() {
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
taosThreadMutexLock(&gMetaRefMgt.mutex);
void* p = taosHashGet(gMetaRefMgt.pTable, &vgId, sizeof(vgId));
if (p == NULL) {
SArray* list = taosArrayInit(8, sizeof(void*));
@ -113,6 +114,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
SArray* list = *(SArray**)p;
taosArrayPush(list, &rid);
}
taosThreadMutexUnlock(&gMetaRefMgt.mutex);
return 0;
}
@ -284,6 +286,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
stDebug("s-task:0x%x set backend %p", pTask->id.taskId, pBackend);
return 0;
}
void streamMetaRemoveDB(void* arg, char* key) {
if (arg == NULL || key == NULL) return;