From 96e148778705a826cd3922dfc091f6fcb2968417 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 May 2024 16:57:09 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/src/streamMeta.c | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7f1128b929..519800be28 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -151,10 +151,10 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { int8_t ret = STREAM_STATA_COMPATIBLE; TBC* pCur = NULL; - if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { - // no task info, no stream + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { // no task info, no stream return ret; } + void* pKey = NULL; int32_t kLen = 0; void* pVal = NULL; @@ -165,20 +165,24 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { if (pVal == NULL || vLen == 0) { break; } + SDecoder decoder; SCheckpointInfo info; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) { continue; } + if (info.msgVer <= SSTREAM_TASK_INCOMPATIBLE_VER) { ret = STREAM_STATA_NO_COMPATIBLE; } else if (info.msgVer >= SSTREAM_TASK_NEED_CONVERT_VER) { ret = STREAM_STATA_NEED_CONVERT; } + tDecoderClear(&decoder); break; } + tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); @@ -193,6 +197,7 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) { if (exist == false) { return code; } + SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId, pMeta->vgId); void* pIter = taosHashIterate(pBackend->cfInst, NULL); @@ -219,6 +224,7 @@ _EXIT: return code; } + int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { int8_t compatible = streamMetaCheckBackendCompatible(pMeta); if (compatible == STREAM_STATA_COMPATIBLE) { @@ -854,6 +860,10 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + if (pVal == NULL || vLen == 0) { + break; + } + SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY;