From a44e17d5ea718ec3359ad65103985ccf1ed4c7c9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 26 Jun 2024 15:21:14 +0800 Subject: [PATCH] fix(stream): re-send the hb msg if the mnode failed to receive it. --- include/libs/stream/streamMsg.h | 6 + include/libs/stream/tstream.h | 4 +- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndStreamHb.c | 16 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 +- source/libs/stream/src/streamHb.c | 170 ++++++++++++++------- source/libs/stream/src/streamMsg.c | 10 +- 7 files changed, 146 insertions(+), 66 deletions(-) diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h index b16932370d..5ee70e71bd 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/libs/stream/streamMsg.h @@ -163,6 +163,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint typedef struct SStreamHbMsg { int32_t vgId; + int32_t msgId; int32_t numOfTasks; SArray* pTaskStatus; // SArray SArray* pUpdateNodes; // SArray, needs update the epsets in stream tasks for those nodes. 
@@ -172,6 +173,11 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); void tCleanupStreamHbMsg(SStreamHbMsg* pMsg); +typedef struct { + SMsgHead head; + int32_t msgId; +} SMStreamHbRspMsg; + typedef struct SRetrieveChkptTriggerReq { SMsgHead head; int64_t streamId; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6acd7aa60a..74c4b00c84 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -523,7 +523,6 @@ typedef struct STaskUpdateEntry { } STaskUpdateEntry; typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); -typedef int32_t (*__stream_task_expand_fn)(struct SStreamTask* pTask); SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5); @@ -791,7 +790,8 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq); int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq *req); void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp); -int32_t streamTaskSendLatestCheckpointInfo(SStreamTask* pTask); +int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp); + #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index e358e0f6e5..54fbb6c31b 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -83,7 +83,7 @@ typedef struct SOrphanTask { typedef struct { SMsgHead head; -} SMStreamHbRspMsg, SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp; +} SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp; typedef struct STaskChkptInfo { int32_t nodeId; diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index a79fe0cf0a..170a1f1843 100644 --- 
a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -236,7 +236,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } tDecoderClear(&decoder); - mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); + mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId); pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask)); @@ -333,21 +333,23 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } taosThreadMutexUnlock(&execInfo.lock); - tCleanupStreamHbMsg(&req); - - taosArrayDestroy(pFailedChkpt); - taosArrayDestroy(pOrphanTasks); { SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)}; rsp.pCont = rpcMallocCont(rsp.contLen); - SMsgHead* pHead = rsp.pCont; - pHead->vgId = htonl(req.vgId); + + SMStreamHbRspMsg* pMsg = rsp.pCont; + pMsg->head.vgId = htonl(req.vgId); + pMsg->msgId = req.msgId; tmsgSendRsp(&rsp); pReq->info.handle = NULL; // disable auto rsp } + tCleanupStreamHbMsg(&req); + taosArrayDestroy(pFailedChkpt); + taosArrayDestroy(pOrphanTasks); + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a47dbf1f3f..71981ece3b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1080,7 +1080,9 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) { return TSDB_CODE_SUCCESS; } -int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } +int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + return streamProcessHeartbeatRsp(pMeta, pMsg->pCont); +} int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } diff --git a/source/libs/stream/src/streamHb.c 
b/source/libs/stream/src/streamHb.c index 95bbbcea8b..a4bd43e6e6 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -24,11 +24,13 @@ int32_t streamMetaId = 0; struct SMetaHbInfo { - tmr_h hbTmr; - int32_t stopFlag; - int32_t tickCounter; - int32_t hbCount; - int64_t hbStart; + tmr_h hbTmr; + int32_t stopFlag; + int32_t tickCounter; + int32_t hbCount; + int64_t hbStart; + int64_t msgSendTs; + SStreamHbMsg hbMsg; }; static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { @@ -61,7 +63,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { bool exist = existInHbMsg(pMsg, pTaskEpset); if (!exist) { taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId); - stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, + stDebug("vgId:%d nodeId:%d added into hbMsg update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, (int32_t)taosArrayGetSize(pMsg->pUpdateNodes)); } } @@ -70,20 +72,91 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { taosThreadMutexUnlock(&pTask->lock); } +static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) { + int32_t code = 0; + int32_t tlen = 0; + + tEncodeSize(tEncodeStreamHbMsg, pMsg, tlen, code); + if (code < 0) { + stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); + return TSDB_CODE_FAILED; + } + + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_FAILED; + } + + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + if ((code = tEncodeStreamHbMsg(&encoder, pMsg)) < 0) { + rpcFreeCont(buf); + stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); + return TSDB_CODE_FAILED; + } + tEncoderClear(&encoder); + + stDebug("vgId:%d send hb to mnode, numOfTasks:%d msgId:%d", pMeta->vgId, 
pMsg->numOfTasks, pMsg->msgId); + + SRpcMsg msg = {0}; + initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); + tmsgSendReq(pEpset, &msg); + + return TSDB_CODE_SUCCESS; +} + +// NOTE: this task should be executed within the SStreamMeta lock region. int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { - SStreamHbMsg hbMsg = {0}; SEpSet epset = {0}; bool hasMnodeEpset = false; int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + SMetaHbInfo* pInfo = pMeta->pHbInfo; - hbMsg.vgId = pMeta->vgId; - hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); - hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); + // not recv the hb msg rsp yet, send current hb msg again + if (pInfo->msgSendTs > 0) { + stDebug("vgId:%d hbMsg rsp not recv, send current hbMsg, msgId:%d, total:%d again", pMeta->vgId, pInfo->hbMsg.msgId, + pInfo->hbCount); + + for(int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (pTask == NULL) { + continue; + } + + if ((*pTask)->info.fillHistory == 1) { + continue; + } + + epsetAssign(&epset, &(*pTask)->info.mnodeEpset); + break; + } + + pInfo->msgSendTs = taosGetTimestampMs(); + doSendHbMsgInfo(&pInfo->hbMsg, pMeta, &epset); + return TSDB_CODE_SUCCESS; + } + + SStreamHbMsg* pMsg = &pInfo->hbMsg; + stDebug("vgId:%d build stream hbMsg, leader:%d msgId:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER), + pMeta->pHbInfo->hbCount); + + pMsg->vgId = pMeta->vgId; + pMsg->msgId = pMeta->pHbInfo->hbCount; + pMsg->pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); + pMsg->pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); + + if (pMsg->pTaskStatus == NULL || pMsg->pUpdateNodes == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pId = 
taosArrayGet(pMeta->pTaskList, i); - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (pTask == NULL) { continue; @@ -103,12 +176,14 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { } if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) { - entry.checkpointInfo.failed = ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; + entry.checkpointInfo.failed = + ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId; entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId; if (entry.checkpointInfo.failed) { - stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId); + stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d", (*pTask)->id.idStr, + (*pTask)->chkInfo.pActiveInfo->transId); } } @@ -121,55 +196,23 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer); } - addUpdateNodeIntoHbMsg(*pTask, &hbMsg); - taosArrayPush(hbMsg.pTaskStatus, &entry); + addUpdateNodeIntoHbMsg(*pTask, pMsg); + taosArrayPush(pMsg->pTaskStatus, &entry); if (!hasMnodeEpset) { epsetAssign(&epset, &(*pTask)->info.mnodeEpset); hasMnodeEpset = true; } } - hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus); + pMsg->numOfTasks = taosArrayGetSize(pMsg->pTaskStatus); if (hasMnodeEpset) { - int32_t code = 0; - int32_t tlen = 0; - - tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); - if (code < 0) { - stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - goto _end; - } - - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - stError("vgId:%d encode stream hb 
msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - goto _end; - } - - SEncoder encoder; - tEncoderInit(&encoder, buf, tlen); - if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { - rpcFreeCont(buf); - stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); - goto _end; - } - tEncoderClear(&encoder); - - SRpcMsg msg = {0}; - initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); - - pMeta->pHbInfo->hbCount += 1; - stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, - pMeta->pHbInfo->hbCount); - - tmsgSendReq(&epset, &msg); + pInfo->msgSendTs = taosGetTimestampMs(); + doSendHbMsgInfo(pMsg, pMeta, &epset); } else { - stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); + stDebug("vgId:%d no tasks or no mnd epset, not send stream hb to mnode", pMeta->vgId); } - _end: - tCleanupStreamHbMsg(&hbMsg); return TSDB_CODE_SUCCESS; } @@ -209,7 +252,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) { return; } - stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER)); streamMetaRLock(pMeta); streamMetaSendHbHelper(pMeta); streamMetaRUnLock(pMeta); @@ -228,7 +270,8 @@ SMetaHbInfo* createMetaHbInfo(int64_t* pRid) { pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); pInfo->tickCounter = 0; pInfo->stopFlag = 0; - + pInfo->msgSendTs = -1; + pInfo->hbCount = 0; return pInfo; } @@ -253,4 +296,25 @@ void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSe *pStartTs = pInfo->hbStart; *pSendCount = pInfo->hbCount; +} + +int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) { + stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId); + SMetaHbInfo* pInfo = pMeta->pHbInfo; + + streamMetaRLock(pMeta); + + // current waiting rsp recved + if (pRsp->msgId == pInfo->hbCount) { + 
tCleanupStreamHbMsg(&pInfo->hbMsg); + stDebug("vgId:%d hbMsg msgId:%d sendTs:%" PRId64 " recved confirmed", pMeta->vgId, pRsp->msgId, pInfo->msgSendTs); + + pInfo->hbCount += 1; + pInfo->msgSendTs = -1; + } else { + stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId); + } + + streamMetaRUnLock(pMeta); + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 13a02e5637..25549b43f6 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -365,6 +365,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI32(pEncoder, *pVgId) < 0) return -1; } + if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } @@ -424,6 +425,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { taosArrayPush(pReq->pUpdateNodes, &vgId); } + if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1; tEndDecode(pDecoder); return 0; } @@ -434,12 +436,16 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { } if (pMsg->pUpdateNodes != NULL) { - taosArrayDestroy(pMsg->pUpdateNodes); + pMsg->pUpdateNodes = taosArrayDestroy(pMsg->pUpdateNodes); } if (pMsg->pTaskStatus != NULL) { - taosArrayDestroy(pMsg->pTaskStatus); + pMsg->pTaskStatus = taosArrayDestroy(pMsg->pTaskStatus); } + + pMsg->msgId = -1; + pMsg->vgId = -1; + pMsg->numOfTasks = -1; } int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {