diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cf7cda4f34..fdaf02864a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -369,10 +369,11 @@ struct SStreamTask { SSHashObj* pNameMap; }; -typedef struct SMgmtInfo { - SEpSet epset; - int32_t mnodeId; -} SMgmtInfo; +typedef struct SMetaHbInfo { + tmr_h hbTmr; + int32_t stopFlag; + int32_t tickCounter; +} SMetaHbInfo; // meta typedef struct SStreamMeta { @@ -393,11 +394,9 @@ typedef struct SStreamMeta { int64_t streamBackendRid; SHashObj* pTaskBackendUnique; TdThreadMutex backendMutex; - tmr_h hbTmr; - - int32_t killed; - int32_t closedTask; - int32_t chkptNotReadyTasks; + SMetaHbInfo hbInfo; + int32_t closedTask; + int32_t chkptNotReadyTasks; int64_t chkpId; SArray* chkpSaved; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f4e6c8c919..c59b32a370 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -218,8 +218,9 @@ void tqNotifyClose(STQ* pTq) { taosWUnLockLatch(&pMeta->lock); - pMeta->killed = STREAM_META_WILL_STOP; - while(pMeta->killed != STREAM_META_OK_TO_STOP) { + // wait for the stream meta hb function stopping + pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP; + while(pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) { taosMsleep(100); tqDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 9a0f6a03d3..4353fcb15d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include +#include "tmisce.h" #include "executor.h" #include "streamBackendRocksdb.h" #include "streamInt.h" @@ -21,6 +21,9 @@ #include "tstream.h" #include "ttimer.h" +#define META_HB_CHECK_INTERVAL 200 +#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec + static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; @@ -89,7 +92,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->stage = stage; // send heartbeat every 5sec. - pMeta->hbTmr = taosTmrStart(metaHbToMnode, 5000, pMeta, streamEnv.timer); + pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer); + pMeta->hbInfo.tickCounter = 0; + pMeta->hbInfo.stopFlag = 0; pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -605,16 +610,30 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { return 0; } +static bool readyToSendHb(SMetaHbInfo* pInfo) { + if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { + // reset the counter + pInfo->tickCounter = 0; + return true; + } + return false; +} + void metaHbToMnode(void* param, void* tmrId) { SStreamMeta* pMeta = param; SStreamHbMsg hbMsg = {0}; - if (pMeta->killed == STREAM_META_WILL_STOP) { - pMeta->killed = STREAM_META_OK_TO_STOP; + // need to stop, stop now + if (pMeta->hbInfo.stopFlag == STREAM_META_WILL_STOP) { + pMeta->hbInfo.stopFlag = STREAM_META_OK_TO_STOP; qDebug("vgId:%d jump out of meta timer", pMeta->vgId); return; } + if (!readyToSendHb(&pMeta->hbInfo)) { + return; + } + taosRLockLatch(&pMeta->lock); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); @@ -679,5 +698,5 @@ void metaHbToMnode(void* param, void* tmrId) { qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); tmsgSendReq(&epset, &msg); - taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, &pMeta->hbTmr); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, pMeta, streamEnv.timer, &pMeta->hbInfo.hbTmr); } \ No newline at end of file