add sync code

This commit is contained in:
Minghao Li 2022-02-25 15:34:24 +08:00
parent 113618bea3
commit 296d9abe2a
35 changed files with 114 additions and 76 deletions

View File

@ -131,10 +131,6 @@ typedef struct SStateMgr {
typedef struct { typedef struct {
SyncGroupId vgId; SyncGroupId vgId;
SSyncCfg syncCfg; SSyncCfg syncCfg;
SSyncLogStore logStore;
SStateMgr stateManager;
SSyncFSM syncFsm;
} SSyncInfo; } SSyncInfo;
struct SSyncNode; struct SSyncNode;

View File

@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"

View File

@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"

View File

@ -24,6 +24,7 @@ extern "C" {
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "taosdef.h" #include "taosdef.h"
#include "syncInt.h"
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -50,13 +50,16 @@ typedef struct SSyncIO {
} SSyncIO; } SSyncIO;
int32_t syncIOStart();
int32_t syncIOStop();
SSyncIO *syncIOCreate(); SSyncIO *syncIOCreate();
static int32_t syncIOStart(SSyncIO *io); static int32_t doSyncIOStart(SSyncIO *io);
static int32_t syncIOStop(SSyncIO *io); static int32_t doSyncIOStop(SSyncIO *io);
static int32_t syncIOPing(SSyncIO *io); static int32_t doSyncIOPing(SSyncIO *io);
static int32_t syncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t doSyncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIODestroy(SSyncIO *io); static int32_t doSyncIODestroy(SSyncIO *io);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "syncInt.h"
#include "syncRaftEntry.h" #include "syncRaftEntry.h"
#include "taosdef.h" #include "taosdef.h"

View File

@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "taosdef.h" #include "taosdef.h"
@ -34,6 +34,11 @@ typedef struct SRaftId {
typedef struct SRaft { typedef struct SRaft {
SRaftId id; SRaftId id;
SSyncLogStore *logStore;
SStateMgr *stateManager;
SSyncFSM *syncFsm;
} SRaft; } SRaft;
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak);

View File

@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
typedef struct SSyncRaftEntry { typedef struct SSyncRaftEntry {

View File

@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf);

View File

@ -24,6 +24,7 @@ extern "C" {
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "taosdef.h" #include "taosdef.h"
#include "syncInt.h"
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"

View File

@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"

View File

@ -23,7 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "sync.h" #include "syncInt.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"

View File

@ -26,6 +26,7 @@ extern "C" {
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
#include "syncInt.h"
void onTimeout(SRaft *pRaft, void *pMsg); void onTimeout(SRaft *pRaft, void *pMsg);

View File

@ -0,0 +1,34 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_VOTG_MGR_H
#define _TD_LIBS_SYNC_VOTG_MGR_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include "taosdef.h"
#include "syncInt.h"
#ifdef __cplusplus
}
#endif
#endif /*_TD_LIBS_SYNC_VOTG_MGR_H*/

View File

@ -14,7 +14,6 @@
*/ */
#include "syncAppendEntries.h" #include "syncAppendEntries.h"
#include "sync.h"
void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) {
// TLA+ Spec // TLA+ Spec

View File

@ -14,7 +14,6 @@
*/ */
#include "syncAppendEntriesReply.h" #include "syncAppendEntriesReply.h"
#include "sync.h"
void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) {
// TLA+ Spec // TLA+ Spec

View File

@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync.h" #include "syncElection.h"

View File

@ -15,8 +15,6 @@
#include "syncEnv.h" #include "syncEnv.h"
#include <assert.h> #include <assert.h>
#include "sync.h"
#include "syncInt.h"
SSyncEnv *gSyncEnv = NULL; SSyncEnv *gSyncEnv = NULL;

View File

@ -20,6 +20,10 @@
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
int32_t syncIOStart() { return 0; }
int32_t syncIOStop() { return 0; }
static void syncTick(void *param, void *tmrId) { static void syncTick(void *param, void *tmrId) {
SSyncIO *io = (SSyncIO *)param; SSyncIO *io = (SSyncIO *)param;
sDebug("syncTick ... "); sDebug("syncTick ... ");
@ -114,16 +118,16 @@ SSyncIO *syncIOCreate() {
io->pQset = taosOpenQset(); io->pQset = taosOpenQset();
taosAddIntoQset(io->pQset, io->pMsgQ, NULL); taosAddIntoQset(io->pQset, io->pMsgQ, NULL);
io->start = syncIOStart; io->start = doSyncIOStart;
io->stop = syncIOStop; io->stop = doSyncIOStop;
io->ping = syncIOPing; io->ping = doSyncIOPing;
io->onMessage = syncIOOnMessage; io->onMessage = doSyncIOOnMessage;
io->destroy = syncIODestroy; io->destroy = doSyncIODestroy;
return io; return io;
} }
static int32_t syncIOStart(SSyncIO *io) { static int32_t doSyncIOStart(SSyncIO *io) {
taosBlockSIGPIPE(); taosBlockSIGPIPE();
tsRpcForceTcp = 1; tsRpcForceTcp = 1;
@ -191,13 +195,13 @@ static int32_t syncIOStart(SSyncIO *io) {
return 0; return 0;
} }
static int32_t syncIOStop(SSyncIO *io) { static int32_t doSyncIOStop(SSyncIO *io) {
atomic_store_8(&io->isStart, 0); atomic_store_8(&io->isStart, 0);
pthread_join(io->tid, NULL); pthread_join(io->tid, NULL);
return 0; return 0;
} }
static int32_t syncIOPing(SSyncIO *io) { static int32_t doSyncIOPing(SSyncIO *io) {
SRpcMsg rpcMsg, rspMsg; SRpcMsg rpcMsg, rspMsg;
rpcMsg.pCont = rpcMallocCont(10); rpcMsg.pCont = rpcMallocCont(10);
@ -211,9 +215,9 @@ static int32_t syncIOPing(SSyncIO *io) {
return 0; return 0;
} }
static int32_t syncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } static int32_t doSyncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; }
static int32_t syncIODestroy(SSyncIO *io) { static int32_t doSyncIODestroy(SSyncIO *io) {
int8_t start = atomic_load_8(&io->isStart); int8_t start = atomic_load_8(&io->isStart);
assert(start == 0); assert(start == 0);

View File

@ -14,7 +14,6 @@
*/ */
#include <stdint.h> #include <stdint.h>
#include "sync.h"
#include "syncEnv.h" #include "syncEnv.h"
#include "syncInt.h" #include "syncInt.h"

View File

@ -14,7 +14,6 @@
*/ */
#include "syncMessage.h" #include "syncMessage.h"
#include "sync.h"
#include "syncRaft.h" #include "syncRaft.h"
void onMessage(SRaft *pRaft, void *pMsg) {} void onMessage(SRaft *pRaft, void *pMsg) {}

View File

@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync.h" #include "syncOnMessage.h"

View File

@ -14,7 +14,6 @@
*/ */
#include "syncRaft.h" #include "syncRaft.h"
#include "sync.h"
int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; }

View File

@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync.h" #include "syncRaftEntry.h"

View File

@ -14,7 +14,6 @@
*/ */
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "sync.h"
int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; } int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; }

View File

@ -15,7 +15,6 @@
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "cJSON.h" #include "cJSON.h"
#include "sync.h"
SRaftStore *raftStoreOpen(const char *path) { SRaftStore *raftStoreOpen(const char *path) {
int32_t ret; int32_t ret;

View File

@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "sync.h" #include "syncReplication.h"

View File

@ -14,7 +14,6 @@
*/ */
#include "syncRequestVote.h" #include "syncRequestVote.h"
#include "sync.h"
void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) {
// TLA+ Spec // TLA+ Spec

View File

@ -14,7 +14,6 @@
*/ */
#include "syncRequestVoteReply.h" #include "syncRequestVoteReply.h"
#include "sync.h"
void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) {
// TLA+ Spec // TLA+ Spec

View File

@ -14,7 +14,6 @@
*/ */
#include "syncSnapshot.h" #include "syncSnapshot.h"
#include "sync.h"
#include "syncRaft.h" #include "syncRaft.h"
int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; }

View File

@ -14,6 +14,5 @@
*/ */
#include "syncTimeout.h" #include "syncTimeout.h"
#include "sync.h"
void onTimeout(SRaft *pRaft, void *pMsg) {} void onTimeout(SRaft *pRaft, void *pMsg) {}

View File

@ -0,0 +1,16 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncVoteMgr.h"

View File

@ -1,52 +1,40 @@
#include "syncEnv.h"
#include <stdio.h> #include <stdio.h>
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
void *pingFunc(void *param) { void logTest() {
SSyncIO *io = (SSyncIO *)param; sTrace("--- sync log test: trace");
while (1) { sDebug("--- sync log test: debug");
sDebug("io->ping"); sInfo("--- sync log test: info");
io->ping(io); sWarn("--- sync log test: warn");
sleep(1); sError("--- sync log test: error");
} sFatal("--- sync log test: fatal");
return NULL;
} }
int main() { int main() {
taosInitLog((char*)"syncEnvTest.log", 100000, 10);
tsAsyncLog = 0; tsAsyncLog = 0;
taosInitLog((char *)"syncTest.log", 100000, 10); sDebugFlag = 143 + 64;
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); logTest();
assert(pRaftStore != NULL);
raftStorePrint(pRaftStore); int32_t ret = syncEnvStart();
assert(ret == 0);
pRaftStore->currentTerm = 100; ret = syncIOStart();
pRaftStore->voteFor.addr = 200; assert(ret == 0);
pRaftStore->voteFor.vgId = 300;
raftStorePrint(pRaftStore); SSyncInfo syncInfo;
syncInfo.vgId = 1;
raftStorePersist(pRaftStore); SSyncNode* pSyncNode = syncNodeStart(&syncInfo);
assert(pSyncNode != NULL);
tsAsyncLog = 0;
taosInitLog((char *)"syncTest.log", 100000, 10);
sDebug("sync test");
SSyncIO *syncIO = syncIOCreate();
assert(syncIO != NULL);
syncIO->start(syncIO);
sleep(2);
pthread_t tid;
pthread_create(&tid, NULL, pingFunc, syncIO);
while (1) { while (1) {
sleep(1); taosMsleep(1000);
} }
return 0; return 0;
} }

View File

@ -25,8 +25,6 @@ int main() {
sError("sync log test: error"); sError("sync log test: error");
sFatal("sync log test: fatal"); sFatal("sync log test: fatal");
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
assert(pRaftStore != NULL); assert(pRaftStore != NULL);