From 8144df956a2d6e45efdbf522fdbc9fad87193087 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 1 Jul 2024 13:36:06 +0800 Subject: [PATCH] fix(stream): validate the stream hb msg, and discard the invalid hb msg. --- source/client/test/clientTests.cpp | 2 - source/common/src/rsync.c | 4 +- source/dnode/mnode/impl/src/mndStreamHb.c | 80 ++++++++++++++++------- 3 files changed, 60 insertions(+), 26 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 60b36b3461..836a171ad7 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -828,8 +828,6 @@ TEST(clientCase, projection_query_tables) { // printf("error in create db, reason:%s\n", taos_errstr(pRes)); // } // taos_free_result(pRes); - TAOS_RES* pRes = NULL; - pRes= taos_query(pConn, "use abc1"); taos_free_result(pRes); diff --git a/source/common/src/rsync.c b/source/common/src/rsync.c index ac94625f8e..d0b10b7f41 100644 --- a/source/common/src/rsync.c +++ b/source/common/src/rsync.c @@ -151,9 +151,9 @@ void startRsync() { // start rsync service to backup checkpoint code = system(cmd); if (code != 0) { - uError("[rsync] start server failed, code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA); + uError("[rsync] cmd:%s start server failed, code:%d," ERRNO_ERR_FORMAT, cmd, code, ERRNO_ERR_DATA); } else { - uDebug("[rsync] start server successful"); + uInfo("[rsync] cmd:%s start server successful", cmd); } } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index f379d7d7ab..6cfba40ee7 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -22,9 +22,18 @@ typedef struct SFailedCheckpointInfo { int32_t transId; } SFailedCheckpointInfo; -static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode); +static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode); +static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage); +static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo); +static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId); +static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); +static int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList); +static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info); +static bool validateHbMsg(const SArray *pNodeList, int32_t vgId); +static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks); +static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId); -static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { +void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for (int32_t j = 0; j < numOfNodes; ++j) { SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j); @@ -39,7 +48,7 @@ static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { } } -static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) { +void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) { int32_t num = taosArrayGetSize(pList); for (int32_t i = 0; i < num; ++i) { SFailedCheckpointInfo *p = taosArrayGet(pList, i); @@ -86,7 +95,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) { +int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) { int32_t code = TSDB_CODE_SUCCESS; mndKillTransImpl(pMnode, transId, ""); @@ -110,7 +119,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in return code; } -static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { +int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { int32_t num = taosArrayGetSize(pNodeList); mInfo("set node expired for %d nodes", num); @@ -141,7 +150,7 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { return TSDB_CODE_SUCCESS; } -static int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) { +int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) { SOrphanTask *pTask = taosArrayGet(pList, 0); // check if it is conflict with other trans in both sourceDb and targetDb. @@ -246,6 +255,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); mndInitStreamExecInfo(pMnode, &execInfo); + if (!validateHbMsg(execInfo.pNodeList, req.vgId)) { + mError("invalid hbMsg from vgId:%d, discarded", req.vgId); + + terrno = TSDB_CODE_INVALID_MSG; + doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); + + taosThreadMutexUnlock(&execInfo.lock); + cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); + return -1; + } int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { @@ -336,22 +355,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); - { - SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)}; - rsp.pCont = rpcMallocCont(rsp.contLen); - - SMStreamHbRspMsg* pMsg = rsp.pCont; - pMsg->head.vgId = htonl(req.vgId); - pMsg->msgId = req.msgId; - - tmsgSendRsp(&rsp); - pReq->info.handle = NULL; // disable auto rsp - } - - tCleanupStreamHbMsg(&req); - taosArrayDestroy(pFailedChkpt); - taosArrayDestroy(pOrphanTasks); + terrno = TSDB_CODE_SUCCESS; + doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); + cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); return TSDB_CODE_SUCCESS; } @@ -362,4 +369,33 @@ void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doC SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } -} \ No newline at end of file +} + +bool validateHbMsg(const SArray *pNodeList, int32_t vgId) { + for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { + SNodeEntry *pEntry = taosArrayGet(pNodeList, i); + if (pEntry->nodeId == vgId) { + return true; + } + } + + return false; +} + +void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks) { + tCleanupStreamHbMsg(pReq); + taosArrayDestroy(pFailedChkptList); + taosArrayDestroy(pOrphanTasks); +} + +void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) { + SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SMStreamHbRspMsg)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + + SMStreamHbRspMsg *pMsg = rsp.pCont; + pMsg->head.vgId = htonl(vgId); + pMsg->msgId = msgId; + + tmsgSendRsp(&rsp); + pRpcInfo->handle = NULL; // disable auto rsp +}