refactor(sync): snapshot sender, receiver
This commit is contained in:
parent
6437d20cda
commit
e8160342c3
|
@ -83,9 +83,9 @@ typedef struct SSyncSnapshotReceiver {
|
|||
|
||||
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
|
||||
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
|
||||
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg);
|
||||
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
|
||||
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
|
||||
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
|
||||
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
|
||||
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
|
||||
|
||||
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
|
||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
|
||||
|
|
|
@ -22,9 +22,11 @@
|
|||
#include "wal.h"
|
||||
|
||||
//----------------------------------
|
||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
|
||||
SyncSnapshotSend *pBeginMsg);
|
||||
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
|
||||
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg);
|
||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
|
||||
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
|
||||
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
|
||||
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
|
||||
|
||||
//----------------------------------
|
||||
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
|
||||
|
@ -68,7 +70,9 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
|
|||
// close reader
|
||||
if (pSender->pReader != NULL) {
|
||||
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
|
||||
ASSERT(ret == 0);
|
||||
if (ret != 0) {
|
||||
syncNodeErrorLog(pSender->pSyncNode, "stop reader error");
|
||||
}
|
||||
pSender->pReader = NULL;
|
||||
}
|
||||
|
||||
|
@ -79,7 +83,12 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
|
|||
|
||||
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
|
||||
|
||||
// begin send snapshot by snapshot, pReader
|
||||
// begin send snapshot by param, snapshot, pReader
|
||||
//
|
||||
// action:
|
||||
// 1. assert reader not start
|
||||
// 2. update state
|
||||
// 3. send first snapshot block
|
||||
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot,
|
||||
void *pReader) {
|
||||
ASSERT(!snapshotSenderIsStart(pSender));
|
||||
|
@ -98,7 +107,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho
|
|||
|
||||
// update term
|
||||
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
||||
++(pSender->privateTerm);
|
||||
++(pSender->privateTerm); // increase private term
|
||||
|
||||
// update state
|
||||
pSender->finish = false;
|
||||
|
@ -114,9 +123,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho
|
|||
|
||||
code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
|
||||
pSender->snapshot.lastConfigIndex, &pEntry);
|
||||
if (code == 0) {
|
||||
ASSERT(pEntry != NULL);
|
||||
|
||||
if (code == 0 && pEntry != NULL) {
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
|
@ -207,6 +214,8 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
|||
pSender->start = false;
|
||||
pSender->finish = finish;
|
||||
|
||||
// do not update term, maybe print
|
||||
|
||||
// event log
|
||||
do {
|
||||
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop");
|
||||
|
@ -243,6 +252,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
|||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
|
||||
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
|
@ -281,11 +291,13 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
|||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
|
||||
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
||||
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||
pMsg->lastConfig = pSender->lastConfig;
|
||||
pMsg->seq = pSender->seq;
|
||||
pMsg->privateTerm = pSender->privateTerm;
|
||||
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
|
||||
|
||||
// send msg
|
||||
|
@ -305,6 +317,12 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
|
||||
ASSERT(pMsg->ack == pSender->seq);
|
||||
pSender->ack = pMsg->ack;
|
||||
++(pSender->seq);
|
||||
}
|
||||
|
||||
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
|
||||
char u64buf[128];
|
||||
cJSON *pRoot = cJSON_CreateObject();
|
||||
|
@ -371,10 +389,11 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
|
|||
syncUtilU642Addr(destId.addr, host, sizeof(host), &port);
|
||||
|
||||
snprintf(s, len,
|
||||
"%s {%p laindex:%ld laterm:%lu lcindex:%ld seq:%d ack:%d finish:%d pterm:%lu replica-index:%d %s:%d}", event,
|
||||
pSender, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
|
||||
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->privateTerm,
|
||||
pSender->replicaIndex, host, port);
|
||||
"%s {%p s-param:%ld e-param:%ld laindex:%ld laterm:%lu lcindex:%ld seq:%d ack:%d finish:%d pterm:%lu "
|
||||
"replica-index:%d %s:%d}",
|
||||
event, pSender, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack,
|
||||
pSender->finish, pSender->privateTerm, pSender->replicaIndex, host, port);
|
||||
|
||||
return s;
|
||||
}
|
||||
|
@ -429,11 +448,10 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceive
|
|||
// static do start by privateTerm, pBeginMsg
|
||||
// receive first snapshot data
|
||||
// write first block data
|
||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
|
||||
SyncSnapshotSend *pBeginMsg) {
|
||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
|
||||
// update state
|
||||
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
|
||||
pReceiver->privateTerm = privateTerm;
|
||||
pReceiver->privateTerm = pBeginMsg->privateTerm;
|
||||
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
|
||||
pReceiver->fromId = pBeginMsg->srcId;
|
||||
pReceiver->start = true;
|
||||
|
@ -445,7 +463,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
|
|||
pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
|
||||
pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
|
||||
|
||||
// write data
|
||||
// start writer
|
||||
ASSERT(pReceiver->pWriter == NULL);
|
||||
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm,
|
||||
&(pReceiver->snapshotParam), &(pReceiver->pWriter));
|
||||
|
@ -481,10 +499,10 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
|
|||
|
||||
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
|
||||
// if already start, force close, start again
|
||||
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) {
|
||||
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
|
||||
if (!snapshotReceiverIsStart(pReceiver)) {
|
||||
// first start
|
||||
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
||||
snapshotReceiverDoStart(pReceiver, pBeginMsg);
|
||||
|
||||
} else {
|
||||
// already start
|
||||
|
@ -494,12 +512,14 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm private
|
|||
snapshotReceiverForceStop(pReceiver);
|
||||
|
||||
// start again
|
||||
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
||||
snapshotReceiverDoStart(pReceiver, pBeginMsg);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// just set start = false
|
||||
// FpSnapshotStopWrite should not be called, assert writer == NULL
|
||||
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
||||
if (pReceiver->pWriter != NULL) {
|
||||
int32_t ret =
|
||||
|
@ -522,6 +542,7 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// when recv last snapshot block, apply data into snapshot
|
||||
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
||||
ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);
|
||||
|
||||
|
@ -550,7 +571,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
|
|||
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
|
||||
}
|
||||
|
||||
// stop writer
|
||||
// stop writer, apply data
|
||||
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
|
||||
if (code != 0) {
|
||||
syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
|
||||
|
@ -579,15 +600,20 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
|
|||
return 0;
|
||||
}
|
||||
|
||||
// apply data block
|
||||
// update progress
|
||||
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
||||
ASSERT(pMsg->seq == pReceiver->ack + 1);
|
||||
|
||||
if (pReceiver->pWriter != NULL) {
|
||||
if (pMsg->dataLen > 0) {
|
||||
// apply data block
|
||||
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
|
||||
pMsg->data, pMsg->dataLen);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
|
||||
// update progress
|
||||
pReceiver->ack = pMsg->seq;
|
||||
|
||||
// event log
|
||||
|
@ -665,14 +691,22 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event)
|
|||
uint16_t port;
|
||||
syncUtilU642Addr(fromId.addr, host, sizeof(host), &port);
|
||||
|
||||
snprintf(s, len, "%s {%p start:%d ack:%d term:%lu pterm:%lu from:%s:%d laindex:%ld laterm:%lu lcindex:%ld}", event,
|
||||
pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port,
|
||||
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex);
|
||||
snprintf(s, len,
|
||||
"%s {%p start:%d ack:%d term:%lu pterm:%lu from:%s:%d s-param:%ld e-param:%ld laindex:%ld laterm:%lu "
|
||||
"lcindex:%ld}",
|
||||
event, pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port,
|
||||
pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex,
|
||||
pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex);
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
// receiver do something
|
||||
// receiver on message
|
||||
//
|
||||
// condition 1, recv SYNC_SNAPSHOT_SEQ_BEGIN, start receiver, update privateTerm
|
||||
// condition 2, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig)
|
||||
// condition 3, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
|
||||
// condition 4, got data, update ack
|
||||
int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||
// get receiver
|
||||
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
|
||||
|
@ -683,11 +717,13 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
|
||||
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
|
||||
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
|
||||
// condition 1
|
||||
// begin, no data
|
||||
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg);
|
||||
snapshotReceiverStart(pReceiver, pMsg);
|
||||
needRsp = true;
|
||||
|
||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
||||
// condition 2
|
||||
// end, finish FSM
|
||||
code = snapshotReceiverFinish(pReceiver, pMsg);
|
||||
if (code == 0) {
|
||||
|
@ -697,7 +733,6 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
|
||||
// maybe update lastconfig
|
||||
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
|
||||
// int32_t oldReplicaNum = pSyncNode->replicaNum;
|
||||
SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
|
||||
|
||||
// update new config myIndex
|
||||
|
@ -709,11 +744,13 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
}
|
||||
|
||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
|
||||
// condition 3
|
||||
// force close
|
||||
snapshotReceiverForceStop(pReceiver);
|
||||
needRsp = false;
|
||||
|
||||
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
|
||||
// condition 4
|
||||
// transfering
|
||||
if (pMsg->seq == pReceiver->ack + 1) {
|
||||
snapshotReceiverGotData(pReceiver, pMsg);
|
||||
|
@ -752,6 +789,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
|
||||
syncSnapshotRspDestroy(pRspMsg);
|
||||
}
|
||||
|
||||
} else {
|
||||
// error log
|
||||
do {
|
||||
|
@ -759,6 +797,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
syncNodeErrorLog(pSyncNode, eventLog);
|
||||
taosMemoryFree(eventLog);
|
||||
} while (0);
|
||||
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
// error log
|
||||
|
@ -767,19 +807,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
syncNodeErrorLog(pSyncNode, eventLog);
|
||||
taosMemoryFree(eventLog);
|
||||
} while (0);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
|
||||
ASSERT(pMsg->ack == pSender->seq);
|
||||
pSender->ack = pMsg->ack;
|
||||
++(pSender->seq);
|
||||
}
|
||||
|
||||
// sender receives ack, set seq = ack + 1, send msg from seq
|
||||
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
|
||||
// sender on message
|
||||
//
|
||||
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
|
||||
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
|
||||
// condition 3 sender receives error msg, just print error log
|
||||
//
|
||||
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
|
||||
// if already drop replica, do not process
|
||||
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
|
@ -794,12 +834,14 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
|
|||
// state, term, seq/ack
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
|
||||
// receiver ack is finish, close sender
|
||||
// condition 1
|
||||
// receive ack is finish, close sender
|
||||
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
|
||||
snapshotSenderStop(pSender, true);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// condition 2
|
||||
// send next msg
|
||||
if (pMsg->ack == pSender->seq) {
|
||||
// update sender ack
|
||||
|
@ -807,6 +849,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
|
|||
snapshotSend(pSender);
|
||||
|
||||
} else if (pMsg->ack == pSender->seq - 1) {
|
||||
// maybe resend
|
||||
snapshotReSend(pSender);
|
||||
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue