From 6d8733a232a9a235d86963114a439bf10b71779e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 31 May 2022 16:14:19 +0800 Subject: [PATCH] enh(sync): add SSyncSnapshotSender --- include/libs/sync/syncTools.h | 7 +- source/dnode/mnode/impl/src/mndMain.c | 10 ++ source/libs/sync/inc/syncSnapshot.h | 42 +++-- source/libs/sync/src/syncMessage.c | 4 +- source/libs/sync/src/syncSnapshot.c | 234 +++++++++++++++++++++++++- 5 files changed, 270 insertions(+), 27 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 5d8d30329a..daa93c51ad 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -397,7 +397,6 @@ typedef struct SyncSnapshotSend { SyncIndex lastIndex; SyncTerm lastTerm; int32_t seq; - int32_t ack; uint32_t dataLen; char data[]; } SyncSnapshotSend; @@ -431,11 +430,10 @@ typedef struct SyncSnapshotRsp { SyncTerm term; SyncIndex lastIndex; SyncTerm lastTerm; - int32_t seq; int32_t ack; } SyncSnapshotRsp; -SyncSnapshotRsp* syncSnapshotRspBuild(uint32_t dataLen, int32_t vgId); +SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId); void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg); void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen); void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg); @@ -467,6 +465,9 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg); +int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg); + // ----------------------------------------- typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index ba794b5a93..8968cd438e 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -412,6 +412,16 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_SEND) { + SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); + code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg); + syncSnapshotSendDestroy(pSyncMsg); + } else if (pMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_RSP) { + SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); + code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg); + syncSnapshotRspDestroy(pSyncMsg); + } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); code = TAOS_SYNC_PROPOSE_OTHER_ERROR; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 2e2975fbc9..5f4db4a0b2 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -25,42 +25,56 @@ extern "C" { #include #include "cJSON.h" #include "syncInt.h" +#include "syncMessage.h" #include "taosdef.h" +#define SYNC_SNAPSHOT_SEQ_INVALID -1 +#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -2 +#define SYNC_SNAPSHOT_SEQ_BEGIN 0 +#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF + typedef struct SSyncSnapshotSender { - int32_t sending; - int32_t received; - bool finish; - void * pCurrentBlock; + bool start; + int32_t seq; + int32_t ack; + void *pReader; + void *pCurrentBlock; int32_t blockLen; + SSnapshot snapshot; int64_t sendingMS; - SSnapshot *pSnapshot; SSyncNode *pSyncNode; + int32_t replicaIndex; + SyncTerm term; } SSyncSnapshotSender; -SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode); +SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); void snapshotSenderDestroy(SSyncSnapshotSender *pSender); void snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender); -cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); -char * snapshotSender2Str(SSyncSnapshotSender *pSender); +cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); +char *snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { bool start; - int32_t received; - int32_t progressIndex; - void * pCurrentBlock; + int32_t ack; + void *pWriter; + void *pCurrentBlock; int32_t len; - SSnapshot *pSnapshot; SSyncNode *pSyncNode; + int32_t replicaIndex; } SSyncSnapshotReceiver; SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); +void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver); -cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); + +int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); +int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index c04c310066..7314744e3b 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -1806,7 +1806,6 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) { cJSON_AddStringToObject(pRoot, "lastTerm", u64buf); cJSON_AddNumberToObject(pRoot, "seq", pMsg->seq); - cJSON_AddNumberToObject(pRoot, "ack", pMsg->ack); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); char* s; @@ -1858,7 +1857,7 @@ void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg) { } // --------------------------------------------- -SyncSnapshotRsp* syncSnapshotRspBuild(uint32_t dataLen, int32_t vgId) { +SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncSnapshotRsp); SyncSnapshotRsp* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); @@ -1969,7 +1968,6 @@ cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) { snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastTerm); cJSON_AddStringToObject(pRoot, "lastTerm", u64buf); - cJSON_AddNumberToObject(pRoot, "seq", pMsg->seq); cJSON_AddNumberToObject(pRoot, "ack", pMsg->ack); } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 760d72235c..04795b65f3 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -14,27 +14,247 @@ */ #include "syncSnapshot.h" +#include "syncRaftStore.h" +#include "syncUtil.h" -SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode) { return NULL; } +static void snapshotSenderDoStart(SSyncSnapshotSender *pSender); -void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {} +SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { + ASSERT(pSyncNode->pFsm->FpSnapshotStartRead != NULL); + ASSERT(pSyncNode->pFsm->FpSnapshotStopRead != NULL); + ASSERT(pSyncNode->pFsm->FpSnapshotDoRead != NULL); -void snapshotSenderStart(SSyncSnapshotSender *pSender) {} + SSyncSnapshotSender *pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender)); + ASSERT(pSender != NULL); + memset(pSender, 0, sizeof(*pSender)); -void snapshotSenderStop(SSyncSnapshotSender *pSender) {} + pSender->start = false; + pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID; + pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; + pSender->pReader = NULL; + pSender->pCurrentBlock = NULL; + pSender->blockLen = 0; + pSender->sendingMS = 5000; + pSender->pSyncNode = pSyncNode; + pSender->replicaIndex = replicaIndex; + pSender->term = pSyncNode->pRaftStore->currentTerm; -int32_t snapshotSend(SSyncSnapshotSender *pSender) { return 0; } + return pSender; +} -cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { return NULL; } +void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { + if (pSender != NULL) { + taosMemoryFree(pSender); + } +} -char *snapshotSender2Str(SSyncSnapshotSender *pSender) { return NULL; } +static void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { + pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; + pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; + pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; + int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader)); + ASSERT(ret == 0); + + pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + + SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); + pMsg->srcId = pSender->pSyncNode->myRaftId; + pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; + pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; + pMsg->lastIndex = pSender->snapshot.lastApplyIndex; + pMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pMsg->seq = pSender->seq; + + SRpcMsg rpcMsg; + syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); + syncSnapshotSendDestroy(pMsg); +} + +void snapshotSenderStart(SSyncSnapshotSender *pSender) { + if (!(pSender->start)) { + snapshotSenderDoStart(pSender); + pSender->start = true; + } else { + ASSERT(pSender->pSyncNode->pRaftStore->currentTerm >= pSender->term); + + // leader change + if (pSender->pSyncNode->pRaftStore->currentTerm > pSender->term) { + // force peer rollback + SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); + pMsg->srcId = pSender->pSyncNode->myRaftId; + pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; + pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; + pMsg->lastIndex = pSender->snapshot.lastApplyIndex; + pMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pMsg->seq = SYNC_SNAPSHOT_SEQ_FORCE_CLOSE; + + SRpcMsg rpcMsg; + syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); + syncSnapshotSendDestroy(pMsg); + + // close reader + int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); + ASSERT(ret == 0); + + // start again + snapshotSenderDoStart(pSender); + } else { + // do nothing + } + } +} + +void snapshotSenderStop(SSyncSnapshotSender *pSender) { + int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); + ASSERT(ret == 0); + + if (pSender->pCurrentBlock != NULL) { + taosMemoryFree(pSender->pCurrentBlock); + pSender->blockLen = 0; + } + + ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, + &(pSender->pCurrentBlock), &(pSender->blockLen)); + ASSERT(ret == 0); +} + +// send msg from seq, seq is already updated +int32_t snapshotSend(SSyncSnapshotSender *pSender) { + // free memory last time (seq - 1) + if (pSender->pCurrentBlock != NULL) { + taosMemoryFree(pSender->pCurrentBlock); + pSender->blockLen = 0; + } + + // read data + int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, + &(pSender->pCurrentBlock), &(pSender->blockLen)); + ASSERT(ret == 0); + + SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId); + pMsg->srcId = pSender->pSyncNode->myRaftId; + pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; + pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; + pMsg->lastIndex = pSender->snapshot.lastApplyIndex; + pMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pMsg->seq = pSender->seq; + memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); + + SRpcMsg rpcMsg; + syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); + syncSnapshotSendDestroy(pMsg); + + return 0; +} + +cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { + char u64buf[128]; + cJSON *pRoot = cJSON_CreateObject(); + + if (pSender != NULL) { + cJSON_AddNumberToObject(pRoot, "start", pSender->start); + cJSON_AddNumberToObject(pRoot, "seq", pSender->seq); + cJSON_AddNumberToObject(pRoot, "ack", pSender->ack); + + snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader); + cJSON_AddStringToObject(pRoot, "pReader", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock); + cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf); + cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen); + + if (pSender->pCurrentBlock != NULL) { + char *s; + s = syncUtilprintBin((char *)(pSender->pCurrentBlock), pSender->blockLen); + cJSON_AddStringToObject(pRoot, "pCurrentBlock", s); + taosMemoryFree(s); + s = syncUtilprintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen); + cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s); + taosMemoryFree(s); + } + + cJSON *pSnapshot = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyIndex); + cJSON_AddStringToObject(pRoot, "lastApplyIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pSender->snapshot.lastApplyTerm); + cJSON_AddStringToObject(pRoot, "lastApplyTerm", u64buf); + cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot); + + snprintf(u64buf, sizeof(u64buf), "%lu", pSender->sendingMS); + cJSON_AddStringToObject(pRoot, "sendingMS", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex); + snprintf(u64buf, sizeof(u64buf), "%lu", pSender->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + } + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot); + return pJson; +} + +char *snapshotSender2Str(SSyncSnapshotSender *pSender) { + cJSON *pJson = snapshotSender2Json(pSender); + char *serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// ------------------------------------- SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode) { return NULL; } void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {} +void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) {} + +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {} + int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver) { return 0; } cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { return NULL; } char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { return NULL; } + +int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { + SSyncSnapshotReceiver *pReceiver = NULL; + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) { + pReceiver = (pSyncNode->receivers)[i]; + } + } + ASSERT(pReceiver != NULL); + + SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); + pRspMsg->srcId = pSyncNode->myRaftId; + pRspMsg->destId = pMsg->srcId; + pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->lastIndex = pMsg->lastIndex; + pRspMsg->lastTerm = pMsg->lastTerm; + pRspMsg->ack = pMsg->seq; + + if (pMsg->seq == 0) { + // begin + snapshotReceiverStart(pReceiver); + + } else if (pMsg->seq == -1) { + // end + snapshotReceiverStop(pReceiver); + // apply msg finish + + } else { + // transfering + // apply msg + } + + SRpcMsg rpcMsg; + syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); + syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); + return 0; +} + +int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg) { return 0; } \ No newline at end of file