From c319d1cb12840088e0c60fc0db54b92ef0bd17a4 Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 29 Oct 2021 16:05:25 +0800 Subject: [PATCH] [TD-10645][raft]add raft module --- include/libs/sync/sync.h | 7 ++- source/libs/sync/inc/raft.h | 12 ++++- source/libs/sync/inc/raft_message.h | 76 +++++++++++++++++++++++++++++ source/libs/sync/inc/syncInt.h | 3 ++ source/libs/sync/src/raft.c | 74 ++++++++++++++++++++++++++++ source/libs/sync/src/raft_message.c | 17 +++++++ source/libs/sync/src/sync.c | 21 +++++++- 7 files changed, 206 insertions(+), 4 deletions(-) create mode 100644 source/libs/sync/inc/raft_message.h create mode 100644 source/libs/sync/src/raft.c create mode 100644 source/libs/sync/src/raft_message.c diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index f9d348d77e..1c228675bd 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -89,6 +89,10 @@ typedef struct SSyncLogStore { // write log with given index int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf); + // read log from given index with limit, return the actual num in nBuf + int32_t (*logRead)(struct SSyncLogStore* logStore, SyncIndex index, int limit, + SSyncBuffer* pBuf, int* nBuf); + // mark log with given index has been commtted int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index); @@ -102,6 +106,7 @@ typedef struct SSyncLogStore { typedef struct SSyncServerState { SyncNodeId voteFor; SSyncTerm term; + SyncIndex commitIndex; } SSyncServerState; typedef struct SSyncClusterConfig { @@ -146,7 +151,7 @@ SSyncNode* syncStart(const SSyncInfo*); void syncReconfig(const SSyncNode*, const SSyncCluster*); void syncStop(const SSyncNode*); -int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak); +int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak); // int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode); diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index 78c0c97ed6..0df46db3fc 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -16,8 +16,18 @@ #ifndef _TD_LIBS_SYNC_RAFT_H #define _TD_LIBS_SYNC_RAFT_H +#include "sync.h" +#include "raft_message.h" + typedef struct SSyncRaft { - + // owner sync node + SSyncNode* pNode; + + SSyncInfo info; + } SSyncRaft; +int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo); +int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg); + #endif /* _TD_LIBS_SYNC_RAFT_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/raft_message.h b/source/libs/sync/inc/raft_message.h new file mode 100644 index 0000000000..cb0552500a --- /dev/null +++ b/source/libs/sync/inc/raft_message.h @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_RAFT_MESSAGE_H +#define _TD_LIBS_SYNC_RAFT_MESSAGE_H + +#include "sync.h" + +/** + * below define message type which handled by Raft node thread + * internal message, which communicate in threads, start with RAFT_MSG_INTERNAL_*, + * internal message use pointer only, need not to be decode/encode + * outter message start with RAFT_MSG_*, need to implement its decode/encode functions + **/ +typedef enum RaftMessageType { + // client propose a cmd + RAFT_MSG_INTERNAL_PROP = 1, + + RAFT_MSG_APPEND, + RAFT_MSG_APPEND_RESP, + + RAFT_MSG_VOTE, + RAFT_MSG_VOTE_RESP, + + RAFT_MSG_PRE_VOTE, + RAFT_MSG_PRE_VOTE_RESP, + +} RaftMessageType; + +typedef struct RaftMsgInternal_Prop { + const SSyncBuffer *pBuf; + bool isWeak; + void* pData; +} RaftMsgInternal_Prop; + +typedef struct RaftMessage { + RaftMessageType msgType; + SSyncTerm term; + SyncNodeId from; + SyncNodeId to; + + union { + RaftMsgInternal_Prop propose; + }; +} RaftMessage; + +static FORCE_INLINE RaftMessage* syncInitPropMsg(RaftMessage* pMsg, const SSyncBuffer* pBuf, void* pData, bool isWeak) { + *pMsg = (RaftMessage) { + .msgType = RAFT_MSG_INTERNAL_PROP, + .propose = (RaftMsgInternal_Prop) { + .isWeak = isWeak, + .pBuf = pBuf, + .pData = pData, + }, + }; + + return pMsg; +} + +static FORCE_INLINE bool syncIsInternalMsg(const RaftMessage* pMsg) { + return pMsg->msgType == RAFT_MSG_INTERNAL_PROP; +} + +#endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 33cbd836a1..c1c3ed17a8 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -40,6 +40,9 @@ typedef struct SSyncManager { // worker threads SSyncWorker worker[TAOS_SYNC_MAX_WORKER]; + // sync net worker + SSyncWorker netWorker; + // vgroup hash table SHashObj* vgroupTable; diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c new file mode 100644 index 0000000000..109b08902a --- /dev/null +++ b/source/libs/sync/src/raft.c @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "raft.h" +#include "syncInt.h" + +#ifndef MIN +#define MIN(x, y) (((x) < (y)) ? (x) : (y)) +#endif + +#define RAFT_READ_LOG_MAX_NUM 100 + +int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { + SSyncNode* pNode = pRaft->pNode; + SSyncServerState serverState; + SStateManager* stateManager; + SSyncLogStore* logStore; + SSyncFSM* fsm; + SyncIndex initIndex = pInfo->snapshotIndex; + SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM]; + int nBuf, limit, i; + + memcpy(&pRaft->info, pInfo, sizeof(SSyncInfo)); + stateManager = &(pRaft->info.stateManager); + logStore = &(pRaft->info.logStore); + fsm = &(pRaft->info.fsm); + + // read server state + if (stateManager->readServerState(stateManager, &serverState) != 0) { + syncError("readServerState for vgid %d fail", pInfo->vgId); + return -1; + } + assert(initIndex <= serverState.commitIndex); + + // restore fsm state from snapshot index + 1, until commitIndex + ++initIndex; + while (initIndex < serverState.commitIndex) { + limit = MIN(RAFT_READ_LOG_MAX_NUM, serverState.commitIndex - initIndex); + + if (logStore->logRead(logStore, initIndex, limit, buffer, &nBuf) != 0) { + return -1; + } + assert(limit == nBuf); + + for (i = 0; i < limit; ++i) { + fsm->applyLog(fsm, initIndex + i, &(buffer[i]), NULL); + free(buffer[i].data); + } + initIndex += nBuf; + } + assert(initIndex == serverState.commitIndex); + + syncInfo("restore vgid %d state: snapshot index:", pInfo->vgId); + return 0; +} + +int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg) { + if (!syncIsInternalMsg(pMsg)) { + free(pMsg); + } + return 0; +} \ No newline at end of file diff --git a/source/libs/sync/src/raft_message.c b/source/libs/sync/src/raft_message.c new file mode 100644 index 0000000000..d35efce9db --- /dev/null +++ b/source/libs/sync/src/raft_message.c @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "raft_message.h" + diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c index a974a17ad2..e627cf8bc1 100644 --- a/source/libs/sync/src/sync.c +++ b/source/libs/sync/src/sync.c @@ -75,7 +75,15 @@ SSyncNode* syncStart(const SSyncInfo* pInfo) { SSyncNode *pNode = (SSyncNode*)malloc(sizeof(SSyncNode)); if (pNode == NULL) { - syncInfo("malloc vgroup %d node fail", pInfo->vgId); + syncError("malloc vgroup %d node fail", pInfo->vgId); + pthread_mutex_unlock(&gSyncManager->mutex); + return NULL; + } + + // start raft + pNode->raft.pNode = pNode; + if (syncRaftStart(&pNode->raft, pInfo) != 0) { + syncError("raft start at %d node fail", pInfo->vgId); pthread_mutex_unlock(&gSyncManager->mutex); return NULL; } @@ -102,10 +110,19 @@ void syncStop(const SSyncNode* pNode) { taosHashRemove(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId)); pthread_mutex_unlock(&gSyncManager->mutex); - pthread_mutex_destroy(&pNode->mutex); + pthread_mutex_destroy(&((*ppNode)->mutex)); free(*ppNode); } +int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak) { + RaftMessage msg; + + pthread_mutex_lock(&syncNode->mutex); + int32_t ret = syncRaftStep(&syncNode->raft, syncInitPropMsg(&msg, pBuf, pData, isWeak)); + pthread_mutex_unlock(&syncNode->mutex); + return ret; +} + void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {} static int syncOpenWorkerPool(SSyncManager* syncManager) {