refactor(stream): update mnode epset in hb.

This commit is contained in:
Haojun Liao 2024-12-11 17:25:01 +08:00
parent 6bbfc1aead
commit cfcb85ef94
4 changed files with 92 additions and 27 deletions

View File

@ -188,6 +188,7 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);
typedef struct {
SMsgHead head;
int32_t msgId;
SEpSet mndEpset;
} SMStreamHbRspMsg;
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp);

View File

@ -651,6 +651,7 @@ int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->msgId));
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pRsp->mndEpset));
tEndEncode(pEncoder);
_exit:
@ -663,6 +664,7 @@ int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->msgId));
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pRsp->mndEpset));
tEndDecode(pDecoder);
_exit:

View File

@ -15,6 +15,8 @@
#include "mndStream.h"
#include "mndTrans.h"
#include "mndMnode.h"
#include "tmisce.h"
typedef struct SFailedCheckpointInfo {
int64_t streamUid;
@ -31,7 +33,7 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
static bool validateHbMsg(const SArray *pNodeList, int32_t vgId);
static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId);
static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet* pEpset, int32_t vgId, int32_t msgId);
static void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks);
void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
@ -329,6 +331,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SArray *pFailedChkpt = NULL;
SArray *pOrphanTasks = NULL;
int32_t code = 0;
SDecoder decoder = {0};
SEpSet mnodeEpset = {0};
if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
if (suspendAllStreams(pMnode, &pReq->info) < 0) {
@ -336,7 +340,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
}
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
@ -357,13 +360,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
TAOS_RETURN(terrno);
}
mndGetMnodeEpSet(pMnode, &mnodeEpset);
streamMutexLock(&execInfo.lock);
mndInitStreamExecInfo(pMnode, &execInfo);
if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
mError("vgId:%d not exists in nodeList buf, discarded", req.vgId);
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
doSendHbMsgRsp(terrno, &pReq->info, &mnodeEpset, req.vgId, req.msgId);
streamMutexUnlock(&execInfo.lock);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
@ -383,9 +388,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId);
// return directly and after the vnode to continue to send the next HbMsg.
// return directly and allow the vnode to continue to send the next HbMsg.
terrno = TSDB_CODE_SUCCESS;
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
doSendHbMsgRsp(terrno, &pReq->info, &mnodeEpset, req.vgId, req.msgId);
streamMutexUnlock(&execInfo.lock);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
@ -529,7 +534,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
streamMutexUnlock(&execInfo.lock);
doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, req.vgId, req.msgId);
doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, &mnodeEpset, req.vgId, req.msgId);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
return code;
@ -552,12 +557,13 @@ void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArr
taosArrayDestroy(pOrphanTasks);
}
void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) {
void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet* pMndEpset, int32_t vgId, int32_t msgId) {
int32_t ret = 0;
int32_t tlen = 0;
void *buf = NULL;
const SMStreamHbRspMsg msg = {.msgId = msgId};
SMStreamHbRspMsg msg = {.msgId = msgId};//, .mndEpset = *pMndEpset};
epsetAssign(&msg.mndEpset, pMndEpset);
tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret);
if (ret < 0) {

View File

@ -108,6 +108,67 @@ static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* p
return tmsgSendReq(pEpset, &msg);
}
static int32_t streamTaskGetMndEpset(SStreamMeta* pMeta, SEpSet* pEpSet) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code != 0) {
continue;
}
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
epsetAssign(pEpSet, &pTask->info.mnodeEpset);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
static int32_t streamTaskUpdateMndEpset(SStreamMeta* pMeta, SEpSet* pEpSet) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code != 0) {
stError("vgId:%d s-task:0x%x failed to acquire it for updating mnode epset, code:%s", pMeta->vgId, pId->taskId,
tstrerror(code));
continue;
}
// ignore this error since it is only for log file
char buf[256] = {0};
int32_t ret = epsetToStr(&pTask->info.mnodeEpset, buf, tListLen(buf));
if (ret != 0) { // print error and continue
stError("failed to convert epset to str, code:%s", tstrerror(ret));
}
char newBuf[256] = {0};
ret = epsetToStr(pEpSet, newBuf, tListLen(newBuf));
if (ret != 0) {
stError("failed to convert epset to str, code:%s", tstrerror(ret));
}
epsetAssign(&pTask->info.mnodeEpset, pEpSet);
stInfo("s-task:0x%x update mnd epset, from %s to %s", pId->taskId, buf, newBuf);
streamMetaReleaseTask(pMeta, pTask);
}
return TSDB_CODE_SUCCESS;
}
// NOTE: this task should be executed within the SStreamMeta lock region.
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
SEpSet epset = {0};
@ -121,24 +182,11 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
stDebug("vgId:%d hbMsg rsp not recv, send current hbMsg, msgId:%d, total:%d again", pMeta->vgId, pInfo->hbMsg.msgId,
pInfo->hbCount);
for(int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask* pTask = NULL;
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code != 0) {
continue;
}
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
epsetAssign(&epset, &pTask->info.mnodeEpset);
streamMetaReleaseTask(pMeta, pTask);
break;
code = streamTaskGetMndEpset(pMeta, &epset);
if (code != 0) {
stError("vgId:%d failed to get the mnode epset, not retrying sending hbMsg, msgId:%d", pMeta->vgId,
pInfo->hbMsg.msgId);
return code;
}
pInfo->msgSendTs = taosGetTimestampMs();
@ -372,9 +420,10 @@ void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSe
}
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) {
stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId);
SMetaHbInfo* pInfo = pMeta->pHbInfo;
SEpSet epset = {0};
stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId);
streamMetaWLock(pMeta);
// current waiting rsp recved
@ -384,6 +433,13 @@ int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) {
pInfo->hbCount += 1;
pInfo->msgSendTs = -1;
streamTaskGetMndEpset(pMeta, &epset);
if (!isEpsetEqual(&pRsp->mndEpset, &epset)) {
// we need to update the mnode epset for each tasks
stInfo("vgId:%d mnode epset updated, update mnode epset for all tasks", pMeta->vgId);
streamTaskUpdateMndEpset(pMeta, &pRsp->mndEpset);
}
} else {
stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId);
}