From f5ef2e727fd0a0aff10d957bb3b3d0d91321649e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 16 Nov 2023 14:41:04 +0800 Subject: [PATCH] fix stream backend convert --- source/libs/stream/inc/streamBackendRocksdb.h | 1 + source/libs/stream/src/streamBackendRocksdb.c | 10 ++++++++++ source/libs/stream/src/streamMeta.c | 17 +++++++++++++++-- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 47404f311f..e6554e2fdf 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -128,6 +128,7 @@ typedef struct { TdThreadRwlock rwLock; } SBkdMgt; +bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId); void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c3196708ee..d0e343ae78 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -308,6 +308,16 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** return 0; } +bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId) { + bool exist = true; + char* state = taosMemoryCalloc(1, strlen(path) + 32); + sprintf(state, "%s%s%s", path, TD_DIRSEP, "state"); + if (!taosDirExist(state)) { + exist = false; + } + taosMemoryFree(state); + return exist; +} void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { char* backendPath = NULL; int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 6b1ef6a7a0..afc7512cf8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -188,8 +188,13 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { } int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) { - int32_t code = 0; - int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta); + int32_t code = 0; + int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta); + + bool exist = streamBackendDataIsExist(pMeta->path, chkpId, pMeta->vgId); + if (exist == false) { + return code; + } SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId, pMeta->vgId); void* pIter = taosHashIterate(pBackend->cfInst, NULL); @@ -206,6 +211,14 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) { _EXIT: streamBackendCleanup((void*)pBackend); + + if (code == 0) { + char* state = taosMemoryCalloc(1, strlen(pMeta->path) + 32); + sprintf(state, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); + taosRemoveDir(state); + taosMemoryFree(state); + } + return code; } int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {