add cvt state

This commit is contained in:
yihaoDeng 2023-09-22 08:38:41 +00:00
parent 173604ee1a
commit 970b1cb840
1 changed files with 9 additions and 7 deletions

View File

@ -180,7 +180,7 @@ int32_t streamMetaCheckStateCompatible(SStreamMeta* pMeta) {
return ret; return ret;
} }
int32_t streamMetaDoStateDataConvertImpl(SStreamMeta* pMeta) { int32_t streamMetaDoStateBackendConvertImpl(SStreamMeta* pMeta) {
int32_t code = 0; int32_t code = 0;
int64_t chkpId = streamGetLatestCheckpointId(pMeta); int64_t chkpId = streamGetLatestCheckpointId(pMeta);
SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId); SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId);
@ -190,23 +190,25 @@ int32_t streamMetaDoStateDataConvertImpl(SStreamMeta* pMeta) {
void* key = taosHashGetKey(pIter, NULL); void* key = taosHashGetKey(pIter, NULL);
code = streamStateConvertDataFormat(pMeta->path, key, *(void**)pIter); code = streamStateConvertDataFormat(pMeta->path, key, *(void**)pIter);
if (code != 0) { if (code != 0) {
// continue qError("failed to cvt data");
goto _EXIT;
} }
pIter = taosHashIterate(pBackend->cfInst, pIter); pIter = taosHashIterate(pBackend->cfInst, pIter);
} }
// streamBackendCleanup();
return 0; _EXIT:
streamBackendCleanup((void*)pBackend);
return code;
} }
int32_t streamMetaDoStateDataConvert(SStreamMeta* pMeta) { int32_t streamMetaMayDoStateBackendConvert(SStreamMeta* pMeta) {
int8_t compatible = streamMetaCheckStateCompatible(pMeta); int8_t compatible = streamMetaCheckStateCompatible(pMeta);
if (compatible == STREAM_STATA_COMPATIBLE) { if (compatible == STREAM_STATA_COMPATIBLE) {
return 0; return 0;
} else if (compatible == STREAM_STATA_NEED_CONVERT) { } else if (compatible == STREAM_STATA_NEED_CONVERT) {
qInfo("stream state need covert backend format"); qInfo("stream state need covert backend format");
return streamMetaDoStateDataConvertImpl(pMeta); return streamMetaDoStateBackendConvertImpl(pMeta);
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) { } else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
qError( qError(
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
@ -237,7 +239,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
if (streamMetaDoStateDataConvert(pMeta) < 0) { if (streamMetaMayDoStateBackendConvert(pMeta) < 0) {
goto _err; goto _err;
} }