From 970b1cb84065ed9bb240d97267b0617b749f496d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 22 Sep 2023 08:38:41 +0000 Subject: [PATCH] add cvt state --- source/libs/stream/src/streamMeta.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d1f59d0a8f..aa29ea8159 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -180,7 +180,7 @@ int32_t streamMetaCheckStateCompatible(SStreamMeta* pMeta) { return ret; } -int32_t streamMetaDoStateDataConvertImpl(SStreamMeta* pMeta) { +int32_t streamMetaDoStateBackendConvertImpl(SStreamMeta* pMeta) { int32_t code = 0; int64_t chkpId = streamGetLatestCheckpointId(pMeta); SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId); @@ -190,23 +190,25 @@ int32_t streamMetaDoStateDataConvertImpl(SStreamMeta* pMeta) { void* key = taosHashGetKey(pIter, NULL); code = streamStateConvertDataFormat(pMeta->path, key, *(void**)pIter); if (code != 0) { - // continue + qError("failed to cvt data"); + goto _EXIT; } pIter = taosHashIterate(pBackend->cfInst, pIter); } - // streamBackendCleanup(); - return 0; +_EXIT: + streamBackendCleanup((void*)pBackend); + return code; } -int32_t streamMetaDoStateDataConvert(SStreamMeta* pMeta) { +int32_t streamMetaMayDoStateBackendConvert(SStreamMeta* pMeta) { int8_t compatible = streamMetaCheckStateCompatible(pMeta); if (compatible == STREAM_STATA_COMPATIBLE) { return 0; } else if (compatible == STREAM_STATA_NEED_CONVERT) { qInfo("stream state need covert backend format"); - return streamMetaDoStateDataConvertImpl(pMeta); + return streamMetaDoStateBackendConvertImpl(pMeta); } else if (compatible == STREAM_STATA_NO_COMPATIBLE) { qError( "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " @@ -237,7 +239,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - if (streamMetaDoStateDataConvert(pMeta) < 0) { + if (streamMetaMayDoStateBackendConvert(pMeta) < 0) { goto _err; }