diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d0e343ae78..72f8167f49 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -230,6 +230,94 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } +int32_t readMetaData(char* path, SArray* list) { + char buf[128] = {0}; + char* metaPath = taosMemoryCalloc(1, strlen(path)); + sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META"); + + TdFilePtr pFile = taosOpenFile(path, TD_FILE_READ); + + if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { + taosMemoryFree(metaPath); + taosCloseFile(&pFile); + return -1; + } + int32_t len = strlen(buf); + for (int i = 0; i < len; i++) { + if (buf[i] == '\n') { + char* item = taosMemoryCalloc(1, i + 1); + memcpy(item, buf, i); + taosArrayPush(list, &item); + + item = taosMemoryCalloc(1, len - i); + memcpy(item, buf + i + 1, len - i - 1); + taosArrayPush(list, &item); + } + } + + taosCloseFile(&pFile); + taosMemoryFree(metaPath); + return 0; +} +int32_t validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { + int32_t complete = 1; + int32_t len = strlen(path) + 32; + char* src = taosMemoryCalloc(1, len); + char* dst = taosMemoryCalloc(1, len); + + for (int i = 0; i < taosArrayGetSize(list); i++) { + char* p = taosArrayGetP(list, i); + sprintf(src, "%s%s%s", path, TD_DIRSEP, p); + if (taosStatFile(src, NULL, NULL, NULL) != 0) { + complete = 0; + break; + } + + char temp[64] = {0}; + for (int j = 0; j < strlen(p); j++) { + if (p[j] == '_') { + memcpy(temp, p, j); + } + if (taosStr2int64(p + j + 1) != chkpId) { + complete = 0; + break; + } + } + + sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp); + taosRenameFile(src, dst); + + memset(src, 0, len); + memset(dst, 0, len); + } + + taosMemoryFree(src); + taosMemoryFree(dst); + return complete == 1 ? 0 : -1; +} + +int32_t rebuildFromRemote(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { + int32_t code = downloadCheckpoint(key, chkpPath); + if (code != 0) { + return -1; + } + + SArray* list = taosArrayInit(2, sizeof(void*)); + code = readMetaData(chkpPath, list); + if (code == 0) { + code = validAndCvtMeta(chkpPath, list, chkpId); + } + taosArrayDestroyP(list, taosMemoryFree); + + if (code == 0) { + if (taosIsDir(defaultPath)) { + taosRemoveDir(defaultPath); + } + taosMkDir(defaultPath); + code = copyFiles(chkpPath, defaultPath); + } + return code; +} int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** dbPrefixPath, char** dbPath) { // impl later int32_t code = 0; @@ -271,11 +359,14 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** stInfo("start to restart stream backend at checkpoint path: %s", chkpPath); } + } else if (!taosIsDir(chkpPath)) { + code = rebuildFromRemote(key, chkpPath, chkpId, defaultPath); } else { stInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); taosMkDir(defaultPath); } + taosMemoryFree(chkpPath); } else { char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256); @@ -309,7 +400,7 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** } bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId) { - bool exist = true; + bool exist = true; char* state = taosMemoryCalloc(1, strlen(path) + 32); sprintf(state, "%s%s%s", path, TD_DIRSEP, "state"); if (!taosDirExist(state)) {