From 7e7a39f077527d5c95bdc016a815fc344fc8417b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 2 Nov 2023 10:15:34 +0800 Subject: [PATCH] Merge branch 'enh/new3.0' into enh/refactorBackend --- source/libs/stream/src/streamMeta.c | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a5bb9e59d8..d12661c494 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -65,6 +65,7 @@ static void streamMetaEnvInit() { } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } + void streamMetaCleanup() { taosCloseRef(streamBackendId); taosCloseRef(streamBackendCfWrapperId); @@ -319,15 +320,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->expandFunc = expandFunc; pMeta->stage = stage; - int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); - pMeta->rid = taosAddRef(streamMetaId, pMeta); - *pRid = pMeta->rid; - metaRefMgtAdd(pMeta->vgId, pRid); - - pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); - pMeta->pHbInfo->tickCounter = 0; - pMeta->pHbInfo->stopFlag = 0; pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -347,6 +340,17 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->numOfStreamTasks = 0; stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); + + pMeta->rid = taosAddRef(streamMetaId, pMeta); + + int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); + memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); + metaRefMgtAdd(pMeta->vgId, pRid); + + pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); + pMeta->pHbInfo->tickCounter = 0; + pMeta->pHbInfo->stopFlag = 0; + return pMeta; _err: