fix stream backend convert

This commit is contained in:
yihaoDeng 2023-11-16 14:41:04 +08:00
parent cee33ffb95
commit f5ef2e727f
3 changed files with 26 additions and 2 deletions

View File

@ -128,6 +128,7 @@ typedef struct {
TdThreadRwlock rwLock;
} SBkdMgt;
bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId);
void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId);
void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg);

View File

@ -308,6 +308,16 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char**
return 0;
}
bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId) {
bool exist = true;
char* state = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(state, "%s%s%s", path, TD_DIRSEP, "state");
if (!taosDirExist(state)) {
exist = false;
}
taosMemoryFree(state);
return exist;
}
void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
char* backendPath = NULL;
int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);

View File

@ -190,6 +190,11 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
int32_t code = 0;
int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta);
bool exist = streamBackendDataIsExist(pMeta->path, chkpId, pMeta->vgId);
if (exist == false) {
return code;
}
SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId, pMeta->vgId);
void* pIter = taosHashIterate(pBackend->cfInst, NULL);
@ -206,6 +211,14 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
_EXIT:
streamBackendCleanup((void*)pBackend);
if (code == 0) {
char* state = taosMemoryCalloc(1, strlen(pMeta->path) + 32);
sprintf(state, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
taosRemoveDir(state);
taosMemoryFree(state);
}
return code;
}
int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {