From eee4c0853d6df2bff2a997305f709a104eafdca8 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 17 Aug 2022 10:34:53 +0800 Subject: [PATCH] refactor(sync): add syncNodeAppendEntriesOnePeer --- include/libs/sync/sync.h | 11 +-- source/libs/sync/inc/syncReplication.h | 2 + source/libs/sync/src/syncReplication.c | 115 +++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 5 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 6d8895eb96..7cd2ebdede 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -26,11 +26,12 @@ extern "C" { extern bool gRaftDetailLog; -#define SYNC_RESP_TTL_MS 10000000 -#define SYNC_SPEED_UP_HB_TIMER 400 -#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) -#define SYNC_SLOW_DOWN_RANGE 100 -#define SYNC_MAX_READ_RANGE 10 +#define SYNC_RESP_TTL_MS 10000000 +#define SYNC_SPEED_UP_HB_TIMER 400 +#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) +#define SYNC_SLOW_DOWN_RANGE 100 +#define SYNC_MAX_READ_RANGE 10 +#define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 21821be6c7..edce124ee5 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -55,6 +55,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode); +int32_t syncNodeAppendEntriesOnePeer(SSyncNode* pSyncNode, SRaftId* pDestId, SyncIndex nextIndex); + int32_t syncNodeReplicate(SSyncNode* pSyncNode, bool isTimer); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 24f75de5d3..886f7ad199 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -116,6 +116,120 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { return ret; } +int32_t syncNodeAppendEntriesOnePeer(SSyncNode* pSyncNode, SRaftId* pDestId, SyncIndex nextIndex) { + int32_t ret = 0; + + // pre index, pre term + SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); + SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); + if (preLogTerm == SYNC_TERM_INVALID) { + SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; + // SyncIndex newNextIndex = nextIndex + 1; + + syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); + syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); + sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64 + ", match-index:%d, raftid:%" PRId64, + pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr); + return -1; + } + + // entry pointer array + SSyncRaftEntry* entryPArr[SYNC_MAX_BATCH_SIZE]; + memset(entryPArr, 0, sizeof(entryPArr)); + + // get entry batch + int32_t getCount = 0; + SyncIndex getEntryIndex = nextIndex; + for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) { + SSyncRaftEntry* pEntry = NULL; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry); + if (code == 0) { + ASSERT(pEntry != NULL); + entryPArr[i] = pEntry; + getCount++; + getEntryIndex++; + + } else { + break; + } + } + + // event log + do { + char logBuf[128]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); + snprintf(logBuf, sizeof(logBuf), "build batch:%d for %s:%d", getCount, host, port); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + + // build msg + SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId); + ASSERT(pMsg != NULL); + + // free entries + for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) { + SSyncRaftEntry* pEntry = entryPArr[i]; + if (pEntry != NULL) { + syncEntryDestory(pEntry); + entryPArr[i] = NULL; + } + } + + // prepare msg + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = *pDestId; + pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->prevLogIndex = preLogIndex; + pMsg->prevLogTerm = preLogTerm; + pMsg->commitIndex = pSyncNode->commitIndex; + pMsg->privateTerm = 0; + pMsg->dataCount = getCount; + + // send msg + syncNodeAppendEntriesBatch(pSyncNode, pDestId, pMsg); + + // speed up + if (pMsg->dataCount > 0 && pSyncNode->commitIndex - pMsg->prevLogIndex > SYNC_SLOW_DOWN_RANGE) { + ret = 1; + +#if 0 + do { + char logBuf[128]; + char host[64]; + uint16_t port; + syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port); + snprintf(logBuf, sizeof(logBuf), "maybe speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); +#endif + } + + syncAppendEntriesBatchDestroy(pMsg); + + return ret; +} + +int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + return -1; + } + + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SRaftId* pDestId = &(pSyncNode->peersId[i]); + + // next index + SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); + ret = syncNodeAppendEntriesOnePeer(pSyncNode, pDestId, nextIndex); + } + + return ret; +} + +#if 0 int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { return -1; @@ -221,6 +335,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { return ret; } +#endif int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);