From 90bec3cd3f416128020d22d10f0888968d628cd9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 29 Aug 2024 16:00:22 +0800 Subject: [PATCH] fix(stream): remove wait for quit in meta hb timer. --- source/libs/stream/src/streamHb.c | 54 +++++++++++++++---------------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 7a703ae30c..1ef938494e 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -25,7 +25,6 @@ int32_t streamMetaId = 0; struct SMetaHbInfo { tmr_h hbTmr; - int32_t stopFlag; int32_t tickCounter; int32_t hbCount; int64_t hbStart; @@ -242,6 +241,8 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { void streamMetaHbToMnode(void* param, void* tmrId) { int64_t rid = *(int64_t*)param; int32_t code = 0; + int32_t vgId = 0; + int32_t role = 0; SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); if (pMeta == NULL) { @@ -249,29 +250,41 @@ void streamMetaHbToMnode(void* param, void* tmrId) { return; } + vgId = pMeta->vgId; + role = pMeta->role; + // need to stop, stop now - if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta - pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; + if (pMeta->closeFlag) { + pMeta->pHbInfo->hbStart = 0; code = taosReleaseRef(streamMetaId, rid); if (code == TSDB_CODE_SUCCESS) { - stDebug("vgId:%d jump out of meta timer", pMeta->vgId); + stDebug("vgId:%d jump out of meta timer", vgId); } else { - stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); + stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid); } return; } // not leader not send msg if (pMeta->role != NODE_ROLE_LEADER) { + pMeta->pHbInfo->hbStart = 0; code = taosReleaseRef(streamMetaId, rid); if (code == TSDB_CODE_SUCCESS) { - stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role); + stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role); } else { - stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, pMeta->vgId, - pMeta->role, rid); + stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid); } + return; + } - pMeta->pHbInfo->hbStart = 0; + if (!waitForEnoughDuration(pMeta->pHbInfo)) { + streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId, + "meta-hb-tmr"); + + code = taosReleaseRef(streamMetaId, rid); + if (code) { + stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid); + } return; } @@ -280,17 +293,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) { pMeta->pHbInfo->hbStart = taosGetTimestampMs(); } - if (!waitForEnoughDuration(pMeta->pHbInfo)) { - streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, - "meta-hb-tmr"); - - code = taosReleaseRef(streamMetaId, rid); - if (code) { - stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); - } - return; - } - streamMetaRLock(pMeta); code = streamMetaSendHbHelper(pMeta); if (code) { @@ -300,10 +302,10 @@ void streamMetaHbToMnode(void* param, void* tmrId) { streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, "meta-hb-tmr"); - code = taosReleaseRef(streamMetaId, rid); + code = taosReleaseRef(streamMetaId, rid); if (code) { - stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); + stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid); } } @@ -316,7 +318,6 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) { pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); pInfo->tickCounter = 0; - pInfo->stopFlag = 0; pInfo->msgSendTs = -1; pInfo->hbCount = 0; @@ -340,11 +341,8 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) { void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) { // wait for the stream meta hb function stopping if (pMeta->role == NODE_ROLE_LEADER) { - pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; - while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { - taosMsleep(100); - stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); - } + taosMsleep(2 * META_HB_CHECK_INTERVAL); + stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); } }