diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 0ec741ec3e..f5881837c6 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -131,10 +131,6 @@ typedef struct SStateMgr { typedef struct { SyncGroupId vgId; SSyncCfg syncCfg; - SSyncLogStore logStore; - SStateMgr stateManager; - SSyncFSM syncFsm; - } SSyncInfo; struct SSyncNode; diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 9ca0de19c5..b7c1c051cc 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 8b5cbf1da5..22f8eb464f 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 34dfdb3d09..0c86e9a6d3 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -24,6 +24,7 @@ extern "C" { #include #include #include "taosdef.h" +#include "syncInt.h" #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 4bfeadb15a..54a3d2b8c1 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -50,13 +50,16 @@ typedef struct SSyncIO { } SSyncIO; +int32_t syncIOStart(); +int32_t syncIOStop(); + SSyncIO *syncIOCreate(); -static int32_t syncIOStart(SSyncIO *io); -static int32_t syncIOStop(SSyncIO *io); -static int32_t syncIOPing(SSyncIO *io); -static int32_t syncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -static int32_t syncIODestroy(SSyncIO *io); +static int32_t doSyncIOStart(SSyncIO *io); +static int32_t doSyncIOStop(SSyncIO *io); +static int32_t doSyncIOPing(SSyncIO *io); +static int32_t doSyncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static int32_t doSyncIODestroy(SSyncIO *io); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index f410c8cf6e..41a19eb49a 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" +#include "syncInt.h" #include "syncRaftEntry.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncRaft.h b/source/libs/sync/inc/syncRaft.h index f11ef50b06..3964b95f28 100644 --- a/source/libs/sync/inc/syncRaft.h +++ b/source/libs/sync/inc/syncRaft.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" +#include "syncInt.h" #include "syncMessage.h" #include "taosdef.h" @@ -34,6 +34,11 @@ typedef struct SRaftId { typedef struct SRaft { SRaftId id; + + SSyncLogStore *logStore; + SStateMgr *stateManager; + SSyncFSM *syncFsm; + } SRaft; int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 65f77f3759..516bef4d48 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" +#include "syncInt.h" #include "taosdef.h" typedef struct SSyncRaftEntry { diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 8c4b5116ea..ee971062cf 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" +#include "syncInt.h" #include "taosdef.h" int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 40c5ff790b..bfe071853c 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -24,6 +24,7 @@ extern "C" { #include #include #include "taosdef.h" +#include "syncInt.h" #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index 3ff96bbe8f..c2eca55151 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 033ac89bc2..38068dd0e2 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "syncInt.h" #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 3b6121578a..89fcb230fb 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -23,7 +23,7 @@ extern "C" { #include #include #include -#include "sync.h" +#include "syncInt.h" #include "syncRaft.h" #include "taosdef.h" diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index 8159d2566c..2f5517069e 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -26,6 +26,7 @@ extern "C" { #include "syncMessage.h" #include "syncRaft.h" #include "taosdef.h" +#include "syncInt.h" void onTimeout(SRaft *pRaft, void *pMsg); diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h new file mode 100644 index 0000000000..93729a859c --- /dev/null +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_VOTG_MGR_H +#define _TD_LIBS_SYNC_VOTG_MGR_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "taosdef.h" +#include "syncInt.h" + + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_VOTG_MGR_H*/ diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 65654564ab..2b9c59ec92 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -14,7 +14,6 @@ */ #include "syncAppendEntries.h" -#include "sync.h" void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { // TLA+ Spec diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 20235ef720..05734237b9 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -14,7 +14,6 @@ */ #include "syncAppendEntriesReply.h" -#include "sync.h" void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { // TLA+ Spec diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 738fc4c5e1..329105e2a1 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncElection.h" diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index b314e89b44..e71cf55cb1 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -15,8 +15,6 @@ #include "syncEnv.h" #include -#include "sync.h" -#include "syncInt.h" SSyncEnv *gSyncEnv = NULL; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 736dd6632c..0e32d9ac50 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -20,6 +20,10 @@ #include "ttimer.h" #include "tutil.h" +int32_t syncIOStart() { return 0; } + +int32_t syncIOStop() { return 0; } + static void syncTick(void *param, void *tmrId) { SSyncIO *io = (SSyncIO *)param; sDebug("syncTick ... "); @@ -114,16 +118,16 @@ SSyncIO *syncIOCreate() { io->pQset = taosOpenQset(); taosAddIntoQset(io->pQset, io->pMsgQ, NULL); - io->start = syncIOStart; - io->stop = syncIOStop; - io->ping = syncIOPing; - io->onMessage = syncIOOnMessage; - io->destroy = syncIODestroy; + io->start = doSyncIOStart; + io->stop = doSyncIOStop; + io->ping = doSyncIOPing; + io->onMessage = doSyncIOOnMessage; + io->destroy = doSyncIODestroy; return io; } -static int32_t syncIOStart(SSyncIO *io) { +static int32_t doSyncIOStart(SSyncIO *io) { taosBlockSIGPIPE(); tsRpcForceTcp = 1; @@ -191,13 +195,13 @@ static int32_t syncIOStart(SSyncIO *io) { return 0; } -static int32_t syncIOStop(SSyncIO *io) { +static int32_t doSyncIOStop(SSyncIO *io) { atomic_store_8(&io->isStart, 0); pthread_join(io->tid, NULL); return 0; } -static int32_t syncIOPing(SSyncIO *io) { +static int32_t doSyncIOPing(SSyncIO *io) { SRpcMsg rpcMsg, rspMsg; rpcMsg.pCont = rpcMallocCont(10); @@ -211,9 +215,9 @@ static int32_t syncIOPing(SSyncIO *io) { return 0; } -static int32_t syncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } +static int32_t doSyncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } -static int32_t syncIODestroy(SSyncIO *io) { +static int32_t doSyncIODestroy(SSyncIO *io) { int8_t start = atomic_load_8(&io->isStart); assert(start == 0); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4606496141..0e7d83d39a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -14,7 +14,6 @@ */ #include -#include "sync.h" #include "syncEnv.h" #include "syncInt.h" diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index dcfc940f76..8937303725 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -14,7 +14,6 @@ */ #include "syncMessage.h" -#include "sync.h" #include "syncRaft.h" void onMessage(SRaft *pRaft, void *pMsg) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncOnMessage.c b/source/libs/sync/src/syncOnMessage.c index 738fc4c5e1..19a97ee156 100644 --- a/source/libs/sync/src/syncOnMessage.c +++ b/source/libs/sync/src/syncOnMessage.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncOnMessage.h" diff --git a/source/libs/sync/src/syncRaft.c b/source/libs/sync/src/syncRaft.c index 85c2c6fe27..f0a29917e0 100644 --- a/source/libs/sync/src/syncRaft.c +++ b/source/libs/sync/src/syncRaft.c @@ -14,7 +14,6 @@ */ #include "syncRaft.h" -#include "sync.h" int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 738fc4c5e1..e525d3c7c2 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncRaftEntry.h" diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 4a5fc201b0..37bb3ce48c 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -14,7 +14,6 @@ */ #include "syncRaftLog.h" -#include "sync.h" int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; } diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 7d9c812f8a..4391f5d25c 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -15,7 +15,6 @@ #include "syncRaftStore.h" #include "cJSON.h" -#include "sync.h" SRaftStore *raftStoreOpen(const char *path) { int32_t ret; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 738fc4c5e1..4cea7c150e 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "sync.h" +#include "syncReplication.h" diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 88056c95ff..7aee47b8e4 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -14,7 +14,6 @@ */ #include "syncRequestVote.h" -#include "sync.h" void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { // TLA+ Spec diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 4ca1b1343f..a9c88a7975 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -14,7 +14,6 @@ */ #include "syncRequestVoteReply.h" -#include "sync.h" void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { // TLA+ Spec diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 8a27f097d1..da194780ff 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -14,7 +14,6 @@ */ #include "syncSnapshot.h" -#include "sync.h" #include "syncRaft.h" int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 206dd70046..e27df55d07 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -14,6 +14,5 @@ */ #include "syncTimeout.h" -#include "sync.h" void onTimeout(SRaft *pRaft, void *pMsg) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c new file mode 100644 index 0000000000..02cf4ac033 --- /dev/null +++ b/source/libs/sync/src/syncVoteMgr.c @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncVoteMgr.h" diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index 5b8ccabdcc..c682284b25 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -1,52 +1,40 @@ +#include "syncEnv.h" #include #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" -void *pingFunc(void *param) { - SSyncIO *io = (SSyncIO *)param; - while (1) { - sDebug("io->ping"); - io->ping(io); - sleep(1); - } - return NULL; +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); } int main() { + taosInitLog((char*)"syncEnvTest.log", 100000, 10); tsAsyncLog = 0; - taosInitLog((char *)"syncTest.log", 100000, 10); + sDebugFlag = 143 + 64; - SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); - assert(pRaftStore != NULL); + logTest(); - raftStorePrint(pRaftStore); + int32_t ret = syncEnvStart(); + assert(ret == 0); - pRaftStore->currentTerm = 100; - pRaftStore->voteFor.addr = 200; - pRaftStore->voteFor.vgId = 300; + ret = syncIOStart(); + assert(ret == 0); - raftStorePrint(pRaftStore); + SSyncInfo syncInfo; + syncInfo.vgId = 1; - raftStorePersist(pRaftStore); - - tsAsyncLog = 0; - taosInitLog((char *)"syncTest.log", 100000, 10); - - sDebug("sync test"); - - SSyncIO *syncIO = syncIOCreate(); - assert(syncIO != NULL); - - syncIO->start(syncIO); - - sleep(2); - - pthread_t tid; - pthread_create(&tid, NULL, pingFunc, syncIO); + SSyncNode* pSyncNode = syncNodeStart(&syncInfo); + assert(pSyncNode != NULL); while (1) { - sleep(1); + taosMsleep(1000); } + return 0; } diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 8e85278960..12b0905fa0 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -25,8 +25,6 @@ int main() { sError("sync log test: error"); sFatal("sync log test: fatal"); - - SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); assert(pRaftStore != NULL);