From 097d3caa902beab27bd2b0ba4dbefe6fa85cef8c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 29 Oct 2024 10:20:27 +0800 Subject: [PATCH 1/4] fix(stream): add refId for meta at the end of init functions --- source/libs/stream/src/streamMeta.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7e9b60b61a..17883f5fb1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -434,7 +434,6 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, pMeta->closeFlag = false; stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); - pMeta->rid = taosAddRef(streamMetaId, pMeta); // set the attribute when running on Linux OS TdThreadRwlockAttr attr; @@ -468,6 +467,9 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, code = taosThreadMutexInit(&pMeta->backendMutex, NULL); TSDB_CHECK_CODE(code, lino, _err); + // add refId at the end of initialization function + pMeta->rid = taosAddRef(streamMetaId, pMeta); + *p = pMeta; return code; From c149449717079441ead616a0aca7197a8e307983 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 29 Oct 2024 18:45:30 +0800 Subject: [PATCH 2/4] fix(stream): adjust init ref position. --- source/libs/stream/src/streamMeta.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 17883f5fb1..064098ed35 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -458,9 +458,6 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, code = metaRefMgtAdd(pMeta->vgId, pRid); TSDB_CHECK_CODE(code, lino, _err); - code = createMetaHbInfo(pRid, &pMeta->pHbInfo); - TSDB_CHECK_CODE(code, lino, _err); - code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt); TSDB_CHECK_CODE(code, lino, _err); @@ -469,6 +466,9 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, // add refId at the end of initialization function pMeta->rid = taosAddRef(streamMetaId, pMeta); + code = createMetaHbInfo(pRid, &pMeta->pHbInfo); + + TSDB_CHECK_CODE(code, lino, _err); *p = pMeta; return code; From 71f4ae695720e108c20bcf6513d15e5199f7aa9c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 29 Oct 2024 19:48:17 +0800 Subject: [PATCH 3/4] fix(stream): adjust refId init position. --- source/libs/stream/src/streamMeta.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 064098ed35..bc7878f761 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -451,13 +451,6 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, code = taosThreadRwlockAttrDestroy(&attr); TSDB_CHECK_CODE(code, lino, _err); - int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); - TSDB_CHECK_NULL(pRid, code, lino, _err, terrno); - - memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); - code = metaRefMgtAdd(pMeta->vgId, pRid); - TSDB_CHECK_CODE(code, lino, _err); - code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt); TSDB_CHECK_CODE(code, lino, _err); @@ -466,6 +459,15 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, // add refId at the end of initialization function pMeta->rid = taosAddRef(streamMetaId, pMeta); + + int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); + TSDB_CHECK_NULL(pRid, code, lino, _err, terrno); + + memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); + + code = metaRefMgtAdd(pMeta->vgId, pRid); + TSDB_CHECK_CODE(code, lino, _err); + code = createMetaHbInfo(pRid, &pMeta->pHbInfo); TSDB_CHECK_CODE(code, lino, _err); From e618a04e510e2b83f0cb929d3fac85656d7a2f9b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 30 Oct 2024 09:13:07 +0800 Subject: [PATCH 4/4] fix(stream): adjust start timer position. --- source/libs/stream/src/streamHb.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 19391bf7a0..65102edc24 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -316,12 +316,13 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) { return terrno; } - streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer, &pInfo->hbTmr, 0, "stream-hb"); pInfo->tickCounter = 0; pInfo->msgSendTs = -1; pInfo->hbCount = 0; *pRes = pInfo; + + streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer, &pInfo->hbTmr, 0, "stream-hb"); return TSDB_CODE_SUCCESS; }