diff --git a/source/libs/sync/inc/syncCommit.h b/source/libs/sync/inc/syncCommit.h new file mode 100644 index 0000000000..4b327935c0 --- /dev/null +++ b/source/libs/sync/inc/syncCommit.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_COMMIT_H +#define _TD_LIBS_SYNC_COMMIT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "syncInt.h" +#include "taosdef.h" + +void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_COMMIT_H*/ diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 5a9af83827..8e36424f19 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -236,7 +236,6 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode); // raft vote -------------- void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); void syncNodeVoteForSelf(SSyncNode* pSyncNode); -void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); // for debug -------------- void syncNodePrint(SSyncNode* pObj); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 93539db938..9db9a3e8ac 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -14,6 +14,7 @@ */ #include "syncAppendEntriesReply.h" +#include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" #include "syncRaftLog.h" @@ -59,7 +60,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex); // maybe commit - syncNodeMaybeAdvanceCommitIndex(ths); + syncMaybeAdvanceCommitIndex(ths); } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index bc2a39aa89..0b27f35220 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "syncCommit.h" #include "syncIndexMgr.h" #include "syncInt.h" #include "syncRaftLog.h" @@ -40,7 +41,7 @@ // IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] // /\ UNCHANGED <> // -void syncNodeMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { +void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeMaybeAdvanceCommitIndex== pMatchIndex", pSyncNode->pMatchIndex); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1f3e709a27..7845490a84 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -17,6 +17,7 @@ #include "sync.h" #include "syncAppendEntries.h" #include "syncAppendEntriesReply.h" +#include "syncCommit.h" #include "syncElection.h" #include "syncEnv.h" #include "syncIndexMgr.h" @@ -740,7 +741,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg ths->pLogStore->appendEntry(ths->pLogStore, pEntry); // only myself, maybe commit - syncNodeMaybeAdvanceCommitIndex(ths); + syncMaybeAdvanceCommitIndex(ths); // start replicate right now! syncNodeReplicate(ths); diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index d4c2dde6d0..2598abbddd 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -162,7 +162,7 @@ int main(int argc, char **argv) { SyncClientRequest *pMsg1 = step1(pMsg0); syncClientRequestPrint2((char *)"==step1==", pMsg1); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 10; ++i) { SyncClientRequest *pSyncClientRequest = pMsg1; SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);