From b6994413d0f55d7e5580b54a5d7c183150645d2d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Nov 2023 10:13:39 +0800 Subject: [PATCH] fix stream backend convert --- source/libs/stream/src/streamBackendRocksdb.c | 91 +++++++++---------- 1 file changed, 41 insertions(+), 50 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 72f8167f49..c80b8163a4 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -296,10 +296,10 @@ int32_t validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { return complete == 1 ? 0 : -1; } -int32_t rebuildFromRemote(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { +int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int32_t code = downloadCheckpoint(key, chkpPath); if (code != 0) { - return -1; + return code; } SArray* list = taosArrayInit(2, sizeof(void*)); @@ -318,16 +318,35 @@ int32_t rebuildFromRemote(char* key, char* chkpPath, int64_t chkpId, char* defau } return code; } + +int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { + int32_t code = 0; + if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) { + if (taosIsDir(defaultPath)) { + taosRemoveDir(defaultPath); + } + taosMkDir(defaultPath); + code = copyFiles(chkpPath, defaultPath); + if (code != 0) { + stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + stInfo("start to restart stream backend at checkpoint path: %s", chkpPath); + } + + return code; + } + return -1; +} + +int32_t rebuildFromlocalDefault(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { + int32_t code = 0; + return code; +} + int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** dbPrefixPath, char** dbPath) { // impl later int32_t code = 0; - /*param@1: checkpointId dir - param@2: state - copy pChkpIdDir's file to state dir - opt to set hard link to previous file - */ - char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128); sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key); @@ -342,61 +361,33 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** taosMulMkDir(defaultPath); } + char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256); if (chkpId != 0) { - char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256); sprintf(chkpPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId); - if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) { - if (taosIsDir(defaultPath)) { - // remove dir if exists - // taosRenameFile(const char *oldName, const char *newName) - taosRemoveDir(defaultPath); - } - taosMkDir(defaultPath); - code = copyFiles(chkpPath, defaultPath); - if (code != 0) { - stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); - } else { - stInfo("start to restart stream backend at checkpoint path: %s", chkpPath); - } - - } else if (!taosIsDir(chkpPath)) { - code = rebuildFromRemote(key, chkpPath, chkpId, defaultPath); - } else { - stInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath, - tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); - taosMkDir(defaultPath); + code = rebuildFromLocalChkp(key, chkpPath, chkpId, defaultPath); + if (code != 0) { + code = rebuildFromRemoteChkp(key, chkpPath, chkpId, defaultPath); } - taosMemoryFree(chkpPath); + if (code != 0) { + stInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath, + tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); + code = taosMkDir(defaultPath); + } } else { - char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256); sprintf(chkpPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", (int64_t)-1); - stInfo("no chkp id specified, try to restart from received chkp id -1, dir: %s", chkpPath); - if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) { - if (taosIsDir(defaultPath)) { - taosRemoveDir(defaultPath); - } - taosMkDir(defaultPath); - code = copyFiles(chkpPath, defaultPath); - stInfo("copy snap file from %s to %s", chkpPath, defaultPath); - if (code != 0) { - stError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); - } else { - stInfo("start to restart stream backend at checkpoint path: %s", chkpPath); - } - } else { - stInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath, - tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); - taosMkDir(defaultPath); + + code = rebuildFromLocalChkp(key, chkpPath, -1, defaultPath); + if (code != 0) { + code = taosMkDir(defaultPath); } - taosMemoryFree(chkpPath); } *dbPath = defaultPath; *dbPrefixPath = prefixPath; - return 0; + return code; } bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId) {