From ee9cdb2967e7fa4b96cbbc64997c529af6690c8d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 31 May 2022 17:23:36 +0800 Subject: [PATCH] enh(sync): syncNodeOnSnapshotSendCb, syncNodeOnSnapshotRspCb --- source/dnode/mnode/impl/src/mndMain.c | 21 ++++---- source/libs/sync/src/syncSnapshot.c | 77 +++++++++++++++++++-------- 2 files changed, 65 insertions(+), 33 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 8968cd438e..fbfd27c1b1 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -369,15 +369,14 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); return TAOS_SYNC_PROPOSE_OTHER_ERROR; } - + char logBuf[512] = {0}; char *syncNodeStr = sync2SimpleStr(pMgmt->sync); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); syncRpcMsgLog2(logBuf, pMsg); taosMemoryFree(syncNodeStr); - - // ugly! use function pointer + // ToDo: ugly! use function pointer if (syncNodeSnapshotEnable(pSyncNode)) { if (pMsg->msgType == TDMT_VND_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); @@ -413,14 +412,14 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_SEND) { - SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); - code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg); - syncSnapshotSendDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_RSP) { - SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); - code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg); - syncSnapshotRspDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_SEND) { + SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); + code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg); + syncSnapshotSendDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_RSP) { + SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); + code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg); + syncSnapshotRspDestroy(pSyncMsg); } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 6b0fea022a..ab09c3eee9 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -299,6 +299,7 @@ char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { return serialized; } +// receiver do something int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { SSyncSnapshotReceiver *pReceiver = NULL; for (int i = 0; i < pSyncNode->replicaNum; ++i) { @@ -308,32 +309,64 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { } ASSERT(pReceiver != NULL); - SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); - pRspMsg->srcId = pSyncNode->myRaftId; - pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->pRaftStore->currentTerm; - pRspMsg->lastIndex = pMsg->lastIndex; - pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->ack = pMsg->seq; + // state, term, seq/ack + if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { + SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); + pRspMsg->srcId = pSyncNode->myRaftId; + pRspMsg->destId = pMsg->srcId; + pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->lastIndex = pMsg->lastIndex; + pRspMsg->lastTerm = pMsg->lastTerm; + pRspMsg->ack = pMsg->seq; - if (pMsg->seq == 0) { - // begin - snapshotReceiverStart(pReceiver); + if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { + // begin + snapshotReceiverStart(pReceiver); - } else if (pMsg->seq == -1) { - // end - snapshotReceiverStop(pReceiver); - // apply msg finish + } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { + // end + pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); + pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); + snapshotReceiverStop(pReceiver); - } else { - // transfering - // apply msg + } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { + pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false); + snapshotReceiverStop(pReceiver); + + } else { + // transfering + if (pMsg->seq == pReceiver->ack + 1) { + pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); + } + } + + SRpcMsg rpcMsg; + syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); + syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); } - - SRpcMsg rpcMsg; - syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); - syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); return 0; } -int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg) { return 0; } \ No newline at end of file +// sender do something +int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { + SSyncSnapshotSender *pSender = NULL; + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) { + pSender = (pSyncNode->senders)[i]; + } + } + ASSERT(pSender != NULL); + + // state, term, seq/ack + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { + if (pMsg->ack == pSender->seq) { + pSender->ack = pMsg->ack; + snapshotSend(pSender); + (pSender->seq)++; + } + } + } + + return 0; +} \ No newline at end of file