fix stream backend convert

This commit is contained in:
yihaoDeng 2023-11-16 21:22:47 +08:00
parent 7905f22aae
commit 52d7a26c10
1 changed files with 92 additions and 1 deletions

View File

@ -230,6 +230,94 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
return 0;
}
int32_t readMetaData(char* path, SArray* list) {
char buf[128] = {0};
char* metaPath = taosMemoryCalloc(1, strlen(path));
sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META");
TdFilePtr pFile = taosOpenFile(path, TD_FILE_READ);
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
taosMemoryFree(metaPath);
taosCloseFile(&pFile);
return -1;
}
int32_t len = strlen(buf);
for (int i = 0; i < len; i++) {
if (buf[i] == '\n') {
char* item = taosMemoryCalloc(1, i + 1);
memcpy(item, buf, i);
taosArrayPush(list, &item);
item = taosMemoryCalloc(1, len - i);
memcpy(item, buf + i + 1, len - i - 1);
taosArrayPush(list, &item);
}
}
taosCloseFile(&pFile);
taosMemoryFree(metaPath);
return 0;
}
int32_t validAndCvtMeta(char* path, SArray* list, int64_t chkpId) {
int32_t complete = 1;
int32_t len = strlen(path) + 32;
char* src = taosMemoryCalloc(1, len);
char* dst = taosMemoryCalloc(1, len);
for (int i = 0; i < taosArrayGetSize(list); i++) {
char* p = taosArrayGetP(list, i);
sprintf(src, "%s%s%s", path, TD_DIRSEP, p);
if (taosStatFile(src, NULL, NULL, NULL) != 0) {
complete = 0;
break;
}
char temp[64] = {0};
for (int j = 0; j < strlen(p); j++) {
if (p[j] == '_') {
memcpy(temp, p, j);
}
if (taosStr2int64(p + j + 1) != chkpId) {
complete = 0;
break;
}
}
sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp);
taosRenameFile(src, dst);
memset(src, 0, len);
memset(dst, 0, len);
}
taosMemoryFree(src);
taosMemoryFree(dst);
return complete == 1 ? 0 : -1;
}
int32_t rebuildFromRemote(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t code = downloadCheckpoint(key, chkpPath);
if (code != 0) {
return -1;
}
SArray* list = taosArrayInit(2, sizeof(void*));
code = readMetaData(chkpPath, list);
if (code == 0) {
code = validAndCvtMeta(chkpPath, list, chkpId);
}
taosArrayDestroyP(list, taosMemoryFree);
if (code == 0) {
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
}
taosMkDir(defaultPath);
code = copyFiles(chkpPath, defaultPath);
}
return code;
}
int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** dbPrefixPath, char** dbPath) {
// impl later
int32_t code = 0;
@ -271,11 +359,14 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char**
stInfo("start to restart stream backend at checkpoint path: %s", chkpPath);
}
} else if (!taosIsDir(chkpPath)) {
code = rebuildFromRemote(key, chkpPath, chkpId, defaultPath);
} else {
stInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath,
tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
taosMkDir(defaultPath);
}
taosMemoryFree(chkpPath);
} else {
char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256);
@ -309,7 +400,7 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char**
}
bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId) {
bool exist = true;
bool exist = true;
char* state = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(state, "%s%s%s", path, TD_DIRSEP, "state");
if (!taosDirExist(state)) {