refactor backend
This commit is contained in:
parent
7730413924
commit
52a8082622
|
@ -133,7 +133,7 @@ typedef struct {
|
||||||
|
|
||||||
#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 ""
|
#define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 ""
|
||||||
|
|
||||||
bool streamBackendDataIsExist(const char* path, int64_t chkpId, int32_t vgId);
|
bool streamBackendDataIsExist(const char* path, int64_t chkpId);
|
||||||
void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId);
|
void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId);
|
||||||
void streamBackendCleanup(void* arg);
|
void streamBackendCleanup(void* arg);
|
||||||
void streamBackendHandleCleanup(void* arg);
|
void streamBackendHandleCleanup(void* arg);
|
||||||
|
|
|
@ -786,6 +786,29 @@ _EXIT:
|
||||||
taosMemoryFree(checkpointRoot);
|
taosMemoryFree(checkpointRoot);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
bool streamBackendDataIsExist(const char* path, int64_t chkpId) {
|
||||||
|
bool exist = true;
|
||||||
|
int32_t cap = strlen(path) + 32;
|
||||||
|
|
||||||
|
char* state = taosMemoryCalloc(1, cap);
|
||||||
|
if (state == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int16_t nBytes = snprintf(state, cap, "%s%s%s", path, TD_DIRSEP, "state");
|
||||||
|
if (nBytes <= 0 || nBytes >= cap) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
exist = false;
|
||||||
|
} else {
|
||||||
|
if (!taosDirExist(state)) {
|
||||||
|
exist = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(state);
|
||||||
|
return exist;
|
||||||
|
}
|
||||||
|
|
||||||
void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
|
void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
|
||||||
char* backendPath = NULL;
|
char* backendPath = NULL;
|
||||||
|
|
|
@ -183,9 +183,15 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta);
|
int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta);
|
||||||
|
|
||||||
bool exist = streamBackendDataIsExist(pMeta->path, chkpId, pMeta->vgId);
|
bool exist = streamBackendDataIsExist(pMeta->path, chkpId);
|
||||||
if (exist == false) {
|
if (exist == false) {
|
||||||
stError("failed to check backend data exist, reason:%s", tstrerror(terrno));
|
if (terrno != 0) {
|
||||||
|
code = terrno;
|
||||||
|
terrno = 0;
|
||||||
|
stError("failed to check backend data exist, reason:%s", tstrerror(code));
|
||||||
|
} else {
|
||||||
|
stInfo("not need to convert stream backend formate");
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue