[TD-10645][raft]<feature>add raft module
This commit is contained in:
parent
2dc480adf6
commit
c319d1cb12
|
@ -89,6 +89,10 @@ typedef struct SSyncLogStore {
|
||||||
// write log with given index
|
// write log with given index
|
||||||
int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf);
|
int32_t (*logWrite)(struct SSyncLogStore* logStore, SyncIndex index, SSyncBuffer* pBuf);
|
||||||
|
|
||||||
|
// read log from given index with limit, return the actual num in nBuf
|
||||||
|
int32_t (*logRead)(struct SSyncLogStore* logStore, SyncIndex index, int limit,
|
||||||
|
SSyncBuffer* pBuf, int* nBuf);
|
||||||
|
|
||||||
// mark log with given index has been commtted
|
// mark log with given index has been commtted
|
||||||
int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index);
|
int32_t (*logCommit)(struct SSyncLogStore* logStore, SyncIndex index);
|
||||||
|
|
||||||
|
@ -102,6 +106,7 @@ typedef struct SSyncLogStore {
|
||||||
typedef struct SSyncServerState {
|
typedef struct SSyncServerState {
|
||||||
SyncNodeId voteFor;
|
SyncNodeId voteFor;
|
||||||
SSyncTerm term;
|
SSyncTerm term;
|
||||||
|
SyncIndex commitIndex;
|
||||||
} SSyncServerState;
|
} SSyncServerState;
|
||||||
|
|
||||||
typedef struct SSyncClusterConfig {
|
typedef struct SSyncClusterConfig {
|
||||||
|
@ -146,7 +151,7 @@ SSyncNode* syncStart(const SSyncInfo*);
|
||||||
void syncReconfig(const SSyncNode*, const SSyncCluster*);
|
void syncReconfig(const SSyncNode*, const SSyncCluster*);
|
||||||
void syncStop(const SSyncNode*);
|
void syncStop(const SSyncNode*);
|
||||||
|
|
||||||
int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak);
|
int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak);
|
||||||
|
|
||||||
// int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
// int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
||||||
|
|
||||||
|
|
|
@ -16,8 +16,18 @@
|
||||||
#ifndef _TD_LIBS_SYNC_RAFT_H
|
#ifndef _TD_LIBS_SYNC_RAFT_H
|
||||||
#define _TD_LIBS_SYNC_RAFT_H
|
#define _TD_LIBS_SYNC_RAFT_H
|
||||||
|
|
||||||
|
#include "sync.h"
|
||||||
|
#include "raft_message.h"
|
||||||
|
|
||||||
typedef struct SSyncRaft {
|
typedef struct SSyncRaft {
|
||||||
|
// owner sync node
|
||||||
|
SSyncNode* pNode;
|
||||||
|
|
||||||
|
SSyncInfo info;
|
||||||
|
|
||||||
} SSyncRaft;
|
} SSyncRaft;
|
||||||
|
|
||||||
|
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo);
|
||||||
|
int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg);
|
||||||
|
|
||||||
#endif /* _TD_LIBS_SYNC_RAFT_H */
|
#endif /* _TD_LIBS_SYNC_RAFT_H */
|
|
@ -0,0 +1,76 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_LIBS_SYNC_RAFT_MESSAGE_H
|
||||||
|
#define _TD_LIBS_SYNC_RAFT_MESSAGE_H
|
||||||
|
|
||||||
|
#include "sync.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* below define message type which handled by Raft node thread
|
||||||
|
* internal message, which communicate in threads, start with RAFT_MSG_INTERNAL_*,
|
||||||
|
* internal message use pointer only, need not to be decode/encode
|
||||||
|
* outter message start with RAFT_MSG_*, need to implement its decode/encode functions
|
||||||
|
**/
|
||||||
|
typedef enum RaftMessageType {
|
||||||
|
// client propose a cmd
|
||||||
|
RAFT_MSG_INTERNAL_PROP = 1,
|
||||||
|
|
||||||
|
RAFT_MSG_APPEND,
|
||||||
|
RAFT_MSG_APPEND_RESP,
|
||||||
|
|
||||||
|
RAFT_MSG_VOTE,
|
||||||
|
RAFT_MSG_VOTE_RESP,
|
||||||
|
|
||||||
|
RAFT_MSG_PRE_VOTE,
|
||||||
|
RAFT_MSG_PRE_VOTE_RESP,
|
||||||
|
|
||||||
|
} RaftMessageType;
|
||||||
|
|
||||||
|
typedef struct RaftMsgInternal_Prop {
|
||||||
|
const SSyncBuffer *pBuf;
|
||||||
|
bool isWeak;
|
||||||
|
void* pData;
|
||||||
|
} RaftMsgInternal_Prop;
|
||||||
|
|
||||||
|
typedef struct RaftMessage {
|
||||||
|
RaftMessageType msgType;
|
||||||
|
SSyncTerm term;
|
||||||
|
SyncNodeId from;
|
||||||
|
SyncNodeId to;
|
||||||
|
|
||||||
|
union {
|
||||||
|
RaftMsgInternal_Prop propose;
|
||||||
|
};
|
||||||
|
} RaftMessage;
|
||||||
|
|
||||||
|
static FORCE_INLINE RaftMessage* syncInitPropMsg(RaftMessage* pMsg, const SSyncBuffer* pBuf, void* pData, bool isWeak) {
|
||||||
|
*pMsg = (RaftMessage) {
|
||||||
|
.msgType = RAFT_MSG_INTERNAL_PROP,
|
||||||
|
.propose = (RaftMsgInternal_Prop) {
|
||||||
|
.isWeak = isWeak,
|
||||||
|
.pBuf = pBuf,
|
||||||
|
.pData = pData,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE bool syncIsInternalMsg(const RaftMessage* pMsg) {
|
||||||
|
return pMsg->msgType == RAFT_MSG_INTERNAL_PROP;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */
|
|
@ -40,6 +40,9 @@ typedef struct SSyncManager {
|
||||||
// worker threads
|
// worker threads
|
||||||
SSyncWorker worker[TAOS_SYNC_MAX_WORKER];
|
SSyncWorker worker[TAOS_SYNC_MAX_WORKER];
|
||||||
|
|
||||||
|
// sync net worker
|
||||||
|
SSyncWorker netWorker;
|
||||||
|
|
||||||
// vgroup hash table
|
// vgroup hash table
|
||||||
SHashObj* vgroupTable;
|
SHashObj* vgroupTable;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,74 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "raft.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
|
||||||
|
#ifndef MIN
|
||||||
|
#define MIN(x, y) (((x) < (y)) ? (x) : (y))
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#define RAFT_READ_LOG_MAX_NUM 100
|
||||||
|
|
||||||
|
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
|
||||||
|
SSyncNode* pNode = pRaft->pNode;
|
||||||
|
SSyncServerState serverState;
|
||||||
|
SStateManager* stateManager;
|
||||||
|
SSyncLogStore* logStore;
|
||||||
|
SSyncFSM* fsm;
|
||||||
|
SyncIndex initIndex = pInfo->snapshotIndex;
|
||||||
|
SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM];
|
||||||
|
int nBuf, limit, i;
|
||||||
|
|
||||||
|
memcpy(&pRaft->info, pInfo, sizeof(SSyncInfo));
|
||||||
|
stateManager = &(pRaft->info.stateManager);
|
||||||
|
logStore = &(pRaft->info.logStore);
|
||||||
|
fsm = &(pRaft->info.fsm);
|
||||||
|
|
||||||
|
// read server state
|
||||||
|
if (stateManager->readServerState(stateManager, &serverState) != 0) {
|
||||||
|
syncError("readServerState for vgid %d fail", pInfo->vgId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
assert(initIndex <= serverState.commitIndex);
|
||||||
|
|
||||||
|
// restore fsm state from snapshot index + 1, until commitIndex
|
||||||
|
++initIndex;
|
||||||
|
while (initIndex < serverState.commitIndex) {
|
||||||
|
limit = MIN(RAFT_READ_LOG_MAX_NUM, serverState.commitIndex - initIndex);
|
||||||
|
|
||||||
|
if (logStore->logRead(logStore, initIndex, limit, buffer, &nBuf) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
assert(limit == nBuf);
|
||||||
|
|
||||||
|
for (i = 0; i < limit; ++i) {
|
||||||
|
fsm->applyLog(fsm, initIndex + i, &(buffer[i]), NULL);
|
||||||
|
free(buffer[i].data);
|
||||||
|
}
|
||||||
|
initIndex += nBuf;
|
||||||
|
}
|
||||||
|
assert(initIndex == serverState.commitIndex);
|
||||||
|
|
||||||
|
syncInfo("restore vgid %d state: snapshot index:", pInfo->vgId);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg) {
|
||||||
|
if (!syncIsInternalMsg(pMsg)) {
|
||||||
|
free(pMsg);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "raft_message.h"
|
||||||
|
|
|
@ -75,7 +75,15 @@ SSyncNode* syncStart(const SSyncInfo* pInfo) {
|
||||||
|
|
||||||
SSyncNode *pNode = (SSyncNode*)malloc(sizeof(SSyncNode));
|
SSyncNode *pNode = (SSyncNode*)malloc(sizeof(SSyncNode));
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
syncInfo("malloc vgroup %d node fail", pInfo->vgId);
|
syncError("malloc vgroup %d node fail", pInfo->vgId);
|
||||||
|
pthread_mutex_unlock(&gSyncManager->mutex);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// start raft
|
||||||
|
pNode->raft.pNode = pNode;
|
||||||
|
if (syncRaftStart(&pNode->raft, pInfo) != 0) {
|
||||||
|
syncError("raft start at %d node fail", pInfo->vgId);
|
||||||
pthread_mutex_unlock(&gSyncManager->mutex);
|
pthread_mutex_unlock(&gSyncManager->mutex);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -102,10 +110,19 @@ void syncStop(const SSyncNode* pNode) {
|
||||||
taosHashRemove(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId));
|
taosHashRemove(gSyncManager->vgroupTable, &pNode->vgId, sizeof(SyncGroupId));
|
||||||
pthread_mutex_unlock(&gSyncManager->mutex);
|
pthread_mutex_unlock(&gSyncManager->mutex);
|
||||||
|
|
||||||
pthread_mutex_destroy(&pNode->mutex);
|
pthread_mutex_destroy(&((*ppNode)->mutex));
|
||||||
free(*ppNode);
|
free(*ppNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak) {
|
||||||
|
RaftMessage msg;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&syncNode->mutex);
|
||||||
|
int32_t ret = syncRaftStep(&syncNode->raft, syncInitPropMsg(&msg, pBuf, pData, isWeak));
|
||||||
|
pthread_mutex_unlock(&syncNode->mutex);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
|
void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {}
|
||||||
|
|
||||||
static int syncOpenWorkerPool(SSyncManager* syncManager) {
|
static int syncOpenWorkerPool(SSyncManager* syncManager) {
|
||||||
|
|
Loading…
Reference in New Issue