From 060f8224f4ef18088ff079bc7617f31edb9f3362 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 1 Jun 2022 13:32:52 +0800 Subject: [PATCH] refactor: snapshotReSend --- source/libs/sync/inc/syncSnapshot.h | 3 +- source/libs/sync/src/syncSnapshot.c | 159 ++++++++++++------ .../sync/test/syncSnapshotReceiverTest.cpp | 3 - 3 files changed, 104 insertions(+), 61 deletions(-) diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 281e30f44e..cf69b385ec 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -54,6 +54,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender); void snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender); +int32_t snapshotReSend(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); char *snapshotSender2Str(SSyncSnapshotSender *pSender); @@ -62,8 +63,6 @@ typedef struct SSyncSnapshotReceiver { int32_t ack; void *pWriter; - void *pCurrentBlock; - int32_t blockLen; SyncTerm term; SSyncNode *pSyncNode; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index b9cd5422c9..7825a7cd95 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -18,6 +18,7 @@ #include "syncUtil.h" static void snapshotSenderDoStart(SSyncSnapshotSender *pSender); +static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver); SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) && @@ -176,6 +177,26 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { return 0; } +// send snapshot data from cache +int32_t snapshotReSend(SSyncSnapshotSender *pSender) { + if (pSender->pCurrentBlock != NULL) { + SyncSnapshotSend *pMsg = syncSnapshotSendBuild(pSender->blockLen, pSender->pSyncNode->vgId); + pMsg->srcId = pSender->pSyncNode->myRaftId; + pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; + pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; + pMsg->lastIndex = pSender->snapshot.lastApplyIndex; + pMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pMsg->seq = pSender->seq; + memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); + + SRpcMsg rpcMsg; + syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); + syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); + syncSnapshotSendDestroy(pMsg); + } + return 0; +} + cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char u64buf[128]; cJSON *pRoot = cJSON_CreateObject(); @@ -232,22 +253,24 @@ char *snapshotSender2Str(SSyncSnapshotSender *pSender) { // ------------------------------------- SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { - ASSERT(pSyncNode->pFsm->FpSnapshotStartWrite != NULL); - ASSERT(pSyncNode->pFsm->FpSnapshotStopWrite != NULL); - ASSERT(pSyncNode->pFsm->FpSnapshotDoWrite != NULL); + bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) && + (pSyncNode->pFsm->FpSnapshotDoWrite != NULL); - SSyncSnapshotReceiver *pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver)); - ASSERT(pReceiver != NULL); - memset(pReceiver, 0, sizeof(*pReceiver)); + SSyncSnapshotReceiver *pReceiver; + if (condition) { + pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver)); + ASSERT(pReceiver != NULL); + memset(pReceiver, 0, sizeof(*pReceiver)); - pReceiver->start = false; - pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; - pReceiver->pWriter = NULL; - pReceiver->pCurrentBlock = NULL; - pReceiver->blockLen = 0; - pReceiver->pSyncNode = pSyncNode; - pReceiver->replicaIndex = replicaIndex; - pReceiver->term = pSyncNode->pRaftStore->currentTerm; + pReceiver->start = false; + pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; + pReceiver->pWriter = NULL; + pReceiver->pSyncNode = pSyncNode; + pReceiver->replicaIndex = replicaIndex; + pReceiver->term = pSyncNode->pRaftStore->currentTerm; + } else { + sInfo("snapshotReceiverCreate cannot create receiver"); + } return pReceiver; } @@ -258,30 +281,48 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { } } +// begin receive snapshot msg (current term, seq begin) +static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver) { + pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; + pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; + + ASSERT(pReceiver->pWriter == NULL); + int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter)); + ASSERT(ret == 0); +} + +// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver +// if already start, force close, start again void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) { if (!(pReceiver->start)) { - pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter)); - ASSERT(ret == 0); - if (pReceiver->pCurrentBlock != NULL) { - taosMemoryFree(pReceiver->pCurrentBlock); - pReceiver->pCurrentBlock = NULL; - pReceiver->blockLen = 0; - } - pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; + // start + snapshotReceiverDoStart(pReceiver); + } else { + // already start + + // force close, abandon incomplete data + int32_t ret = + pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); + ASSERT(ret == 0); + pReceiver->pWriter = NULL; + + // start again + snapshotReceiverDoStart(pReceiver); + ASSERT(0); } } void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true); - ASSERT(ret == 0); - - if (pReceiver->pCurrentBlock != NULL) { - taosMemoryFree(pReceiver->pCurrentBlock); - pReceiver->blockLen = 0; + if (pReceiver->pWriter != NULL) { + int32_t ret = + pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); + ASSERT(ret == 0); + pReceiver->pWriter = NULL; } + + pReceiver->start = false; } cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { @@ -295,20 +336,6 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter); cJSON_AddStringToObject(pRoot, "pWriter", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pCurrentBlock); - cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf); - cJSON_AddNumberToObject(pRoot, "blockLen", pReceiver->blockLen); - - if (pReceiver->pCurrentBlock != NULL) { - char *s; - s = syncUtilprintBin((char *)(pReceiver->pCurrentBlock), pReceiver->blockLen); - cJSON_AddStringToObject(pRoot, "pCurrentBlock", s); - taosMemoryFree(s); - s = syncUtilprintBin2((char *)(pReceiver->pCurrentBlock), pReceiver->blockLen); - cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s); - taosMemoryFree(s); - } - snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode); cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); cJSON_AddNumberToObject(pRoot, "replicaIndex", pReceiver->replicaIndex); @@ -330,6 +357,7 @@ char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { // receiver do something int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { + // get receiver SSyncSnapshotReceiver *pReceiver = NULL; for (int i = 0; i < pSyncNode->replicaNum; ++i) { if (syncUtilSameId(&(pMsg->srcId), &((pSyncNode->replicasId)[i]))) { @@ -338,40 +366,55 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { } ASSERT(pReceiver != NULL); + bool needRsp = false; + // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { - SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); - pRspMsg->srcId = pSyncNode->myRaftId; - pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->pRaftStore->currentTerm; - pRspMsg->lastIndex = pMsg->lastIndex; - pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->ack = pMsg->seq; - if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { // begin snapshotReceiverStart(pReceiver); + pReceiver->ack = pMsg->seq; + needRsp = true; } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { - // end + // end, finish FSM pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); snapshotReceiverStop(pReceiver); + pReceiver->ack = pMsg->seq; + needRsp = true; } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false); snapshotReceiverStop(pReceiver); + needRsp = false; - } else { + } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { // transfering if (pMsg->seq == pReceiver->ack + 1) { pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); + pReceiver->ack = pMsg->seq; } + needRsp = true; + + } else { + ASSERT(0); } - SRpcMsg rpcMsg; - syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); - syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); + if (needRsp) { + SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); + pRspMsg->srcId = pSyncNode->myRaftId; + pRspMsg->destId = pMsg->srcId; + pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->lastIndex = pMsg->lastIndex; + pRspMsg->lastTerm = pMsg->lastTerm; + pRspMsg->ack = pReceiver->ack; + + SRpcMsg rpcMsg; + syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); + syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); + syncSnapshotRspDestroy(pRspMsg); + } } return 0; } @@ -403,6 +446,10 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { pSender->ack = pMsg->ack; (pSender->seq)++; snapshotSend(pSender); + } else if (pMsg->ack == pSender->seq - 1) { + snapshotReSend(pSender); + } else { + ASSERT(0); } } } diff --git a/source/libs/sync/test/syncSnapshotReceiverTest.cpp b/source/libs/sync/test/syncSnapshotReceiverTest.cpp index e912ecc597..a9e87ba12d 100644 --- a/source/libs/sync/test/syncSnapshotReceiverTest.cpp +++ b/source/libs/sync/test/syncSnapshotReceiverTest.cpp @@ -45,9 +45,6 @@ SSyncSnapshotReceiver* createReceiver() { pReceiver->start = true; pReceiver->ack = 20; pReceiver->pWriter = (void*)0x11; - pReceiver->blockLen = 20; - pReceiver->pCurrentBlock = taosMemoryMalloc(pReceiver->blockLen); - snprintf((char*)(pReceiver->pCurrentBlock), pReceiver->blockLen, "%s", "hello"); pReceiver->term = 66; return pReceiver;