enh(sync): syncNodeOnAppendEntriesSnapshotCb
This commit is contained in:
parent
7917716b4f
commit
057fda21ce
|
@ -432,4 +432,352 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) { return 0; }
|
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
char logBuf[128] = {0};
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesSnapshotCb== term:%lu", ths->pRaftStore->currentTerm);
|
||||||
|
syncAppendEntriesLog2(logBuf, pMsg);
|
||||||
|
|
||||||
|
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
|
syncNodeUpdateTerm(ths, pMsg->term);
|
||||||
|
}
|
||||||
|
assert(pMsg->term <= ths->pRaftStore->currentTerm);
|
||||||
|
|
||||||
|
// reset elect timer
|
||||||
|
if (pMsg->term == ths->pRaftStore->currentTerm) {
|
||||||
|
ths->leaderCache = pMsg->srcId;
|
||||||
|
syncNodeResetElectTimer(ths);
|
||||||
|
}
|
||||||
|
assert(pMsg->dataLen >= 0);
|
||||||
|
|
||||||
|
SyncIndex localPreLogIndex;
|
||||||
|
SyncTerm localPreLogTerm;
|
||||||
|
ret = syncNodeGetPreIndexTerm(ths, pMsg->prevLogIndex + 1, &localPreLogIndex, &localPreLogTerm);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
|
SyncIndex localLastIndex;
|
||||||
|
SyncTerm localLastTerm;
|
||||||
|
ret = syncNodeGetLastIndexTerm(ths, &localLastIndex, &localLastTerm);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
|
bool logOK = (pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
|
||||||
|
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) && (pMsg->prevLogIndex <= localLastIndex) &&
|
||||||
|
(pMsg->prevLogTerm == localPreLogTerm));
|
||||||
|
|
||||||
|
// reject request
|
||||||
|
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
|
||||||
|
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
|
||||||
|
sTrace(
|
||||||
|
"syncNodeOnAppendEntriesSnapshotCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
|
||||||
|
"ths->state:%d, "
|
||||||
|
"logOK:%d",
|
||||||
|
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
|
||||||
|
|
||||||
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
|
pReply->srcId = ths->myRaftId;
|
||||||
|
pReply->destId = pMsg->srcId;
|
||||||
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
|
pReply->success = false;
|
||||||
|
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||||
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
|
syncAppendEntriesReplyDestroy(pReply);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
// return to follower state
|
||||||
|
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
|
||||||
|
sTrace(
|
||||||
|
"syncNodeOnAppendEntriesSnapshotCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
|
||||||
|
"ths->state:%d, logOK:%d",
|
||||||
|
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
|
||||||
|
|
||||||
|
syncNodeBecomeFollower(ths);
|
||||||
|
|
||||||
|
// ret or reply?
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
// accept request
|
||||||
|
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
|
||||||
|
// preIndex = -1, or has preIndex entry in local log
|
||||||
|
assert(pMsg->prevLogIndex <= localLastIndex);
|
||||||
|
|
||||||
|
// has extra entries (> preIndex) in local log
|
||||||
|
bool hasExtraEntries = pMsg->prevLogIndex < localLastIndex;
|
||||||
|
|
||||||
|
// has entries in SyncAppendEntries msg
|
||||||
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
|
|
||||||
|
sTrace(
|
||||||
|
"syncNodeOnAppendEntriesSnapshotCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
|
||||||
|
"ths->state:%d, "
|
||||||
|
"logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
|
||||||
|
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
|
||||||
|
|
||||||
|
if (hasExtraEntries && hasAppendEntries) {
|
||||||
|
// not conflict by default
|
||||||
|
bool conflict = false;
|
||||||
|
|
||||||
|
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
|
||||||
|
SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
|
||||||
|
assert(pExtraEntry != NULL);
|
||||||
|
|
||||||
|
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
|
||||||
|
assert(pAppendEntry != NULL);
|
||||||
|
|
||||||
|
// log not match, conflict
|
||||||
|
assert(extraIndex == pAppendEntry->index);
|
||||||
|
if (pExtraEntry->term != pAppendEntry->term) {
|
||||||
|
conflict = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (conflict) {
|
||||||
|
// roll back
|
||||||
|
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
|
||||||
|
SyncIndex delEnd = extraIndex;
|
||||||
|
|
||||||
|
sTrace("syncNodeOnAppendEntriesSnapshotCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin,
|
||||||
|
delEnd);
|
||||||
|
|
||||||
|
// notice! reverse roll back!
|
||||||
|
for (SyncIndex index = delEnd; index >= delBegin; --index) {
|
||||||
|
if (ths->pFsm->FpRollBackCb != NULL) {
|
||||||
|
SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
|
||||||
|
assert(pRollBackEntry != NULL);
|
||||||
|
|
||||||
|
// if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) {
|
||||||
|
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||||
|
|
||||||
|
SFsmCbMeta cbMeta;
|
||||||
|
cbMeta.index = pRollBackEntry->index;
|
||||||
|
cbMeta.isWeak = pRollBackEntry->isWeak;
|
||||||
|
cbMeta.code = 0;
|
||||||
|
cbMeta.state = ths->state;
|
||||||
|
cbMeta.seqNum = pRollBackEntry->seqNum;
|
||||||
|
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
}
|
||||||
|
|
||||||
|
syncEntryDestory(pRollBackEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete confict entries
|
||||||
|
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
|
||||||
|
|
||||||
|
// append new entries
|
||||||
|
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
|
||||||
|
|
||||||
|
// pre commit
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||||
|
if (ths->pFsm != NULL) {
|
||||||
|
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||||
|
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||||
|
SFsmCbMeta cbMeta;
|
||||||
|
cbMeta.index = pAppendEntry->index;
|
||||||
|
cbMeta.isWeak = pAppendEntry->isWeak;
|
||||||
|
cbMeta.code = 2;
|
||||||
|
cbMeta.state = ths->state;
|
||||||
|
cbMeta.seqNum = pAppendEntry->seqNum;
|
||||||
|
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
}
|
||||||
|
|
||||||
|
// free memory
|
||||||
|
syncEntryDestory(pExtraEntry);
|
||||||
|
syncEntryDestory(pAppendEntry);
|
||||||
|
|
||||||
|
} else if (hasExtraEntries && !hasAppendEntries) {
|
||||||
|
// do nothing
|
||||||
|
|
||||||
|
} else if (!hasExtraEntries && hasAppendEntries) {
|
||||||
|
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
|
||||||
|
assert(pAppendEntry != NULL);
|
||||||
|
|
||||||
|
// append new entries
|
||||||
|
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
|
||||||
|
|
||||||
|
// pre commit
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||||
|
if (ths->pFsm != NULL) {
|
||||||
|
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||||
|
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||||
|
SFsmCbMeta cbMeta;
|
||||||
|
cbMeta.index = pAppendEntry->index;
|
||||||
|
cbMeta.isWeak = pAppendEntry->isWeak;
|
||||||
|
cbMeta.code = 3;
|
||||||
|
cbMeta.state = ths->state;
|
||||||
|
cbMeta.seqNum = pAppendEntry->seqNum;
|
||||||
|
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
|
||||||
|
// free memory
|
||||||
|
syncEntryDestory(pAppendEntry);
|
||||||
|
|
||||||
|
} else if (!hasExtraEntries && !hasAppendEntries) {
|
||||||
|
// do nothing
|
||||||
|
|
||||||
|
} else {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
|
pReply->srcId = ths->myRaftId;
|
||||||
|
pReply->destId = pMsg->srcId;
|
||||||
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
|
pReply->success = true;
|
||||||
|
|
||||||
|
if (hasAppendEntries) {
|
||||||
|
pReply->matchIndex = pMsg->prevLogIndex + 1;
|
||||||
|
} else {
|
||||||
|
pReply->matchIndex = pMsg->prevLogIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||||
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
|
syncAppendEntriesReplyDestroy(pReply);
|
||||||
|
|
||||||
|
// maybe update commit index from leader
|
||||||
|
if (pMsg->commitIndex > ths->commitIndex) {
|
||||||
|
// has commit entry in local
|
||||||
|
if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||||
|
SyncIndex beginIndex = ths->commitIndex + 1;
|
||||||
|
SyncIndex endIndex = pMsg->commitIndex;
|
||||||
|
|
||||||
|
// update commit index
|
||||||
|
ths->commitIndex = pMsg->commitIndex;
|
||||||
|
|
||||||
|
// call back Wal
|
||||||
|
ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
|
||||||
|
|
||||||
|
// execute fsm
|
||||||
|
if (ths->pFsm != NULL) {
|
||||||
|
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
|
||||||
|
if (i != SYNC_INDEX_INVALID) {
|
||||||
|
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, i);
|
||||||
|
assert(pEntry != NULL);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||||
|
|
||||||
|
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||||
|
SFsmCbMeta cbMeta;
|
||||||
|
cbMeta.index = pEntry->index;
|
||||||
|
cbMeta.isWeak = pEntry->isWeak;
|
||||||
|
cbMeta.code = 0;
|
||||||
|
cbMeta.state = ths->state;
|
||||||
|
cbMeta.seqNum = pEntry->seqNum;
|
||||||
|
cbMeta.term = pEntry->term;
|
||||||
|
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||||
|
cbMeta.flag = 0x11;
|
||||||
|
|
||||||
|
SSnapshot snapshot;
|
||||||
|
ASSERT(ths->pFsm->FpGetSnapshot != NULL);
|
||||||
|
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
|
||||||
|
|
||||||
|
bool needExecute = true;
|
||||||
|
if (cbMeta.index <= snapshot.lastApplyIndex) {
|
||||||
|
needExecute = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (needExecute) {
|
||||||
|
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// config change
|
||||||
|
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||||
|
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
|
||||||
|
|
||||||
|
SSyncCfg newSyncCfg;
|
||||||
|
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
|
// update new config myIndex
|
||||||
|
bool hit = false;
|
||||||
|
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
|
||||||
|
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
|
||||||
|
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
|
||||||
|
newSyncCfg.myIndex = i;
|
||||||
|
hit = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SReConfigCbMeta cbMeta = {0};
|
||||||
|
bool isDrop;
|
||||||
|
|
||||||
|
// I am in newConfig
|
||||||
|
if (hit) {
|
||||||
|
syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);
|
||||||
|
|
||||||
|
// change isStandBy to normal
|
||||||
|
if (!isDrop) {
|
||||||
|
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
syncNodeBecomeLeader(ths);
|
||||||
|
} else {
|
||||||
|
syncNodeBecomeFollower(ths);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char* sOld = syncCfg2Str(&oldSyncCfg);
|
||||||
|
char* sNew = syncCfg2Str(&newSyncCfg);
|
||||||
|
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
|
||||||
|
taosMemoryFree(sOld);
|
||||||
|
taosMemoryFree(sNew);
|
||||||
|
}
|
||||||
|
|
||||||
|
// always call FpReConfigCb
|
||||||
|
if (ths->pFsm->FpReConfigCb != NULL) {
|
||||||
|
cbMeta.code = 0;
|
||||||
|
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||||
|
cbMeta.index = pEntry->index;
|
||||||
|
cbMeta.term = pEntry->term;
|
||||||
|
cbMeta.oldCfg = oldSyncCfg;
|
||||||
|
cbMeta.flag = 0x11;
|
||||||
|
cbMeta.isDrop = isDrop;
|
||||||
|
ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// restore finish
|
||||||
|
if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||||
|
if (ths->restoreFinish == false) {
|
||||||
|
if (ths->pFsm->FpRestoreFinishCb != NULL) {
|
||||||
|
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
|
||||||
|
}
|
||||||
|
ths->restoreFinish = true;
|
||||||
|
sInfo("==syncNodeOnAppendEntriesSnapshotCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
|
||||||
|
|
||||||
|
/*
|
||||||
|
tsem_post(&ths->restoreSem);
|
||||||
|
sInfo("==syncNodeOnAppendEntriesSnapshotCb== RestoreFinish tsem_post %p", ths);
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
Loading…
Reference in New Issue