fix stream backend convert

This commit is contained in:
yihaoDeng 2023-11-17 10:13:39 +08:00
parent 52d7a26c10
commit b6994413d0
1 changed files with 41 additions and 50 deletions

View File

@ -296,10 +296,10 @@ int32_t validAndCvtMeta(char* path, SArray* list, int64_t chkpId) {
return complete == 1 ? 0 : -1;
}
int32_t rebuildFromRemote(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t code = downloadCheckpoint(key, chkpPath);
if (code != 0) {
return -1;
return code;
}
SArray* list = taosArrayInit(2, sizeof(void*));
@ -318,16 +318,35 @@ int32_t rebuildFromRemote(char* key, char* chkpPath, int64_t chkpId, char* defau
}
return code;
}
int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t code = 0;
if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) {
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
}
taosMkDir(defaultPath);
code = copyFiles(chkpPath, defaultPath);
if (code != 0) {
stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
stInfo("start to restart stream backend at checkpoint path: %s", chkpPath);
}
return code;
}
return -1;
}
int32_t rebuildFromlocalDefault(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t code = 0;
return code;
}
int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** dbPrefixPath, char** dbPath) {
// impl later
int32_t code = 0;
/*param@1: checkpointId dir
param@2: state
copy pChkpIdDir's file to state dir
opt to set hard link to previous file
*/
char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key);
@ -342,61 +361,33 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char**
taosMulMkDir(defaultPath);
}
char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256);
if (chkpId != 0) {
char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256);
sprintf(chkpPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) {
if (taosIsDir(defaultPath)) {
// remove dir if exists
// taosRenameFile(const char *oldName, const char *newName)
taosRemoveDir(defaultPath);
}
taosMkDir(defaultPath);
code = copyFiles(chkpPath, defaultPath);
if (code != 0) {
stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
stInfo("start to restart stream backend at checkpoint path: %s", chkpPath);
}
} else if (!taosIsDir(chkpPath)) {
code = rebuildFromRemote(key, chkpPath, chkpId, defaultPath);
} else {
stInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath,
tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
taosMkDir(defaultPath);
code = rebuildFromLocalChkp(key, chkpPath, chkpId, defaultPath);
if (code != 0) {
code = rebuildFromRemoteChkp(key, chkpPath, chkpId, defaultPath);
}
taosMemoryFree(chkpPath);
if (code != 0) {
stInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath,
tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
code = taosMkDir(defaultPath);
}
} else {
char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256);
sprintf(chkpPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
(int64_t)-1);
stInfo("no chkp id specified, try to restart from received chkp id -1, dir: %s", chkpPath);
if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) {
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
}
taosMkDir(defaultPath);
code = copyFiles(chkpPath, defaultPath);
stInfo("copy snap file from %s to %s", chkpPath, defaultPath);
if (code != 0) {
stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
stInfo("start to restart stream backend at checkpoint path: %s", chkpPath);
}
} else {
stInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath,
tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
taosMkDir(defaultPath);
code = rebuildFromLocalChkp(key, chkpPath, -1, defaultPath);
if (code != 0) {
code = taosMkDir(defaultPath);
}
taosMemoryFree(chkpPath);
}
*dbPath = defaultPath;
*dbPrefixPath = prefixPath;
return 0;
return code;
}
bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId) {