diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 1c228675bd..ef8773f5cc 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -61,13 +61,13 @@ typedef struct { typedef struct SSyncFSM { void* pData; - // apply committed log, bufs will be free by raft module + // apply committed log, bufs will be freed by the sync module int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData); // cluster commit callback int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData); - // fsm return snapshot in ppBuf, bufs will be free by raft module + // fsm returns snapshot in ppBuf, bufs will be freed by the sync module // TODO: getSnapshot SHOULD be async? int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast); @@ -89,18 +89,24 @@ typedef struct SSyncLogStore { // write log with given index int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf); - // read log from given index with limit, return the actual num in nBuf + /** + * read log from given index (included) with limit, return the actual num in nBuf, + * pBuf will be freed by the sync module + **/ int32_t (*logRead)(struct SSyncLogStore* logStore, SyncIndex index, int limit, SSyncBuffer* pBuf, int* nBuf); // mark log with given index has been commtted int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index); - // prune log before given index + // prune log before given index (not included) int32_t (*logPrune)(struct SSyncLogStore* logStore, SyncIndex index); - // rollback log after given index + // rollback log after given index (included) int32_t (*logRollback)(struct SSyncLogStore* logStore, SyncIndex index); + + // return last index of log + SyncIndex (*logLastIndex)(struct SSyncLogStore* logStore); } SSyncLogStore; typedef struct SSyncServerState { diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index f81040658e..869baecdda 
100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -17,15 +17,46 @@ #define _TD_LIBS_SYNC_RAFT_H #include "sync.h" +#include "sync_type.h" #include "raft_message.h" -typedef struct SSyncRaft { +typedef struct SSyncRaftProgress SSyncRaftProgress; + +typedef struct RaftLeaderState { + int nProgress; + SSyncRaftProgress* progress; +} RaftLeaderState; + +typedef struct SSyncRaftIOMethods { + SyncTime (*time)(SSyncRaft*); + +} SSyncRaftIOMethods; + +struct SSyncRaft { // owner sync node SSyncNode* pNode; SSyncInfo info; -} SSyncRaft; + // election timeout tick(random in [3:6] tick) + uint16_t electionTick; + + // heartbeat timeout tick(default: 1 tick) + uint16_t heartbeatTick; + + int installSnapShotTimeoutMS; + + // + int heartbeatTimeoutMS; + + bool preVote; + + SSyncRaftIOMethods io; + + RaftLeaderState leaderState; + + SSyncRaftUnstableLog *log; +}; int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo); int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg); diff --git a/source/libs/sync/inc/raft_progress.h b/source/libs/sync/inc/raft_progress.h new file mode 100644 index 0000000000..73aa9db59f --- /dev/null +++ b/source/libs/sync/inc/raft_progress.h @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_PROGRESS_H +#define TD_SYNC_RAFT_PROGRESS_H + +#include "sync_type.h" + +/** + * SSyncRaftInflights is a sliding window for the inflight messages. 
+ * Thus inflight effectively limits both the number of inflight messages + * and the bandwidth each Progress can use. + * When inflights is full, no more message should be sent. + * When a leader sends out a message, the index of the last + * entry should be added to inflights. The index MUST be added + * into inflights in order. + * When a leader receives a reply, the previous inflights should + * be freed by calling syncRaftInflightFreeTo with the index of the last + * received entry. + **/ +typedef struct SSyncRaftInflights { + /* the starting index in the buffer */ + int start; + + /* number of inflights in the buffer */ + int count; + + /* the size of the buffer */ + int size; + + /** + * buffer contains the index of the last entry + * inside one message. + **/ + SyncIndex* buffer; +} SSyncRaftInflights; + +/** + * State defines how the leader should interact with the follower. + * + * When in PROGRESS_PROBE, leader sends at most one replication message + * per heartbeat interval. It also probes actual progress of the follower. + * + * When in PROGRESS_REPLICATE, leader optimistically increases next + * to the latest entry sent after sending replication message. This is + * an optimized state for fast replicating log entries to the follower. + * + * When in PROGRESS_SNAPSHOT, leader should have sent out snapshot + * before and stops sending any replication message. + * + * PROGRESS_PROBE is the initial state. + **/ +typedef enum RaftProgressState { + PROGRESS_PROBE = 0, + PROGRESS_REPLICATE, + PROGRESS_SNAPSHOT, +} RaftProgressState; + +/** + * Progress represents a follower’s progress in the view of the leader. Leader maintains + * progresses of all followers, and sends entries to the follower based on its progress. + **/ +struct SSyncRaftProgress { + SyncIndex nextIndex; + + SyncIndex matchIndex; + + RaftProgressState state; + + /** + * paused is used in PROGRESS_PROBE. + * When paused is true, raft should pause sending replication message to this peer. 
+ **/ + bool paused; + + /** + * pendingSnapshotIndex is used in PROGRESS_SNAPSHOT. + * If there is a pending snapshot, the pendingSnapshotIndex will be set to the + * index of the snapshot. If pendingSnapshotIndex is set, the replication process of + * this Progress will be paused. raft will not resend snapshot until the pending one + * is reported as failed. + **/ + SyncIndex pendingSnapshotIndex; + + /** + * recentActive is true if the progress is recently active. Receiving any messages + * from the corresponding follower indicates the progress is active. + * recentActive can be reset to false after an election timeout. + **/ + bool recentActive; + + /** + * flow control sliding window + **/ + SSyncRaftInflights inflights; +}; + +int syncRaftProgressCreate(SSyncRaft* pRaft); +//int syncRaftProgressRecreate(SSyncRaft* pRaft, const RaftConfiguration* configuration); + +/** + * syncRaftProgressMaybeUpdate returns false if lastIndex does not advance the i-th node's matchIndex. + * Otherwise it sets matchIndex to lastIndex, clears the paused flag and returns true; nextIndex is kept >= lastIndex + 1. + **/ +bool syncRaftProgressMaybeUpdate(SSyncRaft* pRaft, int i, SyncIndex lastIndex); + +void syncRaftProgressOptimisticNextIndex(SSyncRaft* pRaft, int i, SyncIndex nextIndex); + +/** + * syncRaftProgressMaybeDecrTo returns false if the given rejection comes from an out of order message. + * Otherwise it decreases next index (to matchIndex + 1 in PROGRESS_REPLICATE, else to max(min(rejected, lastIndex + 1), 1)) and returns true. + **/ +bool syncRaftProgressMaybeDecrTo(SSyncRaft* pRaft, int i, + SyncIndex rejected, SyncIndex lastIndex); + +/** + * syncRaftProgressIsPaused returns whether sending log entries to this node has been + * paused. A node may be paused because it has rejected recent + * MsgApps, is currently waiting for a snapshot, or has reached the + * MaxInflightMsgs limit. 
+ **/ +bool syncRaftProgressIsPaused(SSyncRaft* pRaft, int i); + +void syncRaftProgressFailure(SSyncRaft* pRaft, int i); + +bool syncRaftProgressNeedAbortSnapshot(SSyncRaft* pRaft, int i); + +/** + * return true if i-th node's log is up to date, i.e. its nextIndex equals the last log index + 1 + **/ +bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, int i); + +void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i); + +void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i); + +void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i, SyncIndex snapshotIndex); + +int syncRaftInflightReset(SSyncRaftInflights* inflights); +bool syncRaftInflightFull(SSyncRaftInflights* inflights); +void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex); +void syncRaftInflightFreeTo(SSyncRaftInflights* inflights, SyncIndex toIndex); +void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights); + +#if 0 + +void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i); + +SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i); + +SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i); + +void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i); + +void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i); + +bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i); + +void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i); + +bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i); + +void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i); + +RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i); + +#endif + +#endif /* TD_SYNC_RAFT_PROGRESS_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/raft_unstable_log.h b/source/libs/sync/inc/raft_unstable_log.h new file mode 100644 index 0000000000..2b7b30c15a --- /dev/null +++ b/source/libs/sync/inc/raft_unstable_log.h @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. 
+ * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TD_SYNC_RAFT_UNSTABLE_LOG_H +#define TD_SYNC_RAFT_UNSTABLE_LOG_H + +#include "sync_type.h" + +/* in-memory unstable raft log storage (every field is compiled out by the #if 0 below for now) */ +struct SSyncRaftUnstableLog { +#if 0 + /* Circular buffer of log entries */ + RaftEntry *entries; + + /* size of circular buffer */ + int size; + + /* Indexes of used slots [front, back) */ + int front, back; + + /* Index of first entry is offset + 1 */ + SyncIndex offset; + + /* meta data of snapshot */ + SSyncRaftUnstableLog snapshot; +#endif +}; + +/** + * return the index of the last in-memory log entry, or 0 if the log is empty + **/ +SyncIndex syncRaftLogLastIndex(SSyncRaftUnstableLog* pLog); + +#if 0 +void raftLogInit(RaftLog* pLog); + +void raftLogClose(RaftLog* pLog); + +/** + * When populating log entries loaded from disk at startup, + * init raft memory log with snapshot index, term and log start index. + **/ +/* +void raftLogStart(RaftLog* pLog, + RaftSnapshotMeta snapshot, + SyncIndex startIndex); +*/ +/** + * Get the number of entries in the log. + **/ +int raftLogNumEntries(const RaftLog* pLog); + + + +/** + * return the last term of the in-memory log, or 0 if the log is empty + **/ +SSyncTerm raftLogLastTerm(RaftLog* pLog); + +/** + * return term of log with the given index, return 0 if the term of index cannot be found + * , errCode will save the error code. 
+ **/ +SSyncTerm raftLogTermOf(RaftLog* pLog, SyncIndex index, RaftCode* errCode); + +/** + * Get the last index of the most recent snapshot. Return 0 if there are no + * snapshots. + **/ +SyncIndex raftLogSnapshotIndex(RaftLog* pLog); + +/* Append a new entry to the log. */ +int raftLogAppend(RaftLog* pLog, + SSyncTerm term, + const SSyncBuffer *buf); + +/** + * acquire log from given index onwards. + **/ +/* +int raftLogAcquire(RaftLog* pLog, + SyncIndex index, + RaftEntry **ppEntries, + int *n); + +void raftLogRelease(RaftLog* pLog, + SyncIndex index, + RaftEntry *pEntries, + int n); +*/ +/* Delete all entries from the given index (included) onwards. */ +void raftLogTruncate(RaftLog* pLog, SyncIndex index); + +/** + * when taking a new snapshot, the function will update the last snapshot information and delete + * all entries up to last_index - trailing (included). If the log contains no entry + * at last_index - trailing, then no entry will be deleted. + **/ +void raftLogSnapshot(RaftLog* pLog, SyncIndex index, SyncIndex trailing); + +#endif + +#endif /* TD_SYNC_RAFT_UNSTABLE_LOG_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 73015e87a1..f99fb066ae 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -19,6 +19,7 @@ #include "thash.h" #include "os.h" #include "sync.h" +#include "sync_type.h" #include "raft.h" #include "tlog.h" diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h new file mode 100644 index 0000000000..2c9f24287a --- /dev/null +++ b/source/libs/sync/inc/sync_type.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_TYPE_H +#define _TD_LIBS_SYNC_TYPE_H + +#include <stdint.h> /* int32_t: keeps this header usable when it is the first include */ + +typedef int32_t SyncTime; + +typedef struct SSyncRaftUnstableLog SSyncRaftUnstableLog; + +typedef struct SSyncRaft SSyncRaft; + +#ifndef MIN +#define MIN(x, y) (((x) < (y)) ? (x) : (y)) +#endif + +#ifndef MAX +#define MAX(x, y) (((x) > (y)) ? (x) : (y)) +#endif + +#endif /* _TD_LIBS_SYNC_TYPE_H */ diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 23442803c4..42b220e642 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -16,12 +16,10 @@ #include "raft.h" #include "syncInt.h" -#ifndef MIN -#define MIN(x, y) (((x) < (y)) ? (x) : (y)) -#endif - #define RAFT_READ_LOG_MAX_NUM 100 +static void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term); + int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { SSyncNode* pNode = pRaft->pNode; SSyncServerState serverState; @@ -44,10 +42,10 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { } assert(initIndex <= serverState.commitIndex); - // restore fsm state from snapshot index + 1, until commitIndex + // restore fsm state from snapshot index + 1 until commitIndex ++initIndex; - while (initIndex < serverState.commitIndex) { - limit = MIN(RAFT_READ_LOG_MAX_NUM, serverState.commitIndex - initIndex); + while (initIndex <= serverState.commitIndex) { + limit = MIN(RAFT_READ_LOG_MAX_NUM, serverState.commitIndex - initIndex + 1); if (logStore->logRead(logStore, initIndex, limit, buffer, &nBuf) != 0) { return -1; @@ -62,7 +60,11 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { } assert(initIndex == serverState.commitIndex); - syncInfo("restore vgid 
%d state: snapshot index:", pInfo->vgId); + pRaft->heartbeatTick = 1; + + syncRaftBecomeFollower(pRaft, 1); + + syncInfo("restore vgid %d state: snapshot index success", pInfo->vgId); return 0; } @@ -73,4 +75,9 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg) { int32_t syncRaftTick(SSyncRaft* pRaft) { return 0; +} + +static void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term) { + pRaft->electionTick = taosRand() % 3 + 3; + return; } \ No newline at end of file
diff --git a/source/libs/sync/src/raft_progress.c b/source/libs/sync/src/raft_progress.c
new file mode 100644
index 0000000000..0f51d20531
--- /dev/null
+++ b/source/libs/sync/src/raft_progress.c
@@ -0,0 +1,347 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "raft.h"
+#include "raft_unstable_log.h"
+#include "raft_progress.h"
+#include "sync.h"
+#include "syncInt.h"
+
+static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state);
+
+static void resumeProgress(SSyncRaftProgress* progress);
+static void pauseProgress(SSyncRaftProgress* progress);
+
+/**
+ * TODO: allocate the per-peer progress array and inflight buffers (see the
+ * disabled sketch in the body). Until then this is a no-op that reports
+ * success, so callers never read an indeterminate return value.
+ **/
+int syncRaftProgressCreate(SSyncRaft* pRaft) {
+
+/*
+  inflights->buffer = (SyncIndex*)malloc(sizeof(SyncIndex) * pRaft->maxInflightMsgs);
+  if (inflights->buffer == NULL) {
+    return RAFT_OOM;
+  }
+  inflights->size = pRaft->maxInflightMsgs;
+*/
+  return 0;
+}
+
+/*
+int syncRaftProgressRecreate(SSyncRaft* pRaft, const RaftConfiguration* configuration) {
+
+}
+*/
+
+/* Raise matchIndex/nextIndex of peer i to cover lastIndex; returns true only if matchIndex advanced. */
+bool syncRaftProgressMaybeUpdate(SSyncRaft* pRaft, int i, SyncIndex lastIndex) {
+  assert(i >= 0 && i < pRaft->leaderState.nProgress);
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  bool updated = false;
+
+  if (progress->matchIndex < lastIndex) {
+    progress->matchIndex = lastIndex;
+    updated = true;
+    resumeProgress(progress);
+  }
+  if (progress->nextIndex < lastIndex + 1) {
+    progress->nextIndex = lastIndex + 1;
+  }
+
+  return updated;
+}
+
+/* Note: despite its name, the nextIndex argument is stored as nextIndex + 1 in peer i's progress. */
+void syncRaftProgressOptimisticNextIndex(SSyncRaft* pRaft, int i, SyncIndex nextIndex) {
+  assert(i >= 0 && i < pRaft->leaderState.nProgress);
+  pRaft->leaderState.progress[i].nextIndex = nextIndex + 1;
+}
+
+/* Handle a rejection from peer i by lowering its next index; returns false when the rejection is ignored. */
+bool syncRaftProgressMaybeDecrTo(SSyncRaft* pRaft, int i,
+                SyncIndex rejected, SyncIndex lastIndex) {
+  assert(i >= 0 && i < pRaft->leaderState.nProgress);
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+
+  if (progress->state == PROGRESS_REPLICATE) {
+    /**
+     * the rejection must be stale if the progress has matched and "rejected"
+     * is smaller than "match".
+     **/
+    if (rejected <= progress->matchIndex) {
+      syncDebug("match index is up to date,ignore");
+      return false;
+    }
+
+    /* directly decrease next to match + 1 */
+    progress->nextIndex = progress->matchIndex + 1;
+    //syncRaftProgressBecomeProbe(raft, i);
+    return true;
+  }
+
+  if (rejected != progress->nextIndex - 1) {
+    syncDebug("rejected index %" PRId64 " different from next index %" PRId64 " -> ignore"
+      , rejected, progress->nextIndex);
+    return false;
+  }
+
+  progress->nextIndex = MIN(rejected, lastIndex + 1);
+  if (progress->nextIndex < 1) {
+    progress->nextIndex = 1;
+  }
+
+  resumeProgress(progress);
+  return true;
+}
+
+/* Clear the pause flag that syncRaftProgressIsPaused reports in PROGRESS_PROBE. */
+static void resumeProgress(SSyncRaftProgress* progress) {
+  progress->paused = false;
+}
+
+/* Set the pause flag that syncRaftProgressIsPaused reports in PROGRESS_PROBE (no caller in this file yet). */
+static void pauseProgress(SSyncRaftProgress* progress) {
+  progress->paused = true;
+}
+
+/* Whether sending to peer i is currently held back; the rule depends on the peer's progress state. */
+bool syncRaftProgressIsPaused(SSyncRaft* pRaft, int i) {
+  assert(i >= 0 && i < pRaft->leaderState.nProgress);
+
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+
+  switch (progress->state) {
+    case PROGRESS_PROBE:
+      return progress->paused;
+    case PROGRESS_REPLICATE:
+      return syncRaftInflightFull(&progress->inflights);
+    case PROGRESS_SNAPSHOT:
+      return true;
+    default:
+      syncFatal("error sync state:%d", progress->state);
+      /* unknown state: report paused so nothing is sent, and never fall off the end of a non-void function */
+      return true;
+  }
+}
+
+/* Forget the pending snapshot index of peer i. */
+void syncRaftProgressFailure(SSyncRaft* pRaft, int i) {
+  assert(i >= 0 && i < pRaft->leaderState.nProgress);
+
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+
+  progress->pendingSnapshotIndex = 0;
+}
+
+/* True when peer i is in PROGRESS_SNAPSHOT but its matchIndex already reached the pending snapshot index. */
+bool syncRaftProgressNeedAbortSnapshot(SSyncRaft* pRaft, int i) {
+  assert(i >= 0 && i < pRaft->leaderState.nProgress);
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+
+  return progress->state == PROGRESS_SNAPSHOT && progress->matchIndex >= progress->pendingSnapshotIndex;
+}
+
+/* True when peer i's next index is exactly one past the last index of the in-memory log. */
+bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, int i) {
+  assert(i >= 0 && i < pRaft->leaderState.nProgress);
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  return syncRaftLogLastIndex(pRaft->log) + 1 == progress->nextIndex;
+}
+
+/* Switch peer i to PROGRESS_PROBE and recompute its next index. */
+void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) {
+  assert(i >= 0 && i < pRaft->leaderState.nProgress);
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  /**
+   * If the original state is ProgressStateSnapshot, progress knows that
+   * the pending snapshot has been sent to this peer successfully, then
+   * probes from pendingSnapshot + 1.
+   **/
+  if (progress->state == PROGRESS_SNAPSHOT) {
+    SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex;
+    resetProgressState(progress, PROGRESS_PROBE);
+    progress->nextIndex = MAX(progress->matchIndex + 1, pendingSnapshotIndex + 1);
+  } else {
+    resetProgressState(progress, PROGRESS_PROBE);
+    progress->nextIndex = progress->matchIndex + 1;
+  }
+}
+
+/* Switch peer i to PROGRESS_REPLICATE, restarting from matchIndex + 1. */
+void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) {
+  assert(i >= 0 && i < pRaft->leaderState.nProgress);
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  resetProgressState(progress, PROGRESS_REPLICATE);
+  progress->nextIndex = progress->matchIndex + 1;
+}
+
+/* Switch peer i to PROGRESS_SNAPSHOT and record snapshotIndex as its pending snapshot. */
+void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i, SyncIndex snapshotIndex) {
+  assert(i >= 0 && i < pRaft->leaderState.nProgress);
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  resetProgressState(progress, PROGRESS_SNAPSHOT);
+  progress->pendingSnapshotIndex = snapshotIndex;
+}
+
+/* Common part of every state change: clear pause flag and pending snapshot, set state, empty the window. */
+static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) {
+  progress->paused = false;
+  progress->pendingSnapshotIndex = 0;
+  progress->state = state;
+  syncRaftInflightReset(&(progress->inflights));
+}
+
+
+/* Empty the window; buffer and size are left untouched. Always returns 0. */
+int syncRaftInflightReset(SSyncRaftInflights* inflights) {
+  inflights->count = 0;
+  inflights->start = 0;
+
+  return 0;
+}
+
+/* True when count equals size, i.e. no free slot is left (also the case for a zero-sized window). */
+bool syncRaftInflightFull(SSyncRaftInflights* inflights) {
+  return inflights->count == inflights->size;
+}
+
+/* Store inflightIndex in the slot after the last used one, wrapping around; the window must not be full. */
+void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex) {
+  assert(!syncRaftInflightFull(inflights));
+
+  int next = inflights->start + inflights->count;
+  int size = inflights->size;
+  /* is next wrapped around buffer? */
+  if (next >= size) {
+    next -= size;
+  }
+
+  inflights->buffer[next] = inflightIndex;
+  inflights->count++;
+}
+
+/* Release, from the front of the window, every entry whose index is <= toIndex. */
+void syncRaftInflightFreeTo(SSyncRaftInflights* inflights, SyncIndex toIndex) {
+  if (inflights->count == 0 || toIndex < inflights->buffer[inflights->start]) {
+    return;
+  }
+
+  int i, idx;
+  for (i = 0, idx = inflights->start; i < inflights->count; i++) {
+    if (toIndex < inflights->buffer[idx]) {
+      break;
+    }
+
+    int size = inflights->size;
+    idx++;
+    if (idx >= size) {
+      idx -= size;
+    }
+  }
+
+  inflights->count -= i;
+  inflights->start = idx;
+  assert(inflights->count >= 0);
+  if (inflights->count == 0) {
+    inflights->start = 0;
+  }
+}
+
+/* Release the oldest entry of the window, if there is one. */
+void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) {
+  /* nothing to free; also avoids reading buffer[start] while the buffer is not allocated (Create is a stub) */
+  if (inflights->count == 0) {
+    return;
+  }
+  syncRaftInflightFreeTo(inflights, inflights->buffer[inflights->start]);
+}
+
+
+
+
+
+#if 0
+
+SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i) {
+  return pRaft->leaderState.progress[i].nextIndex;
+}
+
+SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i) {
+  return pRaft->leaderState.progress[i].matchIndex;
+}
+
+void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i) {
+  pRaft->leaderState.progress[i].lastSend = pRaft->io.time(pRaft);
+}
+
+void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i) {
+  pRaft->leaderState.progress[i].lastSendSnapshot = pRaft->io.time(pRaft);
+}
+
+bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i) {
+  SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
+  bool prev = progress->recentRecv;
+  progress->recentRecv = false;
+  return prev;
+}
+
+void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i) {
+  pRaft->leaderState.progress[i].recentRecv = true;
+}
+
+bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) {
+  return pRaft->leaderState.progress[i].recentRecv;
+}
+
+void
syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) { + SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); + resetProgressState(progress, PROGRESS_SNAPSHOT); + progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log); +} + +void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) { + SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); + + if (progress->state == PROGRESS_SNAPSHOT) { + assert(progress->pendingSnapshotIndex > 0); + SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex; + resetProgressState(progress, PROGRESS_PROBE); + progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex); + } else { + resetProgressState(progress, PROGRESS_PROBE); + progress->nextIndex = progress->matchIndex + 1; + } +} + +void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) { + resetProgressState(pRaft->leaderState.progress, PROGRESS_REPLICATE); + pRaft->leaderState.progress->nextIndex = pRaft->leaderState.progress->matchIndex + 1; +} + +void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) { + SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]); + progress->pendingSnapshotIndex = 0; + progress->state = PROGRESS_PROBE; +} + +RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) { + return pRaft->leaderState.progress[i].state; +} + + + +#endif \ No newline at end of file diff --git a/source/libs/sync/src/raft_unstable_log.c b/source/libs/sync/src/raft_unstable_log.c new file mode 100644 index 0000000000..4735242d3c --- /dev/null +++ b/source/libs/sync/src/raft_unstable_log.c @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sync.h" +#include "raft_unstable_log.h" + +SyncIndex syncRaftLogLastIndex(SSyncRaftUnstableLog* pLog) { + return 0; +} \ No newline at end of file