Merge pull request #29105 from taosdata/fix/3_liaohj

enh: update the mnode epset in stream hb rsp.
This commit is contained in:
Shengliang Guan 2024-12-16 09:23:38 +08:00 committed by GitHub
commit 6b49eededd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 98 additions and 28 deletions

View File

@ -188,6 +188,7 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int32_t msgId; int32_t msgId;
SEpSet mndEpset;
} SMStreamHbRspMsg; } SMStreamHbRspMsg;
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp); int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp);

View File

@ -651,6 +651,7 @@ int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
TAOS_CHECK_EXIT(tStartEncode(pEncoder)); TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->msgId)); TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->msgId));
TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pRsp->mndEpset));
tEndEncode(pEncoder); tEndEncode(pEncoder);
_exit: _exit:
@ -663,6 +664,7 @@ int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
TAOS_CHECK_EXIT(tStartDecode(pDecoder)); TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->msgId)); TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->msgId));
TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pRsp->mndEpset));
tEndDecode(pDecoder); tEndDecode(pDecoder);
_exit: _exit:

View File

@ -248,6 +248,10 @@ bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
} }
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
if (pMnode == NULL || pEpSet == NULL) {
return;
}
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE); int32_t totalMnodes = sdbGetSize(pSdb, SDB_MNODE);
if (totalMnodes == 0) { if (totalMnodes == 0) {

View File

@ -2343,7 +2343,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
} }
int32_t total = taosArrayGetSize(*pReqTaskList); int32_t total = taosArrayGetSize(*pReqTaskList);
if (total == numOfTasks) { // all tasks has send the reqs if (total == numOfTasks) { // all tasks have sent the reqs
int64_t checkpointId = mndStreamGenChkptId(pMnode, false); int64_t checkpointId = mndStreamGenChkptId(pMnode, false);
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);

View File

@ -15,6 +15,8 @@
#include "mndStream.h" #include "mndStream.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndMnode.h"
#include "tmisce.h"
typedef struct SFailedCheckpointInfo { typedef struct SFailedCheckpointInfo {
int64_t streamUid; int64_t streamUid;
@ -31,7 +33,7 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info); static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
static bool validateHbMsg(const SArray *pNodeList, int32_t vgId); static bool validateHbMsg(const SArray *pNodeList, int32_t vgId);
static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks); static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId); static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet* pEpset, int32_t vgId, int32_t msgId);
static void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks); static void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks);
void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
@ -329,6 +331,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SArray *pFailedChkpt = NULL; SArray *pFailedChkpt = NULL;
SArray *pOrphanTasks = NULL; SArray *pOrphanTasks = NULL;
int32_t code = 0; int32_t code = 0;
SDecoder decoder = {0};
SEpSet mnodeEpset = {0};
if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
if (suspendAllStreams(pMnode, &pReq->info) < 0) { if (suspendAllStreams(pMnode, &pReq->info) < 0) {
@ -336,7 +340,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
} }
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen); tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamHbMsg(&decoder, &req) < 0) { if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
@ -357,13 +360,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
TAOS_RETURN(terrno); TAOS_RETURN(terrno);
} }
mndGetMnodeEpSet(pMnode, &mnodeEpset);
streamMutexLock(&execInfo.lock); streamMutexLock(&execInfo.lock);
mndInitStreamExecInfo(pMnode, &execInfo); mndInitStreamExecInfo(pMnode, &execInfo);
if (!validateHbMsg(execInfo.pNodeList, req.vgId)) { if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
mError("vgId:%d not exists in nodeList buf, discarded", req.vgId); mError("vgId:%d not exists in nodeList buf, discarded", req.vgId);
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); doSendHbMsgRsp(terrno, &pReq->info, &mnodeEpset, req.vgId, req.msgId);
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
@ -383,9 +388,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) { if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId); mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId);
// return directly and after the vnode to continue to send the next HbMsg. // return directly and allow the vnode to continue to send the next HbMsg.
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId); doSendHbMsgRsp(terrno, &pReq->info, &mnodeEpset, req.vgId, req.msgId);
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
@ -529,7 +534,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, req.vgId, req.msgId); doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, &mnodeEpset, req.vgId, req.msgId);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
return code; return code;
@ -552,12 +557,13 @@ void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArr
taosArrayDestroy(pOrphanTasks); taosArrayDestroy(pOrphanTasks);
} }
void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) { void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, SEpSet* pMndEpset, int32_t vgId, int32_t msgId) {
int32_t ret = 0; int32_t ret = 0;
int32_t tlen = 0; int32_t tlen = 0;
void *buf = NULL; void *buf = NULL;
const SMStreamHbRspMsg msg = {.msgId = msgId}; SMStreamHbRspMsg msg = {.msgId = msgId};//, .mndEpset = *pMndEpset};
epsetAssign(&msg.mndEpset, pMndEpset);
tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret); tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret);
if (ret < 0) { if (ret < 0) {

View File

@ -127,6 +127,67 @@ static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* p
return tmsgSendReq(pEpset, &msg); return tmsgSendReq(pEpset, &msg);
} }
static int32_t streamTaskGetMndEpset(SStreamMeta* pMeta, SEpSet* pEpSet) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code != 0) {
continue;
}
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
epsetAssign(pEpSet, &pTask->info.mnodeEpset);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
static void streamTaskUpdateMndEpset(SStreamMeta* pMeta, SEpSet* pEpSet) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code != 0) {
stError("vgId:%d s-task:0x%x failed to acquire it for updating mnode epset, code:%s", pMeta->vgId, pId->taskId,
tstrerror(code));
continue;
}
// ignore this error since it is only for log file
char buf[256] = {0};
int32_t ret = epsetToStr(&pTask->info.mnodeEpset, buf, tListLen(buf));
if (ret != 0) { // print error and continue
stError("failed to convert epset to str, code:%s", tstrerror(ret));
}
char newBuf[256] = {0};
ret = epsetToStr(pEpSet, newBuf, tListLen(newBuf));
if (ret != 0) {
stError("failed to convert epset to str, code:%s", tstrerror(ret));
}
epsetAssign(&pTask->info.mnodeEpset, pEpSet);
stInfo("s-task:0x%x update mnd epset, from %s to %s", pId->taskId, buf, newBuf);
streamMetaReleaseTask(pMeta, pTask);
}
stDebug("vgId:%d update mnd epset for %d tasks completed", pMeta->vgId, numOfTasks);
}
// NOTE: this task should be executed within the SStreamMeta lock region. // NOTE: this task should be executed within the SStreamMeta lock region.
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
SEpSet epset = {0}; SEpSet epset = {0};
@ -140,24 +201,11 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
stDebug("vgId:%d hbMsg rsp not recv, send current hbMsg, msgId:%d, total:%d again", pMeta->vgId, pInfo->hbMsg.msgId, stDebug("vgId:%d hbMsg rsp not recv, send current hbMsg, msgId:%d, total:%d again", pMeta->vgId, pInfo->hbMsg.msgId,
pInfo->hbCount); pInfo->hbCount);
for(int32_t i = 0; i < numOfTasks; ++i) { code = streamTaskGetMndEpset(pMeta, &epset);
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); if (code != 0) {
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; stError("vgId:%d failed to get the mnode epset, not retrying sending hbMsg, msgId:%d", pMeta->vgId,
SStreamTask* pTask = NULL; pInfo->hbMsg.msgId);
return code;
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code != 0) {
continue;
}
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
epsetAssign(&epset, &pTask->info.mnodeEpset);
streamMetaReleaseTask(pMeta, pTask);
break;
} }
pInfo->msgSendTs = taosGetTimestampMs(); pInfo->msgSendTs = taosGetTimestampMs();
@ -384,9 +432,11 @@ void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSe
} }
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) { int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) {
stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId);
SMetaHbInfo* pInfo = pMeta->pHbInfo; SMetaHbInfo* pInfo = pMeta->pHbInfo;
SEpSet epset = {0};
int32_t code = 0;
stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId);
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
// current waiting rsp recved // current waiting rsp recved
@ -396,6 +446,13 @@ int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) {
pInfo->hbCount += 1; pInfo->hbCount += 1;
pInfo->msgSendTs = -1; pInfo->msgSendTs = -1;
code = streamTaskGetMndEpset(pMeta, &epset);
if (!isEpsetEqual(&pRsp->mndEpset, &epset) && (code == 0)) {
// we need to update the mnode epset for each tasks
stInfo("vgId:%d mnode epset updated, update mnode epset for all tasks", pMeta->vgId);
streamTaskUpdateMndEpset(pMeta, &pRsp->mndEpset);
}
} else { } else {
stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId); stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId);
} }