From a0dcec5890126c299dbad341a64ab3f1b62066b1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Aug 2023 11:23:19 +0800 Subject: [PATCH] enh(stream): add function handler for hb message. --- source/dnode/mnode/impl/src/mndStream.c | 2 ++ source/libs/stream/src/streamMeta.c | 14 +++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bbff43371c..d23efe175d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -54,6 +54,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq); static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq); +static int32_t mndProcessStreamHb(SRpcMsg *pReq); static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq); static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); @@ -92,6 +93,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f9537a51c8..6b99d52f98 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -93,7 +93,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->expandFunc = expandFunc; // send heartbeat every 20sec. -// pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer); +// pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, ahandle, streamEnv.timer); pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -549,9 +549,17 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { } void metaHbToMnode(void* param, void* tmrId) { - SStreamMeta* pMeta = param; + STQ* pMeta = param; SStreamHbMsg hbMsg = {0}; + taosRLockLatch(&pMeta->lock); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + taosRUnLockLatch(&pMeta->lock); + + hbMsg.numOfTasks = numOfTasks; + hbMsg.vgId = pMeta->vgId; + hbMsg.epset = ; + int32_t code = 0; int32_t tlen = 0; @@ -581,7 +589,7 @@ void metaHbToMnode(void* param, void* tmrId) { SRpcMsg msg = {0}; initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead)); - qDebug("vgId:%d, send hb to mnode", pMeta->mgmtInfo.mnodeId); + qDebug("vgId:%d, build and send hb to mnode", pMeta->mgmtInfo.mnodeId); tmsgSendReq(&pMeta->mgmtInfo.epset, &msg); } \ No newline at end of file