fix(stream): enable to re-send hbmsg if mnode failed to recv this hbmsg.

This commit is contained in:
Haojun Liao 2024-06-26 15:21:14 +08:00
parent b6948dcc4e
commit a44e17d5ea
7 changed files with 146 additions and 66 deletions

View File

@ -163,6 +163,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint
typedef struct SStreamHbMsg {
int32_t vgId;
int32_t msgId;
int32_t numOfTasks;
SArray* pTaskStatus; // SArray<STaskStatusEntry>
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
@ -172,6 +173,11 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);
typedef struct {
SMsgHead head;
int32_t msgId;
} SMStreamHbRspMsg;
typedef struct SRetrieveChkptTriggerReq {
SMsgHead head;
int64_t streamId;

View File

@ -523,7 +523,6 @@ typedef struct STaskUpdateEntry {
} STaskUpdateEntry;
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
typedef int32_t (*__stream_task_expand_fn)(struct SStreamTask* pTask);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5);
@ -791,7 +790,8 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq);
int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq *req);
void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp);
int32_t streamTaskSendLatestCheckpointInfo(SStreamTask* pTask);
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp);
#ifdef __cplusplus
}

View File

@ -83,7 +83,7 @@ typedef struct SOrphanTask {
typedef struct {
SMsgHead head;
} SMStreamHbRspMsg, SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp;
} SMStreamReqCheckpointRsp, SMStreamUpdateChkptRsp;
typedef struct STaskChkptInfo {
int32_t nodeId;

View File

@ -236,7 +236,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
tDecoderClear(&decoder);
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId);
pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
@ -333,21 +333,23 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
taosThreadMutexUnlock(&execInfo.lock);
tCleanupStreamHbMsg(&req);
taosArrayDestroy(pFailedChkpt);
taosArrayDestroy(pOrphanTasks);
{
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamHbRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead* pHead = rsp.pCont;
pHead->vgId = htonl(req.vgId);
SMStreamHbRspMsg* pMsg = rsp.pCont;
pMsg->head.vgId = htonl(req.vgId);
pMsg->msgId = req.msgId;
tmsgSendRsp(&rsp);
pReq->info.handle = NULL; // disable auto rsp
}
tCleanupStreamHbMsg(&req);
taosArrayDestroy(pFailedChkpt);
taosArrayDestroy(pOrphanTasks);
return TSDB_CODE_SUCCESS;
}

View File

@ -1080,7 +1080,9 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return streamProcessHeartbeatRsp(pMeta, pMsg->pCont);
}
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }

View File

@ -24,11 +24,13 @@
int32_t streamMetaId = 0;
struct SMetaHbInfo {
tmr_h hbTmr;
int32_t stopFlag;
int32_t tickCounter;
int32_t hbCount;
int64_t hbStart;
tmr_h hbTmr;
int32_t stopFlag;
int32_t tickCounter;
int32_t hbCount;
int64_t hbStart;
int64_t msgSendTs;
SStreamHbMsg hbMsg;
};
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
@ -61,7 +63,7 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
bool exist = existInHbMsg(pMsg, pTaskEpset);
if (!exist) {
taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId);
stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId,
stDebug("vgId:%d nodeId:%d added into hbMsg update list, total:%d", pMeta->vgId, pTaskEpset->nodeId,
(int32_t)taosArrayGetSize(pMsg->pUpdateNodes));
}
}
@ -70,20 +72,91 @@ static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
taosThreadMutexUnlock(&pTask->lock);
}
static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* pEpset) {
int32_t code = 0;
int32_t tlen = 0;
tEncodeSize(tEncodeStreamHbMsg, pMsg, tlen, code);
if (code < 0) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
return TSDB_CODE_FAILED;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_FAILED;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, pMsg)) < 0) {
rpcFreeCont(buf);
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
return TSDB_CODE_FAILED;
}
tEncoderClear(&encoder);
stDebug("vgId:%d send hb to mnode, numOfTasks:%d msgId:%d", pMeta->vgId, pMsg->numOfTasks, pMsg->msgId);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
tmsgSendReq(pEpset, &msg);
return TSDB_CODE_SUCCESS;
}
// NOTE: this task should be executed within the SStreamMeta lock region.
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
SStreamHbMsg hbMsg = {0};
SEpSet epset = {0};
bool hasMnodeEpset = false;
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
SMetaHbInfo* pInfo = pMeta->pHbInfo;
hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
// not recv the hb msg rsp yet, send current hb msg again
if (pInfo->msgSendTs > 0) {
stDebug("vgId:%d hbMsg rsp not recv, send current hbMsg, msgId:%d, total:%d again", pMeta->vgId, pInfo->hbMsg.msgId,
pInfo->hbCount);
for(int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (pTask == NULL) {
continue;
}
if ((*pTask)->info.fillHistory == 1) {
continue;
}
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
break;
}
pInfo->msgSendTs = taosGetTimestampMs();
doSendHbMsgInfo(&pInfo->hbMsg, pMeta, &epset);
return TSDB_CODE_SUCCESS;
}
SStreamHbMsg* pMsg = &pInfo->hbMsg;
stDebug("vgId:%d build stream hbMsg, leader:%d msgId:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER),
pMeta->pHbInfo->hbCount);
pMsg->vgId = pMeta->vgId;
pMsg->msgId = pMeta->pHbInfo->hbCount;
pMsg->pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
pMsg->pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
if (pMsg->pTaskStatus == NULL || pMsg->pUpdateNodes == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (pTask == NULL) {
continue;
@ -103,12 +176,14 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
}
if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) {
entry.checkpointInfo.failed = ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0;
entry.checkpointInfo.failed =
((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId;
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId;
if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId);
stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d", (*pTask)->id.idStr,
(*pTask)->chkInfo.pActiveInfo->transId);
}
}
@ -121,55 +196,23 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer);
}
addUpdateNodeIntoHbMsg(*pTask, &hbMsg);
taosArrayPush(hbMsg.pTaskStatus, &entry);
addUpdateNodeIntoHbMsg(*pTask, pMsg);
taosArrayPush(pMsg->pTaskStatus, &entry);
if (!hasMnodeEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasMnodeEpset = true;
}
}
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
pMsg->numOfTasks = taosArrayGetSize(pMsg->pTaskStatus);
if (hasMnodeEpset) {
int32_t code = 0;
int32_t tlen = 0;
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
if (code < 0) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
goto _end;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
goto _end;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
rpcFreeCont(buf);
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
goto _end;
}
tEncoderClear(&encoder);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
pMeta->pHbInfo->hbCount += 1;
stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
pMeta->pHbInfo->hbCount);
tmsgSendReq(&epset, &msg);
pInfo->msgSendTs = taosGetTimestampMs();
doSendHbMsgInfo(pMsg, pMeta, &epset);
} else {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
stDebug("vgId:%d no tasks or no mnd epset, not send stream hb to mnode", pMeta->vgId);
}
_end:
tCleanupStreamHbMsg(&hbMsg);
return TSDB_CODE_SUCCESS;
}
@ -209,7 +252,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
return;
}
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
streamMetaRLock(pMeta);
streamMetaSendHbHelper(pMeta);
streamMetaRUnLock(pMeta);
@ -228,7 +270,8 @@ SMetaHbInfo* createMetaHbInfo(int64_t* pRid) {
pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
pInfo->tickCounter = 0;
pInfo->stopFlag = 0;
pInfo->msgSendTs = -1;
pInfo->hbCount = 0;
return pInfo;
}
@ -253,4 +296,25 @@ void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSe
*pStartTs = pInfo->hbStart;
*pSendCount = pInfo->hbCount;
}
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp) {
stDebug("vgId:%d process hbMsg rsp, msgId:%d rsp confirmed", pMeta->vgId, pRsp->msgId);
SMetaHbInfo* pInfo = pMeta->pHbInfo;
streamMetaRLock(pMeta);
// current waiting rsp recved
if (pRsp->msgId == pInfo->hbCount) {
tCleanupStreamHbMsg(&pInfo->hbMsg);
stDebug("vgId:%d hbMsg msgId:%d sendTs:%" PRId64 " recved confirmed", pMeta->vgId, pRsp->msgId, pInfo->msgSendTs);
pInfo->hbCount += 1;
pInfo->msgSendTs = -1;
} else {
stWarn("vgId:%d recv expired hb rsp, msgId:%d, discarded", pMeta->vgId, pRsp->msgId);
}
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}

View File

@ -365,6 +365,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI32(pEncoder, *pVgId) < 0) return -1;
}
if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
@ -424,6 +425,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
taosArrayPush(pReq->pUpdateNodes, &vgId);
}
if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
@ -434,12 +436,16 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
}
if (pMsg->pUpdateNodes != NULL) {
taosArrayDestroy(pMsg->pUpdateNodes);
pMsg->pUpdateNodes = taosArrayDestroy(pMsg->pUpdateNodes);
}
if (pMsg->pTaskStatus != NULL) {
taosArrayDestroy(pMsg->pTaskStatus);
pMsg->pTaskStatus = taosArrayDestroy(pMsg->pTaskStatus);
}
pMsg->msgId = -1;
pMsg->vgId = -1;
pMsg->numOfTasks = -1;
}
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {