add cvt state
This commit is contained in:
parent
11da631bb5
commit
8890f5c950
|
@ -140,7 +140,7 @@ enum STREAM_STATE_VER {
|
||||||
STREAM_STATA_NEED_CONVERT,
|
STREAM_STATA_NEED_CONVERT,
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t streamMetaCheckStateCompatible(SStreamMeta* pMeta) {
|
int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
|
||||||
int8_t ret = STREAM_STATA_COMPATIBLE;
|
int8_t ret = STREAM_STATA_COMPATIBLE;
|
||||||
TBC* pCur = NULL;
|
TBC* pCur = NULL;
|
||||||
|
|
||||||
|
@ -180,7 +180,7 @@ int32_t streamMetaCheckStateCompatible(SStreamMeta* pMeta) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaDoStateBackendConvertImpl(SStreamMeta* pMeta) {
|
int32_t streamMetaConvertBackendFormat(SStreamMeta* pMeta) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
|
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
|
||||||
SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId);
|
SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId);
|
||||||
|
@ -201,14 +201,14 @@ _EXIT:
|
||||||
streamBackendCleanup((void*)pBackend);
|
streamBackendCleanup((void*)pBackend);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t streamMetaMayDoStateBackendConvert(SStreamMeta* pMeta) {
|
int32_t streamMetaMayConvertBackendFormat(SStreamMeta* pMeta) {
|
||||||
int8_t compatible = streamMetaCheckStateCompatible(pMeta);
|
int8_t compatible = streamMetaCheckBackendCompatible(pMeta);
|
||||||
if (compatible == STREAM_STATA_COMPATIBLE) {
|
if (compatible == STREAM_STATA_COMPATIBLE) {
|
||||||
return 0;
|
return 0;
|
||||||
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
|
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
|
||||||
qInfo("stream state need covert backend format");
|
qInfo("stream state need covert backend format");
|
||||||
|
|
||||||
return streamMetaDoStateBackendConvertImpl(pMeta);
|
return streamMetaConvertBackendFormat(pMeta);
|
||||||
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
|
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
|
||||||
qError(
|
qError(
|
||||||
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
|
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
|
||||||
|
@ -250,7 +250,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamMetaMayDoStateBackendConvert(pMeta) < 0) {
|
if (streamMetaMayConvertBackendFormat(pMeta) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue