Merge pull request #9525 from taosdata/feature/3.0_mhli
Feature/3.0 mhli
This commit is contained in:
commit
fe38808477
|
@ -56,6 +56,12 @@ option(
|
|||
OFF
|
||||
)
|
||||
|
||||
option(
|
||||
BUILD_WITH_TRAFT
|
||||
"If build with traft"
|
||||
OFF
|
||||
)
|
||||
|
||||
option(
|
||||
BUILD_DEPENDENCY_TESTS
|
||||
"If build dependency tests"
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
|
||||
# traft
|
||||
ExternalProject_Add(traft
|
||||
GIT_REPOSITORY https://github.com/taosdata/traft.git
|
||||
GIT_TAG for_3.0
|
||||
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/traft"
|
||||
BINARY_DIR "${CMAKE_CONTRIB_DIR}/traft"
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
# https://answers.ros.org/question/333125/how-to-include-external-automakeautoconf-projects-into-ament_cmake/
|
||||
CONFIGURE_COMMAND COMMAND autoreconf -i COMMAND ./configure --enable-example
|
||||
BUILD_COMMAND "$(MAKE)"
|
||||
INSTALL_COMMAND ""
|
||||
TEST_COMMAND ""
|
||||
)
|
|
@ -41,6 +41,12 @@ if(${BUILD_WITH_CRAFT})
|
|||
SET(BUILD_WITH_UV ON CACHE BOOL "craft need libuv" FORCE)
|
||||
endif(${BUILD_WITH_CRAFT})
|
||||
|
||||
# traft
|
||||
if(${BUILD_WITH_TRAFT})
|
||||
cat("${CMAKE_SUPPORT_DIR}/traft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
SET(BUILD_WITH_UV ON CACHE BOOL "traft need libuv" FORCE)
|
||||
endif(${BUILD_WITH_TRAFT})
|
||||
|
||||
#libuv
|
||||
if(${BUILD_WITH_UV})
|
||||
cat("${CMAKE_SUPPORT_DIR}/libuv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
|
@ -173,6 +179,18 @@ if(${BUILD_WITH_CRAFT})
|
|||
# )
|
||||
endif(${BUILD_WITH_CRAFT})
|
||||
|
||||
# TRAFT
|
||||
if(${BUILD_WITH_TRAFT})
|
||||
add_library(traft STATIC IMPORTED GLOBAL)
|
||||
set_target_properties(traft PROPERTIES
|
||||
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/traft/.libs/libraft.a"
|
||||
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/traft/include"
|
||||
)
|
||||
# target_link_libraries(craft
|
||||
# INTERFACE pthread
|
||||
# )
|
||||
endif(${BUILD_WITH_TRAFT})
|
||||
|
||||
# LIBUV
|
||||
if(${BUILD_WITH_UV})
|
||||
add_subdirectory(libuv)
|
||||
|
|
|
@ -19,4 +19,8 @@ if(${BUILD_WITH_CRAFT})
|
|||
add_subdirectory(craft)
|
||||
endif(${BUILD_WITH_CRAFT})
|
||||
|
||||
if(${BUILD_WITH_TRAFT})
|
||||
add_subdirectory(traft)
|
||||
endif(${BUILD_WITH_TRAFT})
|
||||
|
||||
add_subdirectory(tdev)
|
||||
|
|
|
@ -20,6 +20,7 @@ typedef struct {
|
|||
} Addr;
|
||||
|
||||
typedef struct {
|
||||
int voter;
|
||||
Addr me;
|
||||
Addr peers[MAX_PEERS];
|
||||
int peersCount;
|
||||
|
|
|
@ -104,7 +104,7 @@ const char* state2String(unsigned short state) {
|
|||
void printRaftConfiguration(struct raft_configuration *c) {
|
||||
printf("configuration: \n");
|
||||
for (int i = 0; i < c->n; ++i) {
|
||||
printf("%llu -- %d -- %s\n", c->servers->id, c->servers->role, c->servers->address);
|
||||
printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,11 +119,9 @@ void printRaftState(struct raft *r) {
|
|||
printf("last_applied: %llu \n", r->last_applied);
|
||||
printf("last_stored: %llu \n", r->last_stored);
|
||||
|
||||
/*
|
||||
printf("configuration_index: %llu \n", r->configuration_index);
|
||||
printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index);
|
||||
printRaftConfiguration(&r->configuration);
|
||||
*/
|
||||
|
||||
printf("----------------------------\n");
|
||||
}
|
||||
|
@ -164,6 +162,18 @@ void getValue(const char *key) {
|
|||
}
|
||||
}
|
||||
|
||||
void raft_change_cb_add(struct raft_change *req, int status) {
|
||||
printf("raft_change_cb_add status:%d ... \n", status);
|
||||
}
|
||||
|
||||
void raft_change_cb_assign(struct raft_change *req, int status) {
|
||||
printf("raft_change_cb_assign status:%d ... \n", status);
|
||||
}
|
||||
|
||||
void raft_change_cb_remove(struct raft_change *req, int status) {
|
||||
printf("raft_change_cb_remove status:%d ... \n", status);
|
||||
}
|
||||
|
||||
void console(SRaftServer *pRaftServer) {
|
||||
while (1) {
|
||||
char cmd_buf[COMMAND_LEN];
|
||||
|
@ -193,30 +203,59 @@ void console(SRaftServer *pRaftServer) {
|
|||
|
||||
parseCommand(cmd_buf, cmd, param1, param2, TOKEN_LEN);
|
||||
if (strcmp(cmd, "addnode") == 0) {
|
||||
printf("not support \n");
|
||||
//printf("not support \n");
|
||||
|
||||
/*
|
||||
char host[HOST_LEN];
|
||||
uint32_t port;
|
||||
parseAddr(param1, host, HOST_LEN, &port);
|
||||
uint64_t rid = raftId(host, port);
|
||||
|
||||
struct raft_change *req = raft_malloc(sizeof(*req));
|
||||
int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL);
|
||||
int r = raft_add(&pRaftServer->raft, req, rid, param1, raft_change_cb_add);
|
||||
if (r != 0) {
|
||||
printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||
printf("raft_add error: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||
}
|
||||
printf("add node: %lu %s \n", rid, param1);
|
||||
|
||||
struct raft_change *req2 = raft_malloc(sizeof(*req2));
|
||||
r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL);
|
||||
r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, raft_change_cb_assign);
|
||||
if (r != 0) {
|
||||
printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||
printf("raft_assign error: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||
}
|
||||
*/
|
||||
printf("raft_assign: %s %d \n", param1, RAFT_VOTER);
|
||||
|
||||
} else if (strcmp(cmd, "activate") == 0) {
|
||||
char host[HOST_LEN];
|
||||
uint32_t port;
|
||||
parseAddr(param1, host, HOST_LEN, &port);
|
||||
uint64_t rid = raftId(host, port);
|
||||
|
||||
|
||||
struct raft_change *req2 = raft_malloc(sizeof(*req2));
|
||||
int r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, raft_change_cb_assign);
|
||||
if (r != 0) {
|
||||
printf("raft_assign error: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||
}
|
||||
printf("raft_assign: %s %d \n", param1, RAFT_VOTER);
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
} else if (strcmp(cmd, "dropnode") == 0) {
|
||||
printf("not support \n");
|
||||
char host[HOST_LEN];
|
||||
uint32_t port;
|
||||
parseAddr(param1, host, HOST_LEN, &port);
|
||||
uint64_t rid = raftId(host, port);
|
||||
|
||||
struct raft_change *req = raft_malloc(sizeof(*req));
|
||||
int r = raft_remove(&pRaftServer->raft, req, rid, raft_change_cb_remove);
|
||||
if (r != 0) {
|
||||
printf("raft_remove: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||
}
|
||||
printf("drop node: %lu %s \n", rid, param1);
|
||||
|
||||
|
||||
|
||||
} else if (strcmp(cmd, "put") == 0) {
|
||||
char buf[256];
|
||||
|
@ -234,6 +273,7 @@ void console(SRaftServer *pRaftServer) {
|
|||
|
||||
} else if (strcmp(cmd, "help") == 0) {
|
||||
printf("addnode \"127.0.0.1:8888\" \n");
|
||||
printf("activate \"127.0.0.1:8888\" \n");
|
||||
printf("dropnode \"127.0.0.1:8888\" \n");
|
||||
printf("put key value \n");
|
||||
printf("get key \n");
|
||||
|
@ -256,7 +296,9 @@ void *startConsoleFunc(void *param) {
|
|||
// Config ---------------------------------
|
||||
void usage() {
|
||||
printf("\nusage: \n");
|
||||
printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name);
|
||||
printf("%s --me=127.0.0.1:10000 --dir=./data --voter \n", exe_name);
|
||||
printf("%s --me=127.0.0.1:10001 --dir=./data \n", exe_name);
|
||||
printf("%s --me=127.0.0.1:10002 --dir=./data \n", exe_name);
|
||||
printf("\n");
|
||||
printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name);
|
||||
printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name);
|
||||
|
@ -271,13 +313,15 @@ void parseConf(int argc, char **argv, SRaftServerConfig *pConf) {
|
|||
option_index = 0;
|
||||
static struct option long_options[] = {
|
||||
{"help", no_argument, NULL, 'h'},
|
||||
{"voter", no_argument, NULL, 'v'},
|
||||
{"peers", required_argument, NULL, 'p'},
|
||||
{"me", required_argument, NULL, 'm'},
|
||||
{"dir", required_argument, NULL, 'd'},
|
||||
{NULL, 0, NULL, 0}
|
||||
};
|
||||
|
||||
while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) {
|
||||
pConf->voter = 0;
|
||||
while ((option_value = getopt_long(argc, argv, "hvp:m:d:", long_options, &option_index)) != -1) {
|
||||
switch (option_value) {
|
||||
case 'm': {
|
||||
parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port);
|
||||
|
@ -295,6 +339,10 @@ void parseConf(int argc, char **argv, SRaftServerConfig *pConf) {
|
|||
break;
|
||||
}
|
||||
|
||||
case 'v': {
|
||||
pConf->voter = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
case 'd': {
|
||||
snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg);
|
||||
|
@ -338,6 +386,8 @@ int main(int argc, char **argv) {
|
|||
exit(-1);
|
||||
}
|
||||
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
|
||||
SRaftServerConfig conf;
|
||||
parseConf(argc, argv, &conf);
|
||||
printConf(&conf);
|
||||
|
|
|
@ -85,29 +85,45 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf,
|
|||
pRaftServer->fsm = pFsm;
|
||||
|
||||
ret = uv_loop_init(&pRaftServer->loop);
|
||||
if (!ret) {
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
||||
assert(0);
|
||||
}
|
||||
|
||||
ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
|
||||
if (!ret) {
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
||||
assert(0);
|
||||
}
|
||||
|
||||
ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, pRaftServer->dir, &pRaftServer->transport);
|
||||
if (!ret) {
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
||||
assert(0);
|
||||
}
|
||||
|
||||
ret = raft_init(&pRaftServer->raft, &pRaftServer->io, pRaftServer->fsm, pRaftServer->raftId, pRaftServer->address);
|
||||
if (!ret) {
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
||||
assert(0);
|
||||
}
|
||||
|
||||
struct raft_configuration conf;
|
||||
raft_configuration_init(&conf);
|
||||
raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER);
|
||||
|
||||
if (pConf->voter == 0) {
|
||||
raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_SPARE);
|
||||
|
||||
} else {
|
||||
raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
printf("add myself: %llu - %s \n", pRaftServer->raftId, pRaftServer->address);
|
||||
|
||||
|
||||
for (int i = 0; i < pConf->peersCount; ++i) {
|
||||
const Addr *pAddr = &pConf->peers[i];
|
||||
raft_id rid = raftId(pAddr->host, pAddr->port);
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
add_executable(raftMain "")
|
||||
target_sources(raftMain
|
||||
PRIVATE
|
||||
"raftMain.c"
|
||||
"raftServer.c"
|
||||
)
|
||||
target_link_libraries(raftMain PUBLIC traft lz4 uv_a)
|
|
@ -0,0 +1,4 @@
|
|||
#!/bin/bash
|
||||
|
||||
rm -rf 127.0.0.1*
|
||||
rm -rf ./data
|
|
@ -0,0 +1,36 @@
|
|||
#ifndef TDENGINE_COMMON_H
|
||||
#define TDENGINE_COMMON_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#define MAX_INSTANCE_NUM 100
|
||||
|
||||
#define MAX_PEERS 10
|
||||
#define COMMAND_LEN 1024
|
||||
#define TOKEN_LEN 128
|
||||
#define DIR_LEN 256
|
||||
#define HOST_LEN 64
|
||||
#define ADDRESS_LEN (HOST_LEN + 16)
|
||||
|
||||
typedef struct {
|
||||
char host[HOST_LEN];
|
||||
uint32_t port;
|
||||
} Addr;
|
||||
|
||||
typedef struct {
|
||||
Addr me;
|
||||
Addr peers[MAX_PEERS];
|
||||
int peersCount;
|
||||
char dir[DIR_LEN];
|
||||
char dataDir[DIR_LEN + HOST_LEN * 2];
|
||||
} SRaftServerConfig;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // TDENGINE_COMMON_H
|
|
@ -0,0 +1,18 @@
|
|||
|
||||
make raftServer
|
||||
|
||||
all:
|
||||
gcc raftMain.c raftServer.c -I ../../traft/include/ ../../traft/.libs/libraft.a -o raftMain -luv -llz4 -lpthread -g
|
||||
clean:
|
||||
rm -f raftMain
|
||||
sh clear.sh
|
||||
|
||||
|
||||
make traft:
|
||||
|
||||
sudo apt-get install libuv1-dev liblz4-dev
|
||||
autoreconf -i
|
||||
./configure --enable-example
|
||||
make
|
||||
|
||||
|
|
@ -0,0 +1,659 @@
|
|||
#include <stdio.h>
|
||||
#include <unistd.h>
|
||||
#include <pthread.h>
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
#include <getopt.h>
|
||||
#include <time.h>
|
||||
#include <stdlib.h>
|
||||
#include <getopt.h>
|
||||
#include <raft.h>
|
||||
#include <raft/uv.h>
|
||||
#include "raftServer.h"
|
||||
#include "common.h"
|
||||
|
||||
const char *exe_name;
|
||||
|
||||
typedef struct LeaderState {
|
||||
char address[48];
|
||||
int leaderCount;
|
||||
|
||||
} LeaderState;
|
||||
|
||||
#define NODE_COUNT 3
|
||||
LeaderState leaderStates[NODE_COUNT];
|
||||
|
||||
void printLeaderCount() {
|
||||
for (int i = 0; i < NODE_COUNT; ++i) {
|
||||
printf("%s: leaderCount:%d \n", leaderStates[i].address, leaderStates[i].leaderCount);
|
||||
}
|
||||
}
|
||||
|
||||
void updateLeaderStates(SRaftServer *pRaftServer) {
|
||||
for (int i = 0; i < pRaftServer->instance[0].raft.configuration.n; ++i) {
|
||||
snprintf(leaderStates[i].address, sizeof(leaderStates[i].address), "%s", pRaftServer->instance[0].raft.configuration.servers[i].address);
|
||||
leaderStates[i].leaderCount = 0;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
||||
struct raft *r = &pRaftServer->instance[i].raft;
|
||||
|
||||
char leaderAddress[128];
|
||||
memset(leaderAddress, 0, sizeof(leaderAddress));
|
||||
|
||||
if (r->state == RAFT_LEADER) {
|
||||
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address);
|
||||
} else if (r->state == RAFT_FOLLOWER) {
|
||||
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address);
|
||||
}
|
||||
|
||||
for (int j = 0; j < NODE_COUNT; j++) {
|
||||
if (strcmp(leaderAddress, leaderStates[j].address) == 0) {
|
||||
leaderStates[j].leaderCount++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void raftTransferCb(struct raft_transfer *req) {
|
||||
SRaftServer *pRaftServer = req->data;
|
||||
raft_free(req);
|
||||
|
||||
printf("raftTransferCb: \n");
|
||||
updateLeaderStates(pRaftServer);
|
||||
printLeaderCount();
|
||||
|
||||
int myLeaderCount;
|
||||
for (int i = 0; i < NODE_COUNT; ++i) {
|
||||
if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) {
|
||||
myLeaderCount = leaderStates[i].leaderCount;
|
||||
}
|
||||
}
|
||||
|
||||
printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT);
|
||||
if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) {
|
||||
struct raft *r;
|
||||
for (int j = 0; j < pRaftServer->instanceCount; ++j) {
|
||||
if (pRaftServer->instance[j].raft.state == RAFT_LEADER) {
|
||||
r = &pRaftServer->instance[j].raft;
|
||||
}
|
||||
}
|
||||
|
||||
struct raft_transfer *transfer = raft_malloc(sizeof(*transfer));
|
||||
transfer->data = pRaftServer;
|
||||
|
||||
uint64_t destRaftId;
|
||||
int minIndex = -1;
|
||||
int minLeaderCount = myLeaderCount;
|
||||
for (int j = 0; j < NODE_COUNT; ++j) {
|
||||
if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue;
|
||||
if (leaderStates[j].leaderCount <= minLeaderCount) {
|
||||
minIndex = j;
|
||||
}
|
||||
}
|
||||
|
||||
char myHost[48];
|
||||
uint16_t myPort;
|
||||
uint16_t myVid;
|
||||
decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid);
|
||||
|
||||
char *destAddress = leaderStates[minIndex].address;
|
||||
|
||||
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
|
||||
splitString(destAddress, ":", tokens, 2);
|
||||
char *destHost = tokens[0];
|
||||
uint16_t destPort = atoi(tokens[1]);
|
||||
destRaftId = encodeRaftId(destHost, destPort, myVid);
|
||||
|
||||
raft_transfer(r, transfer, destRaftId, raftTransferCb);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
void parseAddr(const char *addr, char *host, int len, uint32_t *port) {
|
||||
char* tmp = (char*)malloc(strlen(addr) + 1);
|
||||
strcpy(tmp, addr);
|
||||
|
||||
char* context;
|
||||
char* separator = ":";
|
||||
char* token = strtok_r(tmp, separator, &context);
|
||||
if (token) {
|
||||
snprintf(host, len, "%s", token);
|
||||
}
|
||||
|
||||
token = strtok_r(NULL, separator, &context);
|
||||
if (token) {
|
||||
sscanf(token, "%u", port);
|
||||
}
|
||||
|
||||
free(tmp);
|
||||
}
|
||||
|
||||
// only parse 3 tokens
|
||||
int parseCommand3(const char* str, char* token1, char* token2, char* token3, int len)
|
||||
{
|
||||
char* tmp = (char*)malloc(strlen(str) + 1);
|
||||
strcpy(tmp, str);
|
||||
|
||||
char* context;
|
||||
char* separator = " ";
|
||||
int n = 0;
|
||||
|
||||
char* token = strtok_r(tmp, separator, &context);
|
||||
if (!token) {
|
||||
goto ret;
|
||||
}
|
||||
if (strcmp(token, "") != 0) {
|
||||
strncpy(token1, token, len);
|
||||
n++;
|
||||
}
|
||||
|
||||
token = strtok_r(NULL, separator, &context);
|
||||
if (!token) {
|
||||
goto ret;
|
||||
}
|
||||
if (strcmp(token, "") != 0) {
|
||||
strncpy(token2, token, len);
|
||||
n++;
|
||||
}
|
||||
|
||||
token = strtok_r(NULL, separator, &context);
|
||||
if (!token) {
|
||||
goto ret;
|
||||
}
|
||||
if (strcmp(token, "") != 0) {
|
||||
strncpy(token3, token, len);
|
||||
n++;
|
||||
}
|
||||
|
||||
ret:
|
||||
return n;
|
||||
free(tmp);
|
||||
}
|
||||
|
||||
// only parse 4 tokens
|
||||
int parseCommand4(const char* str, char* token1, char* token2, char* token3, char *token4, int len)
|
||||
{
|
||||
char* tmp = (char*)malloc(strlen(str) + 1);
|
||||
strcpy(tmp, str);
|
||||
|
||||
char* context;
|
||||
char* separator = " ";
|
||||
int n = 0;
|
||||
|
||||
char* token = strtok_r(tmp, separator, &context);
|
||||
if (!token) {
|
||||
goto ret;
|
||||
}
|
||||
if (strcmp(token, "") != 0) {
|
||||
strncpy(token1, token, len);
|
||||
n++;
|
||||
}
|
||||
|
||||
token = strtok_r(NULL, separator, &context);
|
||||
if (!token) {
|
||||
goto ret;
|
||||
}
|
||||
if (strcmp(token, "") != 0) {
|
||||
strncpy(token2, token, len);
|
||||
n++;
|
||||
}
|
||||
|
||||
token = strtok_r(NULL, separator, &context);
|
||||
if (!token) {
|
||||
goto ret;
|
||||
}
|
||||
if (strcmp(token, "") != 0) {
|
||||
strncpy(token3, token, len);
|
||||
n++;
|
||||
}
|
||||
|
||||
token = strtok_r(NULL, separator, &context);
|
||||
if (!token) {
|
||||
goto ret;
|
||||
}
|
||||
if (strcmp(token, "") != 0) {
|
||||
strncpy(token4, token, len);
|
||||
n++;
|
||||
}
|
||||
|
||||
ret:
|
||||
return n;
|
||||
free(tmp);
|
||||
}
|
||||
|
||||
void *startServerFunc(void *param) {
|
||||
SRaftServer *pServer = (SRaftServer*)param;
|
||||
int32_t r = raftServerStart(pServer);
|
||||
assert(r == 0);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Console ---------------------------------
|
||||
const char* state2String(unsigned short state) {
|
||||
if (state == RAFT_UNAVAILABLE) {
|
||||
return "RAFT_UNAVAILABLE";
|
||||
|
||||
} else if (state == RAFT_FOLLOWER) {
|
||||
return "RAFT_FOLLOWER";
|
||||
|
||||
} else if (state == RAFT_CANDIDATE) {
|
||||
return "RAFT_CANDIDATE";
|
||||
|
||||
} else if (state == RAFT_LEADER) {
|
||||
return "RAFT_LEADER";
|
||||
|
||||
}
|
||||
return "UNKNOWN_RAFT_STATE";
|
||||
}
|
||||
|
||||
|
||||
void printRaftState2(struct raft *r) {
|
||||
|
||||
char leaderAddress[128];
|
||||
memset(leaderAddress, 0, sizeof(leaderAddress));
|
||||
|
||||
if (r->state == RAFT_LEADER) {
|
||||
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address);
|
||||
} else if (r->state == RAFT_FOLLOWER) {
|
||||
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address);
|
||||
}
|
||||
|
||||
for (int i = 0; i < r->configuration.n; ++i) {
|
||||
char tmpAddress[128];
|
||||
snprintf(tmpAddress, sizeof(tmpAddress), "%s", r->configuration.servers[i].address);
|
||||
|
||||
uint64_t raftId = r->configuration.servers[i].id;
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
uint16_t vid;
|
||||
decodeRaftId(raftId, host, 128, &port, &vid);
|
||||
|
||||
char buf[512];
|
||||
memset(buf, 0, sizeof(buf));
|
||||
if (strcmp(tmpAddress, leaderAddress) == 0) {
|
||||
snprintf(buf, sizeof(buf), "<%s:%u-%u-LEADER>\t", host, port, vid);
|
||||
} else {
|
||||
snprintf(buf, sizeof(buf), "<%s:%u-%u-FOLLOWER>\t", host, port, vid);
|
||||
}
|
||||
printf("%s", buf);
|
||||
}
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
void printRaftConfiguration(struct raft_configuration *c) {
|
||||
printf("configuration: \n");
|
||||
for (int i = 0; i < c->n; ++i) {
|
||||
printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address);
|
||||
}
|
||||
}
|
||||
|
||||
void printRaftState(struct raft *r) {
|
||||
printf("----Raft State: -----------\n");
|
||||
printf("mem_addr: %p \n", r);
|
||||
printf("my_id: %llu \n", r->id);
|
||||
printf("address: %s \n", r->address);
|
||||
printf("current_term: %llu \n", r->current_term);
|
||||
printf("voted_for: %llu \n", r->voted_for);
|
||||
printf("role: %s \n", state2String(r->state));
|
||||
printf("commit_index: %llu \n", r->commit_index);
|
||||
printf("last_applied: %llu \n", r->last_applied);
|
||||
printf("last_stored: %llu \n", r->last_stored);
|
||||
|
||||
printf("configuration_index: %llu \n", r->configuration_index);
|
||||
printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index);
|
||||
printRaftConfiguration(&r->configuration);
|
||||
|
||||
printf("----------------------------\n");
|
||||
}
|
||||
|
||||
void putValueCb(struct raft_apply *req, int status, void *result) {
|
||||
raft_free(req);
|
||||
struct raft *r = req->data;
|
||||
if (status != 0) {
|
||||
printf("putValueCb: %s \n", raft_errmsg(r));
|
||||
} else {
|
||||
printf("putValueCb: %s \n", "ok");
|
||||
}
|
||||
}
|
||||
|
||||
void putValue(struct raft *r, const char *value) {
|
||||
struct raft_buffer buf;
|
||||
|
||||
buf.len = TOKEN_LEN;;
|
||||
buf.base = raft_malloc(buf.len);
|
||||
snprintf(buf.base, buf.len, "%s", value);
|
||||
|
||||
struct raft_apply *req = raft_malloc(sizeof(struct raft_apply));
|
||||
req->data = r;
|
||||
int ret = raft_apply(r, req, &buf, 1, putValueCb);
|
||||
if (ret == 0) {
|
||||
printf("put %s \n", (char*)buf.base);
|
||||
} else {
|
||||
printf("put error: %s \n", raft_errmsg(r));
|
||||
}
|
||||
}
|
||||
|
||||
void getValue(const char *key) {
|
||||
char *ptr = getKV(key);
|
||||
if (ptr) {
|
||||
printf("get value: [%s] \n", ptr);
|
||||
} else {
|
||||
printf("value not found for key: [%s] \n", key);
|
||||
}
|
||||
}
|
||||
|
||||
void console(SRaftServer *pRaftServer) {
|
||||
while (1) {
|
||||
char cmd_buf[COMMAND_LEN];
|
||||
memset(cmd_buf, 0, sizeof(cmd_buf));
|
||||
char *ret = fgets(cmd_buf, COMMAND_LEN, stdin);
|
||||
if (!ret) {
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
int pos = strlen(cmd_buf);
|
||||
if(cmd_buf[pos - 1] == '\n') {
|
||||
cmd_buf[pos - 1] = '\0';
|
||||
}
|
||||
|
||||
if (strncmp(cmd_buf, "", COMMAND_LEN) == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char cmd[TOKEN_LEN];
|
||||
memset(cmd, 0, sizeof(cmd));
|
||||
|
||||
char param1[TOKEN_LEN];
|
||||
memset(param1, 0, sizeof(param1));
|
||||
|
||||
char param2[TOKEN_LEN];
|
||||
memset(param2, 0, sizeof(param2));
|
||||
|
||||
char param3[TOKEN_LEN];
|
||||
memset(param2, 0, sizeof(param2));
|
||||
|
||||
parseCommand4(cmd_buf, cmd, param1, param2, param3, TOKEN_LEN);
|
||||
if (strcmp(cmd, "addnode") == 0) {
|
||||
printf("not support \n");
|
||||
|
||||
/*
|
||||
char host[HOST_LEN];
|
||||
uint32_t port;
|
||||
parseAddr(param1, host, HOST_LEN, &port);
|
||||
uint64_t rid = raftId(host, port);
|
||||
|
||||
struct raft_change *req = raft_malloc(sizeof(*req));
|
||||
int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL);
|
||||
if (r != 0) {
|
||||
printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||
}
|
||||
printf("add node: %lu %s \n", rid, param1);
|
||||
|
||||
struct raft_change *req2 = raft_malloc(sizeof(*req2));
|
||||
r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL);
|
||||
if (r != 0) {
|
||||
printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||
}
|
||||
*/
|
||||
|
||||
} else if (strcmp(cmd, "dropnode") == 0) {
|
||||
printf("not support \n");
|
||||
|
||||
} else if (strcmp(cmd, "rebalance") == 0) {
|
||||
|
||||
/*
|
||||
updateLeaderStates(pRaftServer);
|
||||
|
||||
int myLeaderCount;
|
||||
for (int i = 0; i < NODE_COUNT; ++i) {
|
||||
if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) {
|
||||
myLeaderCount = leaderStates[i].leaderCount;
|
||||
}
|
||||
}
|
||||
|
||||
while (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) {
|
||||
printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT);
|
||||
|
||||
struct raft *r;
|
||||
for (int j = 0; j < pRaftServer->instanceCount; ++j) {
|
||||
if (pRaftServer->instance[j].raft.state == RAFT_LEADER) {
|
||||
r = &pRaftServer->instance[j].raft;
|
||||
}
|
||||
}
|
||||
|
||||
struct raft_transfer *transfer = raft_malloc(sizeof(*transfer));
|
||||
transfer->data = pRaftServer;
|
||||
|
||||
uint64_t destRaftId;
|
||||
int minIndex = -1;
|
||||
int minLeaderCount = myLeaderCount;
|
||||
for (int j = 0; j < NODE_COUNT; ++j) {
|
||||
if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue;
|
||||
|
||||
printf("-----leaderStates[%d].leaderCount:%d \n", j, leaderStates[j].leaderCount);
|
||||
if (leaderStates[j].leaderCount <= minLeaderCount) {
|
||||
minIndex = j;
|
||||
printf("++++ assign minIndex : %d \n", minIndex);
|
||||
}
|
||||
}
|
||||
|
||||
printf("minIndex:%d minLeaderCount:%d \n", minIndex, minLeaderCount);
|
||||
|
||||
char myHost[48];
|
||||
uint16_t myPort;
|
||||
uint16_t myVid;
|
||||
decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid);
|
||||
|
||||
char *destAddress = leaderStates[minIndex].address;
|
||||
|
||||
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
|
||||
splitString(destAddress, ":", tokens, 2);
|
||||
char *destHost = tokens[0];
|
||||
uint16_t destPort = atoi(tokens[1]);
|
||||
destRaftId = encodeRaftId(destHost, destPort, myVid);
|
||||
|
||||
printf("destHost:%s destPort:%u myVid:%u", destHost, destPort, myVid);
|
||||
raft_transfer(r, transfer, destRaftId, raftTransferCb);
|
||||
sleep(1);
|
||||
|
||||
for (int i = 0; i < NODE_COUNT; ++i) {
|
||||
if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) {
|
||||
myLeaderCount = leaderStates[i].leaderCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
int leaderCount = 0;
|
||||
|
||||
struct raft *firstR;
|
||||
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
||||
struct raft *r = &pRaftServer->instance[i].raft;
|
||||
if (r->state == RAFT_LEADER) {
|
||||
leaderCount++;
|
||||
firstR = r;
|
||||
}
|
||||
}
|
||||
|
||||
if (leaderCount > pRaftServer->instanceCount / NODE_COUNT) {
|
||||
struct raft_transfer *transfer = raft_malloc(sizeof(*transfer));
|
||||
transfer->data = pRaftServer;
|
||||
raft_transfer(firstR, transfer, 0, raftTransferCb);
|
||||
}
|
||||
|
||||
|
||||
} else if (strcmp(cmd, "put") == 0) {
|
||||
char buf[256];
|
||||
uint16_t vid;
|
||||
sscanf(param1, "%hu", &vid);
|
||||
snprintf(buf, sizeof(buf), "%s--%s", param2, param3);
|
||||
putValue(&pRaftServer->instance[vid].raft, buf);
|
||||
|
||||
} else if (strcmp(cmd, "get") == 0) {
|
||||
getValue(param1);
|
||||
|
||||
} else if (strcmp(cmd, "transfer") == 0) {
|
||||
uint16_t vid;
|
||||
sscanf(param1, "%hu", &vid);
|
||||
|
||||
struct raft_transfer transfer;
|
||||
raft_transfer(&pRaftServer->instance[vid].raft, &transfer, 0, NULL);
|
||||
|
||||
|
||||
} else if (strcmp(cmd, "state") == 0) {
|
||||
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
||||
printf("instance %d: ", i);
|
||||
printRaftState(&pRaftServer->instance[i].raft);
|
||||
}
|
||||
|
||||
} else if (strcmp(cmd, "state2") == 0) {
|
||||
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
||||
printRaftState2(&pRaftServer->instance[i].raft);
|
||||
}
|
||||
|
||||
} else if (strcmp(cmd, "snapshot") == 0) {
|
||||
printf("not support \n");
|
||||
|
||||
} else if (strcmp(cmd, "help") == 0) {
|
||||
printf("addnode \"127.0.0.1:8888\" \n");
|
||||
printf("dropnode \"127.0.0.1:8888\" \n");
|
||||
printf("put key value \n");
|
||||
printf("get key \n");
|
||||
printf("state \n");
|
||||
|
||||
} else {
|
||||
printf("unknown command: [%s], type \"help\" to see help \n", cmd);
|
||||
}
|
||||
|
||||
//printf("cmd_buf: [%s] \n", cmd_buf);
|
||||
}
|
||||
}
|
||||
|
||||
void *startConsoleFunc(void *param) {
|
||||
SRaftServer *pServer = (SRaftServer*)param;
|
||||
console(pServer);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Config ---------------------------------
|
||||
void usage() {
|
||||
printf("\nusage: \n");
|
||||
printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name);
|
||||
printf("\n");
|
||||
printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name);
|
||||
printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name);
|
||||
printf("%s --me=127.0.0.1:10002 --peers=127.0.0.1:10000,127.0.0.1:10001 --dir=./data \n", exe_name);
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
void parseConf(int argc, char **argv, SRaftServerConfig *pConf) {
|
||||
memset(pConf, 0, sizeof(*pConf));
|
||||
|
||||
int option_index, option_value;
|
||||
option_index = 0;
|
||||
static struct option long_options[] = {
|
||||
{"help", no_argument, NULL, 'h'},
|
||||
{"peers", required_argument, NULL, 'p'},
|
||||
{"me", required_argument, NULL, 'm'},
|
||||
{"dir", required_argument, NULL, 'd'},
|
||||
{NULL, 0, NULL, 0}
|
||||
};
|
||||
|
||||
while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) {
|
||||
switch (option_value) {
|
||||
case 'm': {
|
||||
parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'p': {
|
||||
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
|
||||
int peerCount = splitString(optarg, ",", tokens, MAX_PEERS);
|
||||
pConf->peersCount = peerCount;
|
||||
for (int i = 0; i < peerCount; ++i) {
|
||||
Addr *pAddr = &pConf->peers[i];
|
||||
parseAddr(tokens[i], pAddr->host, sizeof(pAddr->host), &pAddr->port);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
case 'd': {
|
||||
snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg);
|
||||
break;
|
||||
}
|
||||
|
||||
case 'h': {
|
||||
usage();
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
default: {
|
||||
usage();
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
}
|
||||
snprintf(pConf->dataDir, sizeof(pConf->dataDir), "%s/%s_%u", pConf->dir, pConf->me.host, pConf->me.port);
|
||||
}
|
||||
|
||||
void printConf(SRaftServerConfig *pConf) {
|
||||
printf("\nconf: \n");
|
||||
printf("me: %s:%u \n", pConf->me.host, pConf->me.port);
|
||||
printf("peersCount: %d \n", pConf->peersCount);
|
||||
for (int i = 0; i < pConf->peersCount; ++i) {
|
||||
Addr *pAddr = &pConf->peers[i];
|
||||
printf("peer%d: %s:%u \n", i, pAddr->host, pAddr->port);
|
||||
}
|
||||
printf("dataDir: %s \n\n", pConf->dataDir);
|
||||
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
srand(time(NULL));
|
||||
int32_t ret;
|
||||
|
||||
exe_name = argv[0];
|
||||
if (argc < 3) {
|
||||
usage();
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
SRaftServerConfig conf;
|
||||
parseConf(argc, argv, &conf);
|
||||
printConf(&conf);
|
||||
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
|
||||
/*
|
||||
char cmd_buf[COMMAND_LEN];
|
||||
snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", conf.dataDir);
|
||||
system(cmd_buf);
|
||||
*/
|
||||
|
||||
|
||||
struct raft_fsm fsm;
|
||||
initFsm(&fsm);
|
||||
|
||||
SRaftServer raftServer;
|
||||
ret = raftServerInit(&raftServer, &conf, &fsm);
|
||||
assert(ret == 0);
|
||||
|
||||
pthread_t tidRaftServer;
|
||||
pthread_create(&tidRaftServer, NULL, startServerFunc, &raftServer);
|
||||
|
||||
pthread_t tidConsole;
|
||||
pthread_create(&tidConsole, NULL, startConsoleFunc, &raftServer);
|
||||
|
||||
while (1) {
|
||||
sleep(10);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,222 @@
|
|||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include "common.h"
|
||||
#include "raftServer.h"
|
||||
|
||||
char *keys;
|
||||
char *values;
|
||||
|
||||
void initStore() {
|
||||
keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
|
||||
values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
|
||||
writeIndex = 0;
|
||||
}
|
||||
|
||||
void destroyStore() {
|
||||
free(keys);
|
||||
free(values);
|
||||
}
|
||||
|
||||
void putKV(const char *key, const char *value) {
|
||||
if (writeIndex < MAX_RECORD_COUNT) {
|
||||
strncpy(&keys[writeIndex], key, MAX_KV_LEN);
|
||||
strncpy(&values[writeIndex], value, MAX_KV_LEN);
|
||||
writeIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
char *getKV(const char *key) {
|
||||
for (int i = 0; i < MAX_RECORD_COUNT; ++i) {
|
||||
if (strcmp(&keys[i], key) == 0) {
|
||||
return &values[i];
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr)
|
||||
{
|
||||
if (n_arr <= 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
char* tmp = (char*)malloc(strlen(str) + 1);
|
||||
strcpy(tmp, str);
|
||||
char* context;
|
||||
int n = 0;
|
||||
|
||||
char* token = strtok_r(tmp, separator, &context);
|
||||
if (!token) {
|
||||
goto ret;
|
||||
}
|
||||
strncpy(arr[n], token, MAX_TOKEN_LEN);
|
||||
n++;
|
||||
|
||||
while (1) {
|
||||
token = strtok_r(NULL, separator, &context);
|
||||
if (!token || n >= n_arr) {
|
||||
goto ret;
|
||||
}
|
||||
strncpy(arr[n], token, MAX_TOKEN_LEN);
|
||||
n++;
|
||||
}
|
||||
|
||||
ret:
|
||||
free(tmp);
|
||||
return n;
|
||||
}
|
||||
|
||||
/*
|
||||
uint64_t raftId(const char *host, uint32_t port) {
|
||||
uint32_t host_uint32 = (uint32_t)inet_addr(host);
|
||||
assert(host_uint32 != (uint32_t)-1);
|
||||
uint64_t code = ((uint64_t)host_uint32) << 32 | port;
|
||||
return code;
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
/*
|
||||
uint64_t encodeRaftId(const char *host, uint16_t port, uint16_t vid) {
|
||||
uint64_t raftId;
|
||||
uint32_t host_uint32 = (uint32_t)inet_addr(host);
|
||||
assert(host_uint32 != (uint32_t)-1);
|
||||
|
||||
raftId = (((uint64_t)host_uint32) << 32) | (((uint32_t)port) << 16) | vid;
|
||||
return raftId;
|
||||
}
|
||||
|
||||
void decodeRaftId(uint64_t raftId, char *host, int32_t len, uint16_t *port, uint16_t *vid) {
|
||||
uint32_t host32 = (uint32_t)((raftId >> 32) & 0x00000000FFFFFFFF);
|
||||
|
||||
struct in_addr addr;
|
||||
addr.s_addr = host32;
|
||||
snprintf(host, len, "%s", inet_ntoa(addr));
|
||||
|
||||
*port = (uint16_t)((raftId & 0x00000000FFFF0000) >> 16);
|
||||
*vid = (uint16_t)(raftId & 0x000000000000FFFF);
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
|
||||
|
||||
int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm) {
|
||||
int ret;
|
||||
|
||||
snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host);
|
||||
pRaftServer->port = pConf->me.port;
|
||||
snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port);
|
||||
//strncpy(pRaftServer->dir, pConf->dataDir, sizeof(pRaftServer->dir));
|
||||
|
||||
ret = uv_loop_init(&pRaftServer->loop);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret));
|
||||
assert(0);
|
||||
}
|
||||
|
||||
ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret);
|
||||
assert(0);
|
||||
}
|
||||
|
||||
|
||||
uint16_t vid;
|
||||
pRaftServer->instanceCount = 20;
|
||||
|
||||
|
||||
for (int i = 0; i < pRaftServer->instanceCount; ++i)
|
||||
{
|
||||
//vid = 0;
|
||||
vid = i;
|
||||
|
||||
|
||||
pRaftServer->instance[vid].raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid);
|
||||
snprintf(pRaftServer->instance[vid].dir, sizeof(pRaftServer->instance[vid].dir), "%s_%llu", pConf->dataDir, pRaftServer->instance[vid].raftId);
|
||||
|
||||
char cmd_buf[COMMAND_LEN];
|
||||
snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", pRaftServer->instance[vid].dir);
|
||||
system(cmd_buf);
|
||||
sleep(1);
|
||||
|
||||
pRaftServer->instance[vid].fsm = pFsm;
|
||||
|
||||
ret = raft_uv_init(&pRaftServer->instance[vid].io, &pRaftServer->loop, pRaftServer->instance[vid].dir, &pRaftServer->transport);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft));
|
||||
assert(0);
|
||||
}
|
||||
|
||||
ret = raft_init(&pRaftServer->instance[vid].raft, &pRaftServer->instance[vid].io, pRaftServer->instance[vid].fsm, pRaftServer->instance[vid].raftId, pRaftServer->address);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft));
|
||||
assert(0);
|
||||
}
|
||||
|
||||
struct raft_configuration conf;
|
||||
raft_configuration_init(&conf);
|
||||
raft_configuration_add(&conf, pRaftServer->instance[vid].raftId, pRaftServer->address, RAFT_VOTER);
|
||||
printf("add myself: %llu - %s \n", pRaftServer->instance[vid].raftId, pRaftServer->address);
|
||||
for (int i = 0; i < pConf->peersCount; ++i) {
|
||||
const Addr *pAddr = &pConf->peers[i];
|
||||
|
||||
raft_id rid = encodeRaftId(pAddr->host, pAddr->port, vid);
|
||||
|
||||
char addrBuf[ADDRESS_LEN];
|
||||
snprintf(addrBuf, sizeof(addrBuf), "%s:%u", pAddr->host, pAddr->port);
|
||||
raft_configuration_add(&conf, rid, addrBuf, RAFT_VOTER);
|
||||
printf("add peers: %llu - %s \n", rid, addrBuf);
|
||||
}
|
||||
|
||||
raft_bootstrap(&pRaftServer->instance[vid].raft, &conf);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t raftServerStart(SRaftServer *pRaftServer) {
|
||||
int ret;
|
||||
|
||||
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
||||
ret = raft_start(&pRaftServer->instance[i].raft);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[i].raft));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
uv_run(&pRaftServer->loop, UV_RUN_DEFAULT);
|
||||
}
|
||||
|
||||
|
||||
void raftServerClose(SRaftServer *pRaftServer) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) {
|
||||
char *msg = (char*)buf->base;
|
||||
printf("fsm apply: %s \n", msg);
|
||||
|
||||
char arr[2][MAX_TOKEN_LEN];
|
||||
splitString(msg, "--", arr, 2);
|
||||
putKV(arr[0], arr[1]);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t initFsm(struct raft_fsm *fsm) {
|
||||
initStore();
|
||||
fsm->apply = fsmApplyCb;
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
#ifndef TDENGINE_RAFT_SERVER_H
|
||||
#define TDENGINE_RAFT_SERVER_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <assert.h>
|
||||
#include <string.h>
|
||||
#include "raft.h"
|
||||
#include "raft/uv.h"
|
||||
#include "common.h"
|
||||
|
||||
|
||||
// simulate a db store, just for test
|
||||
#define MAX_KV_LEN 100
|
||||
#define MAX_RECORD_COUNT 500
|
||||
char *keys;
|
||||
char *values;
|
||||
int writeIndex;
|
||||
|
||||
void initStore();
|
||||
void destroyStore();
|
||||
void putKV(const char *key, const char *value);
|
||||
char *getKV(const char *key);
|
||||
|
||||
typedef struct {
|
||||
char dir[DIR_LEN + HOST_LEN * 4]; /* Data dir of UV I/O backend */
|
||||
raft_id raftId; /* For vote */
|
||||
struct raft_fsm *fsm; /* Sample application FSM */
|
||||
struct raft raft; /* Raft instance */
|
||||
struct raft_io io; /* UV I/O backend */
|
||||
|
||||
} SInstance;
|
||||
|
||||
typedef struct {
|
||||
char host[HOST_LEN];
|
||||
uint32_t port;
|
||||
char address[ADDRESS_LEN]; /* Raft instance address */
|
||||
|
||||
struct uv_loop_s loop; /* UV loop */
|
||||
struct raft_uv_transport transport; /* UV I/O backend transport */
|
||||
|
||||
SInstance instance[MAX_INSTANCE_NUM];
|
||||
int32_t instanceCount;
|
||||
|
||||
} SRaftServer;
|
||||
|
||||
#define MAX_TOKEN_LEN 32
|
||||
int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr);
|
||||
|
||||
int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm);
|
||||
int32_t raftServerStart(SRaftServer *pRaftServer);
|
||||
void raftServerClose(SRaftServer *pRaftServer);
|
||||
|
||||
|
||||
int initFsm(struct raft_fsm *fsm);
|
||||
|
||||
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // TDENGINE_RAFT_SERVER_H
|
Loading…
Reference in New Issue