enh(sync): syncNodeOnSnapshotSendCb, syncNodeOnSnapshotRspCb

This commit is contained in:
Minghao Li 2022-05-31 17:23:36 +08:00
parent 21191ae2a8
commit ee9cdb2967
2 changed files with 65 additions and 33 deletions

View File

@ -369,15 +369,14 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
return TAOS_SYNC_PROPOSE_OTHER_ERROR; return TAOS_SYNC_PROPOSE_OTHER_ERROR;
} }
char logBuf[512] = {0}; char logBuf[512] = {0};
char *syncNodeStr = sync2SimpleStr(pMgmt->sync); char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg); syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr); taosMemoryFree(syncNodeStr);
// ToDo: ugly! use function pointer
// ugly! use function pointer
if (syncNodeSnapshotEnable(pSyncNode)) { if (syncNodeSnapshotEnable(pSyncNode)) {
if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
@ -413,14 +412,14 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_SEND) { } else if (pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_SEND) {
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg); code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
syncSnapshotSendDestroy(pSyncMsg); syncSnapshotSendDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_RSP) { } else if (pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_RSP) {
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg); code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg); syncSnapshotRspDestroy(pSyncMsg);
} else { } else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));

View File

@ -299,6 +299,7 @@ char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
return serialized; return serialized;
} }
// receiver do something
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
SSyncSnapshotReceiver *pReceiver = NULL; SSyncSnapshotReceiver *pReceiver = NULL;
for (int i = 0; i < pSyncNode->replicaNum; ++i) { for (int i = 0; i < pSyncNode->replicaNum; ++i) {
@ -308,32 +309,64 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
} }
ASSERT(pReceiver != NULL); ASSERT(pReceiver != NULL);
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); // state, term, seq/ack
pRspMsg->srcId = pSyncNode->myRaftId; if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
pRspMsg->destId = pMsg->srcId; SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
pRspMsg->term = pSyncNode->pRaftStore->currentTerm; pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->destId = pMsg->srcId;
pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
pRspMsg->ack = pMsg->seq; pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->ack = pMsg->seq;
if (pMsg->seq == 0) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
// begin // begin
snapshotReceiverStart(pReceiver); snapshotReceiverStart(pReceiver);
} else if (pMsg->seq == -1) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
// end // end
snapshotReceiverStop(pReceiver); pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
// apply msg finish pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
snapshotReceiverStop(pReceiver);
} else { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// transfering pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
// apply msg snapshotReceiverStop(pReceiver);
} else {
// transfering
if (pMsg->seq == pReceiver->ack + 1) {
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
}
}
SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
} }
SRpcMsg rpcMsg;
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
return 0; return 0;
} }
int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg) { return 0; } // sender do something
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
SSyncSnapshotSender *pSender = NULL;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) {
pSender = (pSyncNode->senders)[i];
}
}
ASSERT(pSender != NULL);
// state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->ack == pSender->seq) {
pSender->ack = pMsg->ack;
snapshotSend(pSender);
(pSender->seq)++;
}
}
}
return 0;
}