Merge pull request #9197 from taosdata/feature/3.0_mhli
TD-12278 add craft demo
This commit is contained in:
commit
6377bab3ab
|
@ -44,6 +44,12 @@ option(
|
|||
OFF
|
||||
)
|
||||
|
||||
option(
|
||||
BUILD_WITH_CRAFT
|
||||
"If build with canonical-raft"
|
||||
OFF
|
||||
)
|
||||
|
||||
option(
|
||||
BUILD_DEPENDENCY_TESTS
|
||||
"If build dependency tests"
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
|
||||
# canonical-raft
|
||||
ExternalProject_Add(craft
|
||||
GIT_REPOSITORY https://github.com/canonical/raft.git
|
||||
GIT_TAG v0.11.2
|
||||
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/craft"
|
||||
BINARY_DIR "${CMAKE_CONTRIB_DIR}/craft/.libs"
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
CONFIGURE_COMMAND "autoreconf -i && ./configure"
|
||||
BUILD_COMMAND "$(MAKE)"
|
||||
INSTALL_COMMAND ""
|
||||
TEST_COMMAND ""
|
||||
)
|
|
@ -34,6 +34,11 @@ if(${BUILD_WITH_ROCKSDB})
|
|||
add_definitions(-DUSE_ROCKSDB)
|
||||
endif(${BUILD_WITH_ROCKSDB})
|
||||
|
||||
# canonical-raft
|
||||
if(${BUILD_WITH_CRAFT})
|
||||
cat("${CMAKE_SUPPORT_DIR}/craft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
endif(${BUILD_WITH_CRAFT})
|
||||
|
||||
# bdb
|
||||
if(${BUILD_WITH_BDB})
|
||||
cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||
|
@ -144,6 +149,18 @@ if(${BUILD_WITH_NURAFT})
|
|||
add_subdirectory(nuraft)
|
||||
endif(${BUILD_WITH_NURAFT})
|
||||
|
||||
# CRAFT
|
||||
if(${BUILD_WITH_CRAFT})
|
||||
add_library(craft STATIC IMPORTED GLOBAL)
|
||||
set_target_properties(craft PROPERTIES
|
||||
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/craft/.libs/libraft.a"
|
||||
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/craft"
|
||||
)
|
||||
target_link_libraries(craft
|
||||
INTERFACE pthread
|
||||
)
|
||||
endif(${BUILD_WITH_CRAFT})
|
||||
|
||||
# BDB
|
||||
if(${BUILD_WITH_BDB})
|
||||
add_library(bdb STATIC IMPORTED GLOBAL)
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
#!/bin/bash
|
||||
|
||||
rm -rf 127.0.0.1*
|
|
@ -0,0 +1,15 @@
|
|||
|
||||
|
||||
make craft:
|
||||
|
||||
sudo apt-get install libuv1-dev liblz4-dev
|
||||
autoreconf -i
|
||||
./configure --enable-example
|
||||
make
|
||||
|
||||
|
||||
start:
|
||||
|
||||
./simulate_vnode 10000 10001 10002
|
||||
./simulate_vnode 10001 10000 10002
|
||||
./simulate_vnode 10002 10000 10001
|
|
@ -0,0 +1,252 @@
|
|||
#include <stdio.h>
|
||||
#include <raft.h>
|
||||
#include <raft/uv.h>
|
||||
#include <unistd.h>
|
||||
#include <pthread.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include <stdlib.h>
|
||||
#include <getopt.h>
|
||||
|
||||
const char* exe_name;
|
||||
|
||||
// simulate ------------------------
|
||||
typedef struct SVnode {
|
||||
int vid;
|
||||
} SVnode;
|
||||
|
||||
|
||||
#define VNODE_COUNT 10
|
||||
SVnode vnodes[VNODE_COUNT];
|
||||
|
||||
int vnodeApplyWMsg(SVnode *pVnode, char *pMsg, void **pRsp) {
|
||||
printf("put value to tsdb, vid:%d msg:%s \n", pVnode->vid, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int applyCB(struct raft_fsm *fsm,
|
||||
const struct raft_buffer *buf,
|
||||
void **result) {
|
||||
char *msg = (char*)buf->base;
|
||||
//printf("%s \n", msg);
|
||||
|
||||
// parse msg
|
||||
char* context;
|
||||
char* token = strtok_r(msg, ":", &context);
|
||||
int vid = atoi(token);
|
||||
|
||||
token = strtok_r(NULL, ":", &context);
|
||||
char *value = token;
|
||||
|
||||
SVnode* tmp_vnodes = (SVnode*)(fsm->data);
|
||||
vnodeApplyWMsg(&tmp_vnodes[vid], value, NULL);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Config ------------------------
|
||||
#define HOST_LEN 32
|
||||
#define MAX_PEERS 10
|
||||
typedef struct Address {
|
||||
char host[HOST_LEN];
|
||||
uint32_t port;
|
||||
} Address;
|
||||
|
||||
uint64_t raftId(Address *addr) {
|
||||
// test in a single machine, port is unique
|
||||
// if in multi machines, use host and port
|
||||
return addr->port;
|
||||
}
|
||||
|
||||
typedef struct Config {
|
||||
Address me;
|
||||
Address peers[MAX_PEERS];
|
||||
int peer_count;
|
||||
} Config;
|
||||
|
||||
Config gConf;
|
||||
|
||||
void printConf(Config *c) {
|
||||
printf("me: %s:%u \n", c->me.host, c->me.port);
|
||||
for (int i = 0; i < c->peer_count; ++i) {
|
||||
printf("peer%d: %s:%u \n", i, c->peers[i].host, c->peers[i].port);
|
||||
}
|
||||
}
|
||||
|
||||
// RaftServer ------------------------
|
||||
typedef struct RaftServer {
|
||||
struct uv_loop_s loop;
|
||||
struct raft_uv_transport transport;
|
||||
struct raft_io io;
|
||||
struct raft_fsm fsm;
|
||||
struct raft raft;
|
||||
struct raft_configuration conf;
|
||||
} RaftServer;
|
||||
|
||||
RaftServer gRaftServer;
|
||||
|
||||
static void* startRaftServer(void *param) {
|
||||
//RaftServer* rs = (RaftServer*)param;
|
||||
RaftServer* rs = &gRaftServer;
|
||||
raft_start(&rs->raft);
|
||||
uv_run(&rs->loop, UV_RUN_DEFAULT);
|
||||
}
|
||||
|
||||
static const char* state2String(unsigned short state) {
|
||||
if (state == RAFT_UNAVAILABLE) {
|
||||
return "RAFT_UNAVAILABLE";
|
||||
|
||||
} else if (state == RAFT_FOLLOWER) {
|
||||
return "RAFT_FOLLOWER";
|
||||
|
||||
} else if (state == RAFT_CANDIDATE) {
|
||||
return "RAFT_CANDIDATE";
|
||||
|
||||
} else if (state == RAFT_LEADER) {
|
||||
return "RAFT_LEADER";
|
||||
|
||||
}
|
||||
return "UNKNOWN_RAFT_STATE";
|
||||
}
|
||||
|
||||
static void printRaftState(struct raft *r) {
|
||||
printf("\n");
|
||||
printf("my_id: %llu \n", r->id);
|
||||
printf("address: %s \n", r->address);
|
||||
printf("current_term: %llu \n", r->current_term);
|
||||
printf("voted_for: %llu \n", r->voted_for);
|
||||
printf("role: %s \n", state2String(r->state));
|
||||
printf("commit_index: %llu \n", r->commit_index);
|
||||
printf("last_applied: %llu \n", r->last_applied);
|
||||
printf("last_stored: %llu \n", r->last_stored);
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
// console -----------------------------------------
|
||||
#define PROPOSE_VALUE_LEN 128
|
||||
static void proposeValue(struct raft *r) {
|
||||
struct raft_buffer buf;
|
||||
|
||||
// need free
|
||||
buf.len = PROPOSE_VALUE_LEN;
|
||||
buf.base = raft_malloc(buf.len);
|
||||
|
||||
// mock ts value
|
||||
int vid = rand() % VNODE_COUNT;
|
||||
snprintf(buf.base, buf.len, "%d:value_%ld", vid, time(NULL));
|
||||
|
||||
printf("propose value: %s \n", (char*)buf.base);
|
||||
|
||||
// need free
|
||||
struct raft_apply *req = raft_malloc(sizeof(struct raft_apply));
|
||||
raft_apply(r, req, &buf, 1, NULL);
|
||||
}
|
||||
|
||||
static void* console(void *param) {
|
||||
while (1) {
|
||||
// notice! memory buffer overflow!
|
||||
char buf[128];
|
||||
memset(buf, 0, sizeof(buf));
|
||||
fgets(buf, 128, stdin);
|
||||
if (strlen(buf) == 1) {
|
||||
continue;
|
||||
}
|
||||
buf[strlen(buf)-1] = '\0';
|
||||
|
||||
// do not use strcmp
|
||||
if (strcmp(buf, "state") == 0) {
|
||||
printRaftState(&gRaftServer.raft);
|
||||
|
||||
} else if (strcmp(buf, "put") == 0) {
|
||||
proposeValue(&gRaftServer.raft);
|
||||
|
||||
} else {
|
||||
printf("unknown command: [%s], support command: state, put \n", buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------
|
||||
void usage() {
|
||||
printf("\n");
|
||||
printf("%s my_port peer1_port peer2_port ... \n", exe_name);
|
||||
printf("\n");
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
srand(time(NULL));
|
||||
|
||||
exe_name = argv[0];
|
||||
if (argc < 2) {
|
||||
usage();
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
// read conf from argv
|
||||
strncpy(gConf.me.host, "127.0.0.1", HOST_LEN);
|
||||
sscanf(argv[1], "%u", &gConf.me.port);
|
||||
|
||||
gConf.peer_count = 0;
|
||||
for (int i = 2; i < argc; ++i) {
|
||||
strncpy(gConf.peers[gConf.peer_count].host, "127.0.0.1", HOST_LEN);
|
||||
sscanf(argv[i], "%u", &gConf.peers[gConf.peer_count].port);
|
||||
gConf.peer_count++;
|
||||
}
|
||||
printConf(&gConf);
|
||||
|
||||
// mkdir
|
||||
char dir[128];
|
||||
snprintf(dir, sizeof(dir), "./%s_%u", gConf.me.host, gConf.me.port);
|
||||
|
||||
char cmd[128];
|
||||
snprintf(cmd, sizeof(cmd), "rm -rf ./%s", dir);
|
||||
system(cmd);
|
||||
snprintf(cmd, sizeof(cmd), "mkdir -p ./%s", dir);
|
||||
system(cmd);
|
||||
|
||||
// init io
|
||||
uv_loop_init(&gRaftServer.loop);
|
||||
raft_uv_tcp_init(&gRaftServer.transport, &gRaftServer.loop);
|
||||
raft_uv_init(&gRaftServer.io, &gRaftServer.loop, dir, &gRaftServer.transport);
|
||||
|
||||
// init fsm
|
||||
gRaftServer.fsm.apply = applyCB;
|
||||
gRaftServer.fsm.data = vnodes;
|
||||
for (int i = 0; i < VNODE_COUNT; ++i) {
|
||||
vnodes[i].vid = i;
|
||||
}
|
||||
|
||||
// init raft instance with io and fsm
|
||||
char address_buf[128];
|
||||
snprintf(address_buf, sizeof(address_buf), "%s:%u", gConf.me.host, gConf.me.port);
|
||||
|
||||
// test in a single machine, port is unique
|
||||
uint64_t raft_id = raftId(&gConf.me);
|
||||
raft_init(&gRaftServer.raft, &gRaftServer.io, &gRaftServer.fsm, raft_id, address_buf);
|
||||
//raft_init(&gRaftServer.raft, &gRaftServer.io, &gRaftServer.fsm, 11, "127.0.0.1:9000");
|
||||
|
||||
// init cluster configuration
|
||||
struct raft_configuration conf;
|
||||
raft_configuration_init(&conf);
|
||||
raft_configuration_add(&conf, raftId(&gConf.me), address_buf, RAFT_VOTER);
|
||||
for (int i = 0; i < gConf.peer_count; ++i) {
|
||||
char address_buf[128];
|
||||
snprintf(address_buf, sizeof(address_buf), "%s:%u", gConf.peers[i].host, gConf.peers[i].port);
|
||||
raft_configuration_add(&conf, raftId(&gConf.peers[i]), address_buf, RAFT_VOTER);
|
||||
}
|
||||
raft_bootstrap(&gRaftServer.raft, &conf);
|
||||
|
||||
// start raft server and loop
|
||||
pthread_t tid;
|
||||
pthread_create(&tid, NULL, startRaftServer, &gRaftServer);
|
||||
|
||||
// simulate console
|
||||
pthread_t tid2;
|
||||
pthread_create(&tid2, NULL, console, NULL);
|
||||
|
||||
while (1) {
|
||||
sleep(10);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue