fix(stream): add ts in HbMsg.
This commit is contained in:
parent
b57b263534
commit
aefb9d275e
|
@ -164,6 +164,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint
|
|||
typedef struct SStreamHbMsg {
|
||||
int32_t vgId;
|
||||
int32_t msgId;
|
||||
int64_t ts;
|
||||
int32_t numOfTasks;
|
||||
SArray* pTaskStatus; // SArray<STaskStatusEntry>
|
||||
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
|
||||
|
|
|
@ -80,6 +80,7 @@ typedef struct SNodeEntry {
|
|||
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
|
||||
int64_t hbTimestamp; // second
|
||||
int32_t lastHbMsgId; // latest hb msgId
|
||||
int64_t lastHbMsgTs;
|
||||
} SNodeEntry;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -2085,7 +2085,7 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi
|
|||
break;
|
||||
}
|
||||
|
||||
SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
|
||||
SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
|
||||
epsetAssign(&entry.epset, &pTask->info.epSet);
|
||||
(void)taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
|
||||
}
|
||||
|
@ -2265,7 +2265,7 @@ void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode)
|
|||
}
|
||||
|
||||
if (!exist) {
|
||||
SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId};
|
||||
SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
|
||||
epsetAssign(&nodeEntry.epset, &pTask->info.epSet);
|
||||
|
||||
void* px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
|
||||
|
|
|
@ -333,7 +333,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId);
|
||||
mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, HbMsgId:%d, HbMsgTs:%" PRId64, req.vgId,
|
||||
req.numOfTasks, req.msgId, req.ts);
|
||||
|
||||
pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
|
||||
pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
|
||||
|
@ -366,17 +367,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pEntry->lastHbMsgId == req.msgId) {
|
||||
mError("vgId:%d Hb msgId:%d already handled, discard", pEntry->nodeId, req.msgId);
|
||||
if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
|
||||
mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId);
|
||||
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
||||
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
|
||||
return -1;
|
||||
return terrno;
|
||||
} else {
|
||||
pEntry->lastHbMsgId = req.msgId;
|
||||
pEntry->lastHbMsgTs = req.ts;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -417,6 +419,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
SStreamObj *pStream = NULL;
|
||||
code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
|
||||
if (code) {
|
||||
mError("stream obj not exist, failed to handle consensus checkpoint-info req, code:%s", tstrerror(code));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -142,11 +142,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
SStreamHbMsg* pMsg = &pInfo->hbMsg;
|
||||
stDebug("vgId:%d build stream hbMsg, leader:%d msgId:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER),
|
||||
pMeta->pHbInfo->hbCount);
|
||||
|
||||
pMsg->vgId = pMeta->vgId;
|
||||
pMsg->msgId = pMeta->pHbInfo->hbCount;
|
||||
pMsg->ts = taosGetTimestampMs();
|
||||
|
||||
stDebug("vgId:%d build stream hbMsg, leader:%d HbMsgId:%d, HbMsgTs:%" PRId64, pMeta->vgId,
|
||||
(pMeta->role == NODE_ROLE_LEADER), pMsg->msgId, pMsg->ts);
|
||||
|
||||
pMsg->pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
|
||||
pMsg->pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
|
||||
|
|
|
@ -382,6 +382,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
|||
}
|
||||
|
||||
if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
|
||||
tEndEncode(pEncoder);
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
@ -454,6 +455,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
|
|||
}
|
||||
|
||||
if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1;
|
||||
tEndDecode(pDecoder);
|
||||
return 0;
|
||||
|
||||
|
|
|
@ -76,6 +76,8 @@ system sh/stop_dnodes.sh
|
|||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
||||
run tsim/stream/checkTaskStatus.sim
|
||||
|
||||
sql insert into t1 values(1648791213002,3,2,3,1.1);
|
||||
|
||||
$loop_count = 0
|
||||
|
|
Loading…
Reference in New Issue