enh: proceed sync log buffer on failure of appending too

This commit is contained in:
Benguang Zhao 2023-10-12 20:32:04 +08:00
parent 488fd88eb6
commit 4ca897246c
1 changed files with 9 additions and 6 deletions

View File

@ -2860,11 +2860,12 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
} }
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
int32_t code = -1;
if (pEntry->dataLen < sizeof(SMsgHead)) { if (pEntry->dataLen < sizeof(SMsgHead)) {
sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId, sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen); TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen);
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
return -1; goto _out;
} }
// append to log buffer // append to log buffer
@ -2873,9 +2874,11 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
ASSERT(terrno != 0); ASSERT(terrno != 0);
(void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false); (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
return -1; goto _out;
} }
code = 0;
_out:;
// proceed match index, with replicating on needed // proceed match index, with replicating on needed
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append"); SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");
@ -2886,7 +2889,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// multi replica // multi replica
if (ths->replicaNum > 1) { if (ths->replicaNum > 1) {
return 0; return code;
} }
// single replica // single replica
@ -2894,10 +2897,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex); sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
return -1; code = -1;
} }
return 0; return code;
} }
bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) { bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {