diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index ca07876def..e7980bc2dd 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -87,7 +87,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy SRaftId* pDestId, bool* pBarrier); int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); -int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode); +int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index); int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessReplyInNormalMode(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg); int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index cfea6e8bc6..b4f2541f9c 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -612,6 +612,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { pMgr->states[pos].timeMs = nowMs; pMgr->states[pos].term = term; pMgr->states[pos].acked = false; + retried = true; if (firstIndex == -1) firstIndex = index; count++; @@ -658,6 +659,7 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod pMgr->states[pMsg->lastSendIndex % pMgr->size].acked = true; if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) { + pMgr->matchIndex = pMsg->matchIndex; pMgr->restored = true; sInfo("vgId:%d, sync log repl mgr restored. peer: %s:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", @@ -667,8 +669,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod } if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) { - sError("vgId:%d, failed to rollback match index. peer: %s:%d, match index: %" PRId64 ", last sent: %" PRId64, - pNode->vgId, host, port, pMsg->matchIndex, pMsg->lastSendIndex); + sWarn("vgId:%d, failed to rollback match index. peer: %s:%d, match index: %" PRId64 ", last sent: %" PRId64, + pNode->vgId, host, port, pMsg->matchIndex, pMsg->lastSendIndex); if (syncNodeStartSnapshot(pNode, &destId) < 0) { sError("vgId:%d, failed to start snapshot for peer %s:%d", pNode->vgId, host, port); return -1; @@ -676,8 +678,6 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod sInfo("vgId:%d, snapshot replication to peer %s:%d", pNode->vgId, host, port); return 0; } - - (void)syncLogReplMgrReset(pMgr); } // check last match term @@ -709,24 +709,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod } // attempt to replicate the raft log at index - bool barrier = false; - ASSERT(index >= 0); - if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, &destId, &barrier) < 0) { - sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", peer %s:%d", pNode->vgId, - terrstr(), index, host, port); - return -1; - } - - int64_t nowMs = taosGetMonoTimestampMs(); - pMgr->states[index % pMgr->size].barrier = barrier; - pMgr->states[index % pMgr->size].timeMs = nowMs; - pMgr->states[index % pMgr->size].term = term; - pMgr->states[index % pMgr->size].acked = false; - - pMgr->matchIndex = index; - pMgr->startIndex = index; - pMgr->endIndex = index + 1; - return 0; + (void)syncLogReplMgrReset(pMgr); + return syncLogReplMgrReplicateProbeOnce(pMgr, pNode, index); } int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { @@ -766,14 +750,23 @@ int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->restored) { (void)syncLogReplMgrReplicateAttemptedOnce(pMgr, pNode); } else { - (void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode); + (void)syncLogReplMgrReplicateProbeOnce(pMgr, pNode, pNode->pLogBuf->matchIndex); } return 0; } -int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { +int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { ASSERT(!pMgr->restored); - SyncIndex index = pNode->pLogBuf->matchIndex; + ASSERT(pMgr->startIndex >= 0); + int64_t retryMaxWaitMs = SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF); + int64_t nowMs = taosGetMonoTimestampMs(); + + if (pMgr->endIndex > pMgr->startIndex && + nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) { + return 0; + } + (void)syncLogReplMgrReset(pMgr); + SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; bool barrier = false; SyncTerm term = -1; @@ -783,6 +776,15 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode return -1; } + ASSERT(index >= 0); + pMgr->states[index % pMgr->size].barrier = barrier; + pMgr->states[index % pMgr->size].timeMs = nowMs; + pMgr->states[index % pMgr->size].term = term; + pMgr->states[index % pMgr->size].acked = false; + + pMgr->startIndex = index; + pMgr->endIndex = index + 1; + SSyncLogBuffer* pBuf = pNode->pLogBuf; sTrace("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64 ". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64