Merge branch '3.0' into enh/refactorBackend

This commit is contained in:
yihaoDeng 2023-11-20 16:55:14 +08:00
parent 734bf07317
commit 0d724244dc
2 changed files with 47 additions and 25 deletions

View File

@ -321,7 +321,25 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) {
return complete == 1 ? 0 : -1;
}
int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
// impl later
int32_t code = 0;
if (taosIsDir(chkpPath)) {
taosRemoveDir(chkpPath);
}
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
}
code = downloadCheckpoint(key, chkpPath);
if (code != 0) {
return code;
}
code = copyFiles(chkpPath, defaultPath);
return code;
}
int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t code = downloadCheckpoint(key, chkpPath);
if (code != 0) {
return code;
@ -355,6 +373,15 @@ int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* d
taosMemoryFree(tmp);
return code;
}
int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
UPLOAD_TYPE type = getUploadType();
if (type == UPLOAD_S3) {
return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath);
} else if (type == UPLOAD_RSYNC) {
return rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath);
}
return -1;
}
int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t code = -1;
@ -944,7 +971,7 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
sprintf(pChkpIdDir, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId);
if (taosIsDir(pChkpIdDir)) {
stInfo("stream rm exist checkpoint%s", pChkpIdDir);
taosRemoveFile(pChkpIdDir);
taosRemoveDir(pChkpIdDir);
}
*chkpDir = pChkpDir;
*chkpIdDir = pChkpIdDir;
@ -1848,22 +1875,17 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
return -1;
}
char* pChkpDir = NULL;
char* pChkpIdDir = NULL;
if (chkpPreBuildDir(pDb->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) {
code = -1;
}
if (taosIsDir(pChkpIdDir) && isValidCheckpoint(pChkpIdDir)) {
char* buf = taosMemoryCalloc(1, strlen(pDb->path) + 128);
sprintf(buf, "%s%s%s%s%s%" PRId64 "", pDb->path, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
if (taosIsDir(buf)) {
code = 0;
*path = pChkpIdDir;
pChkpIdDir = NULL;
*path = buf;
} else {
taosMemoryFree(buf);
}
taosMemoryFree(pChkpDir);
taosMemoryFree(pChkpIdDir);
taosReleaseRef(taskDbWrapperId, refId);
return code;
}

View File

@ -375,17 +375,16 @@ int32_t doUploadChkp(void* param) {
SAsyncUploadArg* arg = param;
char* path = NULL;
int32_t code = 0;
SArray* list = taosArrayInit(4, sizeof(void*));
SArray* toDelFiles = taosArrayInit(4, sizeof(void*));
if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId,
(int8_t)(arg->type), &path, list)) != 0) {
(int8_t)(arg->type), &path, toDelFiles)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
}
code = getChkpMeta(arg->taskId, path, list);
if (code != 0) {
code = 0;
if (arg->type == UPLOAD_S3) {
if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId);
}
}
if (code == 0 && (code = uploadCheckpoint(arg->taskId, path)) != 0) {
@ -393,23 +392,24 @@ int32_t doUploadChkp(void* param) {
}
if (code == 0) {
for (int i = 0; i < taosArrayGetSize(list); i++) {
char* p = taosArrayGetP(list, i);
for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) {
char* p = taosArrayGetP(toDelFiles, i);
code = deleteCheckpointFile(arg->taskId, p);
stDebug("try to del file: %s", p);
stDebug("s-task:%s try to del file: %s", arg->pTask->id.idStr, p);
if (code != 0) {
break;
}
}
}
taosArrayDestroyP(list, taosMemoryFree);
taosArrayDestroyP(toDelFiles, taosMemoryFree);
taosRemoveDir(path);
taosMemoryFree(path);
taosMemoryFree(arg->taskId);
taosMemoryFree(arg);
return 0;
return code;
}
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
// async upload