fix(stream): validate the stream hb msg, and discard the invalid hb msg.

This commit is contained in:
Haojun Liao 2024-07-01 13:36:06 +08:00
parent f09be802ea
commit 8144df956a
3 changed files with 60 additions and 26 deletions

View File

@ -828,8 +828,6 @@ TEST(clientCase, projection_query_tables) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
TAOS_RES* pRes = NULL;
pRes= taos_query(pConn, "use abc1");
taos_free_result(pRes);

View File

@ -151,9 +151,9 @@ void startRsync() {
// start rsync service to backup checkpoint
code = system(cmd);
if (code != 0) {
uError("[rsync] start server failed, code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
uError("[rsync] cmd:%s start server failed, code:%d," ERRNO_ERR_FORMAT, cmd, code, ERRNO_ERR_DATA);
} else {
uDebug("[rsync] start server successful");
uInfo("[rsync] cmd:%s start server successful", cmd);
}
}

View File

@ -22,9 +22,18 @@ typedef struct SFailedCheckpointInfo {
int32_t transId;
} SFailedCheckpointInfo;
static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode);
static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode);
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo);
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList);
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
static bool validateHbMsg(const SArray *pNodeList, int32_t vgId);
static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId);
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
@ -39,7 +48,7 @@ static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
}
}
static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
int32_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
SFailedCheckpointInfo *p = taosArrayGet(pList, i);
@ -86,7 +95,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t code = TSDB_CODE_SUCCESS;
mndKillTransImpl(pMnode, transId, "");
@ -110,7 +119,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, in
return code;
}
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
mInfo("set node expired for %d nodes", num);
@ -141,7 +150,7 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
return TSDB_CODE_SUCCESS;
}
static int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) {
int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) {
SOrphanTask *pTask = taosArrayGet(pList, 0);
// check if it is conflict with other trans in both sourceDb and targetDb.
@ -246,6 +255,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
mndInitStreamExecInfo(pMnode, &execInfo);
if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
mError("invalid hbMsg from vgId:%d, discarded", req.vgId);
terrno = TSDB_CODE_INVALID_MSG;
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
taosThreadMutexUnlock(&execInfo.lock);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
return -1;
}
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) {
@ -336,22 +355,10 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexUnlock(&execInfo.lock);
{
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMStreamHbRspMsg* pMsg = rsp.pCont;
pMsg->head.vgId = htonl(req.vgId);
pMsg->msgId = req.msgId;
tmsgSendRsp(&rsp);
pReq->info.handle = NULL; // disable auto rsp
}
tCleanupStreamHbMsg(&req);
taosArrayDestroy(pFailedChkpt);
taosArrayDestroy(pOrphanTasks);
terrno = TSDB_CODE_SUCCESS;
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
return TSDB_CODE_SUCCESS;
}
@ -362,4 +369,33 @@ void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doC
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
}
bool validateHbMsg(const SArray *pNodeList, int32_t vgId) {
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
SNodeEntry *pEntry = taosArrayGet(pNodeList, i);
if (pEntry->nodeId == vgId) {
return true;
}
}
return false;
}
void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks) {
tCleanupStreamHbMsg(pReq);
taosArrayDestroy(pFailedChkptList);
taosArrayDestroy(pOrphanTasks);
}
void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) {
SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SMStreamHbRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMStreamHbRspMsg *pMsg = rsp.pCont;
pMsg->head.vgId = htonl(vgId);
pMsg->msgId = msgId;
tmsgSendRsp(&rsp);
pRpcInfo->handle = NULL; // disable auto rsp
}