Merge pull request #14313 from taosdata/feature/3.0_mhli
refactor(sync): refactor snapshot code
This commit is contained in:
commit
474b6d1ee3
|
@ -57,7 +57,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
|
||||||
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
|
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
|
||||||
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
|
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
|
||||||
void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader);
|
void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader);
|
||||||
void snapshotSenderStop(SSyncSnapshotSender *pSender);
|
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
|
||||||
int32_t snapshotSend(SSyncSnapshotSender *pSender);
|
int32_t snapshotSend(SSyncSnapshotSender *pSender);
|
||||||
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
|
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
|
||||||
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
|
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
|
||||||
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg);
|
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg);
|
||||||
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
|
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
|
||||||
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply);
|
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
|
||||||
|
|
||||||
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
|
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
|
||||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
|
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
|
||||||
|
|
|
@ -240,7 +240,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
|
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
|
||||||
ASSERT(pSender != NULL);
|
ASSERT(pSender != NULL);
|
||||||
|
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot = {.data = NULL,
|
||||||
|
.lastApplyIndex = SYNC_INDEX_INVALID,
|
||||||
|
.lastApplyTerm = 0,
|
||||||
|
.lastConfigIndex = SYNC_INDEX_INVALID};
|
||||||
void* pReader = NULL;
|
void* pReader = NULL;
|
||||||
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader);
|
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader);
|
||||||
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 &&
|
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 &&
|
||||||
|
@ -249,10 +252,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
ASSERT(pReader != NULL);
|
ASSERT(pReader != NULL);
|
||||||
snapshotSenderStart(pSender, snapshot, pReader);
|
snapshotSenderStart(pSender, snapshot, pReader);
|
||||||
|
|
||||||
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
|
|
||||||
syncNodeEventLog(ths, eventLog);
|
|
||||||
taosMemoryFree(eventLog);
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// no snapshot
|
// no snapshot
|
||||||
if (pReader != NULL) {
|
if (pReader != NULL) {
|
||||||
|
@ -260,23 +259,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
bool hasSnapshot = syncNodeHasSnapshot(ths);
|
|
||||||
SSnapshot snapshot;
|
|
||||||
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
|
||||||
|
|
||||||
// start sending snapshot first time
|
|
||||||
// start here, stop by receiver
|
|
||||||
if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) &&
|
|
||||||
pMsg->privateTerm < pSender->privateTerm) {
|
|
||||||
snapshotSenderStart(pSender);
|
|
||||||
|
|
||||||
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
|
|
||||||
syncNodeEventLog(ths, eventLog);
|
|
||||||
taosMemoryFree(eventLog);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
|
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
|
||||||
|
|
||||||
// update nextIndex to sentryIndex
|
// update nextIndex to sentryIndex
|
||||||
|
@ -300,5 +282,5 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
|
syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
|
||||||
syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
|
syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
|
||||||
|
|
||||||
return ret;
|
return 0;
|
||||||
}
|
}
|
|
@ -24,6 +24,7 @@
|
||||||
//----------------------------------
|
//----------------------------------
|
||||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
|
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
|
||||||
SyncSnapshotSend *pBeginMsg);
|
SyncSnapshotSend *pBeginMsg);
|
||||||
|
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
|
||||||
|
|
||||||
//----------------------------------
|
//----------------------------------
|
||||||
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
|
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
|
||||||
|
@ -50,7 +51,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
|
||||||
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
|
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
|
||||||
pSender->finish = false;
|
pSender->finish = false;
|
||||||
} else {
|
} else {
|
||||||
sError("vgId:%d cannot create snapshot sender", pSyncNode->vgId);
|
sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pSender;
|
return pSender;
|
||||||
|
@ -127,7 +128,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestory(pEntry);
|
||||||
} else {
|
} else {
|
||||||
if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
|
if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
|
||||||
sTrace("vgId:%d sync sender get cfg from local", pSender->pSyncNode->vgId);
|
sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId);
|
||||||
pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
|
pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
|
||||||
getLastConfig = true;
|
getLastConfig = true;
|
||||||
}
|
}
|
||||||
|
@ -176,13 +177,13 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
|
||||||
|
|
||||||
// event log
|
// event log
|
||||||
do {
|
do {
|
||||||
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender send");
|
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
|
||||||
syncNodeEventLog(pSender->pSyncNode, eventLog);
|
syncNodeEventLog(pSender->pSyncNode, eventLog);
|
||||||
taosMemoryFree(eventLog);
|
taosMemoryFree(eventLog);
|
||||||
} while (0);
|
} while (0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void snapshotSenderStop(SSyncSnapshotSender *pSender) {
|
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
||||||
// close reader
|
// close reader
|
||||||
if (pSender->pReader != NULL) {
|
if (pSender->pReader != NULL) {
|
||||||
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
|
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
|
||||||
|
@ -199,6 +200,14 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) {
|
||||||
|
|
||||||
// update flag
|
// update flag
|
||||||
pSender->start = false;
|
pSender->start = false;
|
||||||
|
pSender->finish = finish;
|
||||||
|
|
||||||
|
// event log
|
||||||
|
do {
|
||||||
|
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop");
|
||||||
|
syncNodeEventLog(pSender->pSyncNode, eventLog);
|
||||||
|
taosMemoryFree(eventLog);
|
||||||
|
} while (0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// when sender receive ack, call this function to send msg from seq
|
// when sender receive ack, call this function to send msg from seq
|
||||||
|
@ -386,7 +395,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
|
||||||
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
|
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
sError("vgId:%d cannot create snapshot receiver", pSyncNode->vgId);
|
sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pReceiver;
|
return pReceiver;
|
||||||
|
@ -409,9 +418,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
|
||||||
|
|
||||||
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
|
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
|
||||||
|
|
||||||
// static do start
|
// static do start by privateTerm, pBeginMsg
|
||||||
// receive first snapshot data
|
// receive first snapshot data
|
||||||
// privateTerm, pBeginMsg
|
// write first block data
|
||||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
|
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
|
||||||
SyncSnapshotSend *pBeginMsg) {
|
SyncSnapshotSend *pBeginMsg) {
|
||||||
// update state
|
// update state
|
||||||
|
@ -419,6 +428,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
|
||||||
pReceiver->privateTerm = privateTerm;
|
pReceiver->privateTerm = privateTerm;
|
||||||
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
|
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
|
||||||
pReceiver->fromId = pBeginMsg->srcId;
|
pReceiver->fromId = pBeginMsg->srcId;
|
||||||
|
pReceiver->start = true;
|
||||||
|
|
||||||
// update snapshot
|
// update snapshot
|
||||||
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
|
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
|
||||||
|
@ -429,8 +439,16 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
|
||||||
ASSERT(pReceiver->pWriter == NULL);
|
ASSERT(pReceiver->pWriter == NULL);
|
||||||
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
|
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
|
// event log
|
||||||
|
do {
|
||||||
|
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start");
|
||||||
|
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
|
||||||
|
taosMemoryFree(eventLog);
|
||||||
|
} while (0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// force stop
|
||||||
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
|
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
|
||||||
// force close, abandon incomplete data
|
// force close, abandon incomplete data
|
||||||
if (pReceiver->pWriter != NULL) {
|
if (pReceiver->pWriter != NULL) {
|
||||||
|
@ -441,33 +459,35 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pReceiver->start = false;
|
pReceiver->start = false;
|
||||||
|
|
||||||
|
// event log
|
||||||
|
do {
|
||||||
|
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force stop");
|
||||||
|
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
|
||||||
|
taosMemoryFree(eventLog);
|
||||||
|
} while (0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
|
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
|
||||||
// if already start, force close, start again
|
// if already start, force close, start again
|
||||||
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) {
|
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) {
|
||||||
if (!snapshotReceiverIsStart(pReceiver)) {
|
if (!snapshotReceiverIsStart(pReceiver)) {
|
||||||
// start
|
// first start
|
||||||
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
||||||
pReceiver->start = true;
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// already start
|
// already start
|
||||||
sInfo("snapshot recv, receiver already start");
|
sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId);
|
||||||
|
|
||||||
// force close, abandon incomplete data
|
// force close, abandon incomplete data
|
||||||
int32_t ret =
|
snapshotReceiverForceStop(pReceiver);
|
||||||
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
|
|
||||||
ASSERT(ret == 0);
|
|
||||||
pReceiver->pWriter = NULL;
|
|
||||||
|
|
||||||
// start again
|
// start again
|
||||||
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
||||||
pReceiver->start = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
|
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
||||||
if (pReceiver->pWriter != NULL) {
|
if (pReceiver->pWriter != NULL) {
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
|
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
|
||||||
|
@ -477,8 +497,69 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
|
||||||
|
|
||||||
pReceiver->start = false;
|
pReceiver->start = false;
|
||||||
|
|
||||||
if (apply) {
|
// event log
|
||||||
// ++(pReceiver->privateTerm);
|
do {
|
||||||
|
SSnapshot snapshot;
|
||||||
|
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
|
||||||
|
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
|
||||||
|
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
|
||||||
|
taosMemoryFree(eventLog);
|
||||||
|
} while (0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
||||||
|
ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);
|
||||||
|
|
||||||
|
if (pReceiver->pWriter != NULL) {
|
||||||
|
int32_t code = 0;
|
||||||
|
if (pMsg->dataLen > 0) {
|
||||||
|
code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
|
||||||
|
pMsg->dataLen);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
pReceiver->pWriter = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
|
||||||
|
|
||||||
|
// update commit index
|
||||||
|
if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
|
||||||
|
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset wal
|
||||||
|
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
|
||||||
|
|
||||||
|
// event log
|
||||||
|
do {
|
||||||
|
SSnapshot snapshot;
|
||||||
|
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
|
||||||
|
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver got last data, finish, apply snapshot");
|
||||||
|
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
|
||||||
|
taosMemoryFree(eventLog);
|
||||||
|
} while (0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
||||||
|
ASSERT(pMsg->seq == pReceiver->ack + 1);
|
||||||
|
|
||||||
|
if (pReceiver->pWriter != NULL) {
|
||||||
|
if (pMsg->dataLen > 0) {
|
||||||
|
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
|
||||||
|
pMsg->data, pMsg->dataLen);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
}
|
||||||
|
pReceiver->ack = pMsg->seq;
|
||||||
|
|
||||||
|
// event log
|
||||||
|
do {
|
||||||
|
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
|
||||||
|
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
|
||||||
|
taosMemoryFree(eventLog);
|
||||||
|
} while (0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -560,33 +641,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
// get receiver
|
// get receiver
|
||||||
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
|
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
|
||||||
bool needRsp = false;
|
bool needRsp = false;
|
||||||
int32_t writeCode = 0;
|
|
||||||
|
|
||||||
// state, term, seq/ack
|
// state, term, seq/ack
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
|
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
|
||||||
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
|
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
|
||||||
// begin
|
// begin, no data
|
||||||
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg);
|
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg);
|
||||||
pReceiver->ack = pMsg->seq;
|
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver begin");
|
|
||||||
syncNodeEventLog(pSyncNode, eventLog);
|
|
||||||
taosMemoryFree(eventLog);
|
|
||||||
|
|
||||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
||||||
// end, finish FSM
|
// end, finish FSM
|
||||||
writeCode = pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
|
snapshotReceiverFinish(pReceiver, pMsg);
|
||||||
ASSERT(writeCode == 0);
|
snapshotReceiverStop(pReceiver);
|
||||||
|
needRsp = true;
|
||||||
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
|
|
||||||
if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
|
|
||||||
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
|
|
||||||
}
|
|
||||||
|
|
||||||
// pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
|
|
||||||
pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, pMsg->lastIndex);
|
|
||||||
|
|
||||||
// maybe update lastconfig
|
// maybe update lastconfig
|
||||||
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
|
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
|
||||||
|
@ -601,89 +669,74 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
|
syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSnapshot snapshot;
|
|
||||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
|
||||||
|
|
||||||
do {
|
|
||||||
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish, apply snapshot");
|
|
||||||
syncNodeEventLog(pSyncNode, eventLog);
|
|
||||||
taosMemoryFree(eventLog);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
pReceiver->pWriter = NULL;
|
|
||||||
snapshotReceiverStop(pReceiver, true);
|
|
||||||
pReceiver->ack = pMsg->seq;
|
|
||||||
needRsp = true;
|
|
||||||
|
|
||||||
do {
|
|
||||||
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver stop");
|
|
||||||
syncNodeEventLog(pSyncNode, eventLog);
|
|
||||||
taosMemoryFree(eventLog);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
|
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
|
||||||
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, false);
|
// force close
|
||||||
snapshotReceiverStop(pReceiver, false);
|
snapshotReceiverForceStop(pReceiver);
|
||||||
needRsp = false;
|
needRsp = false;
|
||||||
|
|
||||||
do {
|
|
||||||
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver force close");
|
|
||||||
syncNodeEventLog(pSyncNode, eventLog);
|
|
||||||
taosMemoryFree(eventLog);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
|
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
|
||||||
// transfering
|
// transfering
|
||||||
if (pMsg->seq == pReceiver->ack + 1) {
|
if (pMsg->seq == pReceiver->ack + 1) {
|
||||||
writeCode =
|
snapshotReceiverGotData(pReceiver, pMsg);
|
||||||
pSyncNode->pFsm->FpSnapshotDoWrite(pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen);
|
|
||||||
ASSERT(writeCode == 0);
|
|
||||||
pReceiver->ack = pMsg->seq;
|
|
||||||
}
|
}
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
do {
|
|
||||||
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver receiving");
|
|
||||||
syncNodeEventLog(pSyncNode, eventLog);
|
|
||||||
taosMemoryFree(eventLog);
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// send ack
|
||||||
if (needRsp) {
|
if (needRsp) {
|
||||||
|
// build msg
|
||||||
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
|
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
|
||||||
pRspMsg->srcId = pSyncNode->myRaftId;
|
pRspMsg->srcId = pSyncNode->myRaftId;
|
||||||
pRspMsg->destId = pMsg->srcId;
|
pRspMsg->destId = pMsg->srcId;
|
||||||
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
|
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||||
pRspMsg->lastIndex = pMsg->lastIndex;
|
pRspMsg->lastIndex = pMsg->lastIndex;
|
||||||
pRspMsg->lastTerm = pMsg->lastTerm;
|
pRspMsg->lastTerm = pMsg->lastTerm;
|
||||||
pRspMsg->ack = pReceiver->ack;
|
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
|
||||||
pRspMsg->code = writeCode;
|
pRspMsg->code = 0;
|
||||||
pRspMsg->privateTerm = pReceiver->privateTerm;
|
pRspMsg->privateTerm = pReceiver->privateTerm; // receiver maybe already closed
|
||||||
|
|
||||||
|
// send msg
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
|
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
|
||||||
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
|
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
|
||||||
|
|
||||||
syncSnapshotRspDestroy(pRspMsg);
|
syncSnapshotRspDestroy(pRspMsg);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// error log
|
||||||
|
do {
|
||||||
|
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver term not equal");
|
||||||
|
syncNodeErrorLog(pSyncNode, eventLog);
|
||||||
|
taosMemoryFree(eventLog);
|
||||||
|
} while (0);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
syncNodeLog2("syncNodeOnSnapshotSendCb not follower", pSyncNode);
|
// error log
|
||||||
|
do {
|
||||||
|
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver not follower");
|
||||||
|
syncNodeErrorLog(pSyncNode, eventLog);
|
||||||
|
taosMemoryFree(eventLog);
|
||||||
|
} while (0);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
|
||||||
|
ASSERT(pMsg->ack == pSender->seq);
|
||||||
|
pSender->ack = pMsg->ack;
|
||||||
|
++(pSender->seq);
|
||||||
|
}
|
||||||
|
|
||||||
// sender receives ack, set seq = ack + 1, send msg from seq
|
// sender receives ack, set seq = ack + 1, send msg from seq
|
||||||
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
|
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
|
||||||
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
|
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
sInfo("recv SyncSnapshotRsp maybe replica already dropped");
|
sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId);
|
||||||
return 0;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// get sender
|
// get sender
|
||||||
|
@ -695,27 +748,49 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
|
||||||
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
|
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
|
||||||
// receiver ack is finish, close sender
|
// receiver ack is finish, close sender
|
||||||
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
|
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
|
||||||
pSender->finish = true;
|
snapshotSenderStop(pSender, true);
|
||||||
snapshotSenderStop(pSender);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// send next msg
|
// send next msg
|
||||||
if (pMsg->ack == pSender->seq) {
|
if (pMsg->ack == pSender->seq) {
|
||||||
// update sender ack
|
// update sender ack
|
||||||
pSender->ack = pMsg->ack;
|
snapshotSenderUpdateProgress(pSender, pMsg);
|
||||||
(pSender->seq)++;
|
|
||||||
snapshotSend(pSender);
|
snapshotSend(pSender);
|
||||||
|
|
||||||
} else if (pMsg->ack == pSender->seq - 1) {
|
} else if (pMsg->ack == pSender->seq - 1) {
|
||||||
snapshotReSend(pSender);
|
snapshotReSend(pSender);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
do {
|
||||||
}
|
char logBuf[64];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "error ack, recv ack:%d, my seq:%d", pMsg->ack, pSender->seq);
|
||||||
|
char *eventLog = snapshotSender2SimpleStr(pSender, logBuf);
|
||||||
|
syncNodeErrorLog(pSyncNode, eventLog);
|
||||||
|
taosMemoryFree(eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
syncNodeLog2("syncNodeOnSnapshotRspCb not leader", pSyncNode);
|
// error log
|
||||||
|
do {
|
||||||
|
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender term not equal");
|
||||||
|
syncNodeErrorLog(pSyncNode, eventLog);
|
||||||
|
taosMemoryFree(eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// error log
|
||||||
|
do {
|
||||||
|
char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender not leader");
|
||||||
|
syncNodeErrorLog(pSyncNode, eventLog);
|
||||||
|
taosMemoryFree(eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue