From cfcb85ef94ad30b2fdd5d548ab3bbe21d7902e1e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 11 Dec 2024 17:25:01 +0800 Subject: [PATCH] refactor(stream): update mnode epset in hb. --- include/common/streamMsg.h | 1 + source/common/src/msg/streamMsg.c | 2 + source/dnode/mnode/impl/src/mndStreamHb.c | 22 ++++-- source/libs/stream/src/streamHb.c | 94 ++++++++++++++++++----- 4 files changed, 92 insertions(+), 27 deletions(-) diff --git a/include/common/streamMsg.h b/include/common/streamMsg.h index 3db92ba58d..d410bd17e0 100644 --- a/include/common/streamMsg.h +++ b/include/common/streamMsg.h @@ -188,6 +188,7 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg); typedef struct { SMsgHead head; int32_t msgId; + SEpSet mndEpset; } SMStreamHbRspMsg; int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp); diff --git a/source/common/src/msg/streamMsg.c b/source/common/src/msg/streamMsg.c index c92ab52ac1..b5f6900321 100644 --- a/source/common/src/msg/streamMsg.c +++ b/source/common/src/msg/streamMsg.c @@ -651,6 +651,7 @@ int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) { TAOS_CHECK_EXIT(tStartEncode(pEncoder)); TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->msgId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pRsp->mndEpset)); tEndEncode(pEncoder); _exit: @@ -663,6 +664,7 @@ int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) { TAOS_CHECK_EXIT(tStartDecode(pDecoder)); TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->msgId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pRsp->mndEpset)); tEndDecode(pDecoder); _exit: diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 4b3db28aa1..b7b2764442 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -15,6 +15,8 @@ #include "mndStream.h" #include "mndTrans.h" +#include "mndMnode.h" +#include "tmisce.h" typedef struct SFailedCheckpointInfo { int64_t streamUid; @@ -31,7 +33,7 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info); static bool validateHbMsg(const SArray *pNodeList, int32_t vgId); static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks); -static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId); +static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet* pEpset, int32_t vgId, int32_t msgId); static void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks); void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { @@ -329,6 +331,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SArray *pFailedChkpt = NULL; SArray *pOrphanTasks = NULL; int32_t code = 0; + SDecoder decoder = {0}; + SEpSet mnodeEpset = {0}; if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { if (suspendAllStreams(pMnode, &pReq->info) < 0) { @@ -336,7 +340,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } - SDecoder decoder = {0}; tDecoderInit(&decoder, pReq->pCont, pReq->contLen); if (tDecodeStreamHbMsg(&decoder, &req) < 0) { @@ -357,13 +360,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { TAOS_RETURN(terrno); } + mndGetMnodeEpSet(pMnode, &mnodeEpset); + streamMutexLock(&execInfo.lock); mndInitStreamExecInfo(pMnode, &execInfo); if (!validateHbMsg(execInfo.pNodeList, req.vgId)) { mError("vgId:%d not exists in nodeList buf, discarded", req.vgId); - doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); + doSendHbMsgRsp(terrno, &pReq->info, &mnodeEpset, req.vgId, req.msgId); streamMutexUnlock(&execInfo.lock); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); @@ -383,9 +388,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) { mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId); - // return directly and after the vnode to continue to send the next HbMsg. + // return directly and allow the vnode to continue to send the next HbMsg. terrno = TSDB_CODE_SUCCESS; - doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); + doSendHbMsgRsp(terrno, &pReq->info, &mnodeEpset, req.vgId, req.msgId); streamMutexUnlock(&execInfo.lock); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); @@ -529,7 +534,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { streamMutexUnlock(&execInfo.lock); - doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, req.vgId, req.msgId); + doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, &mnodeEpset, req.vgId, req.msgId); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); return code; @@ -552,12 +557,13 @@ void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArr taosArrayDestroy(pOrphanTasks); } -void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) { +void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet* pMndEpset, int32_t vgId, int32_t msgId) { int32_t ret = 0; int32_t tlen = 0; void *buf = NULL; - const SMStreamHbRspMsg msg = {.msgId = msgId}; + SMStreamHbRspMsg msg = {.msgId = msgId};//, .mndEpset = *pMndEpset}; + epsetAssign(&msg.mndEpset, pMndEpset); tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret); if (ret < 0) { diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 25cb28f77c..aadf10db6a 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -108,6 +108,67 @@ static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* p return tmsgSendReq(pEpset, &msg); } +static int32_t streamTaskGetMndEpset(SStreamMeta* pMeta, SEpSet* pEpSet) { + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask* pTask = NULL; + + int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code != 0) { + continue; + } + + if (pTask->info.fillHistory == 1) { + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + epsetAssign(pEpSet, &pTask->info.mnodeEpset); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_SUCCESS; + } + + return TSDB_CODE_FAILED; +} + +static int32_t streamTaskUpdateMndEpset(SStreamMeta* pMeta, SEpSet* pEpSet) { + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask* pTask = NULL; + + int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); + if (code != 0) { + stError("vgId:%d s-task:0x%x failed to acquire it for updating mnode epset, code:%s", pMeta->vgId, pId->taskId, + tstrerror(code)); + continue; + } + + // ignore this error since it is only for log file + char buf[256] = {0}; + int32_t ret = epsetToStr(&pTask->info.mnodeEpset, buf, tListLen(buf)); + if (ret != 0) { // print error and continue + stError("failed to convert epset to str, code:%s", tstrerror(ret)); + } + + char newBuf[256] = {0}; + ret = epsetToStr(pEpSet, newBuf, tListLen(newBuf)); + if (ret != 0) { + stError("failed to convert epset to str, code:%s", tstrerror(ret)); + } + + epsetAssign(&pTask->info.mnodeEpset, pEpSet); + stInfo("s-task:0x%x update mnd epset, from %s to %s", pId->taskId, buf, newBuf); + streamMetaReleaseTask(pMeta, pTask); + } + + return TSDB_CODE_SUCCESS; +} + // NOTE: this task should be executed within the SStreamMeta lock region. int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { SEpSet epset = {0}; @@ -121,24 +182,11 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { stDebug("vgId:%d hbMsg rsp not recv, send current hbMsg, msgId:%d, total:%d again", pMeta->vgId, pInfo->hbMsg.msgId, pInfo->hbCount); - for(int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; - SStreamTask* pTask = NULL; - - code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); - if (code != 0) { - continue; - } - - if (pTask->info.fillHistory == 1) { - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - epsetAssign(&epset, &pTask->info.mnodeEpset); - streamMetaReleaseTask(pMeta, pTask); - break; + code = streamTaskGetMndEpset(pMeta, &epset); + if (code != 0) { + stError("vgId:%d failed to get the mnode epset, not retrying sending hbMsg, msgId:%d", pMeta->vgId, + pInfo->hbMsg.msgId); + return code; } pInfo->msgSendTs = taosGetTimestampMs(); @@ -372,9 +420,10 @@ void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSe } int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) { - stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId); SMetaHbInfo* pInfo = pMeta->pHbInfo; + SEpSet epset = {0}; + stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId); streamMetaWLock(pMeta); // current waiting rsp recved @@ -384,6 +433,13 @@ int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) { pInfo->hbCount += 1; pInfo->msgSendTs = -1; + + streamTaskGetMndEpset(pMeta, &epset); + if (!isEpsetEqual(&pRsp->mndEpset, &epset)) { + // we need to update the mnode epset for each tasks + stInfo("vgId:%d mnode epset updated, update mnode epset for all tasks", pMeta->vgId); + streamTaskUpdateMndEpset(pMeta, &pRsp->mndEpset); + } } else { stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId); }