sync refactor

This commit is contained in:
Minghao Li 2022-03-16 21:11:20 +08:00
parent acea32ed41
commit 4d4fabf403
2 changed files with 12 additions and 4 deletions

View File

@ -101,7 +101,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
assert(pMsg->dataLen >= 0); assert(pMsg->dataLen >= 0);
SyncTerm localPreLogTerm = 0; SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogTerm >= SYNC_INDEX_BEGIN && pMsg->prevLogTerm <= ths->pLogStore->getLastIndex(ths->pLogStore)) { if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pEntry != NULL); assert(pEntry != NULL);
localPreLogTerm = pEntry->term; localPreLogTerm = pEntry->term;
@ -174,7 +174,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true; pReply->success = true;
if (pMsg->dataLen > 0) {
pReply->matchIndex = pMsg->prevLogIndex + 1; pReply->matchIndex = pMsg->prevLogIndex + 1;
} else {
pReply->matchIndex = pMsg->prevLogIndex;
}
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);

View File

@ -73,7 +73,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
SyncAppendEntries* pMsg = NULL; SyncAppendEntries* pMsg = NULL;
SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex); SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex);
if (pEntry != NULL) { if (pEntry != NULL) {
SyncAppendEntries* pMsg = syncAppendEntriesBuild(pEntry->bytes); pMsg = syncAppendEntriesBuild(pEntry->bytes);
assert(pMsg != NULL);
// add pEntry into msg // add pEntry into msg
uint32_t len; uint32_t len;
@ -86,9 +87,11 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
} else { } else {
// maybe overflow, send empty record // maybe overflow, send empty record
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0); pMsg = syncAppendEntriesBuild(0);
assert(pMsg != NULL);
} }
assert(pMsg != NULL);
pMsg->srcId = pSyncNode->myRaftId; pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId; pMsg->destId = *pDestId;
pMsg->term = pSyncNode->pRaftStore->currentTerm; pMsg->term = pSyncNode->pRaftStore->currentTerm;