From 7b91d474660eddc799db0c25aa0bbcda01d51c49 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 19 Dec 2021 20:05:07 +0800 Subject: [PATCH] add craft demo --- cmake/cmake.options | 6 + cmake/craft_CMakeLists.txt.in | 13 ++ contrib/CMakeLists.txt | 17 ++ contrib/test/craft/clear.sh | 3 + contrib/test/craft/help.txt | 15 ++ contrib/test/craft/simulate_vnode.c | 252 ++++++++++++++++++++++++++++ 6 files changed, 306 insertions(+) create mode 100644 cmake/craft_CMakeLists.txt.in create mode 100644 contrib/test/craft/clear.sh create mode 100644 contrib/test/craft/help.txt create mode 100644 contrib/test/craft/simulate_vnode.c diff --git a/cmake/cmake.options b/cmake/cmake.options index edaab3bd45..1b2cdb6c47 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -43,6 +43,12 @@ option( OFF ) +option( + BUILD_WITH_CRAFT + "If build with canonical-raft" + OFF +) + option( BUILD_DEPENDENCY_TESTS "If build dependency tests" diff --git a/cmake/craft_CMakeLists.txt.in b/cmake/craft_CMakeLists.txt.in new file mode 100644 index 0000000000..a77fa679e0 --- /dev/null +++ b/cmake/craft_CMakeLists.txt.in @@ -0,0 +1,13 @@ + +# canonical-raft +ExternalProject_Add(craft + GIT_REPOSITORY https://github.com/canonical/raft.git + GIT_TAG v0.11.2 + SOURCE_DIR "${CMAKE_CONTRIB_DIR}/craft" + BINARY_DIR "${CMAKE_CONTRIB_DIR}/craft/.libs" + #BUILD_IN_SOURCE TRUE + CONFIGURE_COMMAND "autoreconf -i && ./configure" + BUILD_COMMAND "$(MAKE)" + INSTALL_COMMAND "" + TEST_COMMAND "" +) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index cffe164488..0aadaccfa1 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -34,6 +34,11 @@ if(${BUILD_WITH_ROCKSDB}) add_definitions(-DUSE_ROCKSDB) endif(${BUILD_WITH_ROCKSDB}) +# canonical-raft +if(${BUILD_WITH_CRAFT}) + cat("${CMAKE_SUPPORT_DIR}/craft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +endif(${BUILD_WITH_CRAFT}) + # bdb if(${BUILD_WITH_BDB}) cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -144,6 +149,18 @@ if(${BUILD_WITH_NURAFT}) add_subdirectory(nuraft) endif(${BUILD_WITH_NURAFT}) +# CRAFT +if(${BUILD_WITH_CRAFT}) + add_library(craft STATIC IMPORTED GLOBAL) + set_target_properties(craft PROPERTIES + IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/craft/.libs/libraft.a" + INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/craft" + ) + target_link_libraries(craft + INTERFACE pthread + ) +endif(${BUILD_WITH_CRAFT}) + # BDB if(${BUILD_WITH_BDB}) add_library(bdb STATIC IMPORTED GLOBAL) diff --git a/contrib/test/craft/clear.sh b/contrib/test/craft/clear.sh new file mode 100644 index 0000000000..6412656d77 --- /dev/null +++ b/contrib/test/craft/clear.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +rm -rf 127.0.0.1* diff --git a/contrib/test/craft/help.txt b/contrib/test/craft/help.txt new file mode 100644 index 0000000000..48ce9de403 --- /dev/null +++ b/contrib/test/craft/help.txt @@ -0,0 +1,15 @@ + + +make craft: + +sudo apt-get install libuv1-dev liblz4-dev +autoreconf -i +./configure --enable-example +make + + +start: + +./simulate_vnode 10000 10001 10002 +./simulate_vnode 10001 10000 10002 +./simulate_vnode 10002 10000 10001 diff --git a/contrib/test/craft/simulate_vnode.c b/contrib/test/craft/simulate_vnode.c new file mode 100644 index 0000000000..668fe638b7 --- /dev/null +++ b/contrib/test/craft/simulate_vnode.c @@ -0,0 +1,252 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +const char* exe_name; + +// simulate ------------------------ +typedef struct SVnode { + int vid; +} SVnode; + + +#define VNODE_COUNT 10 +SVnode vnodes[VNODE_COUNT]; + +int vnodeApplyWMsg(SVnode *pVnode, char *pMsg, void **pRsp) { + printf("put value to tsdb, vid:%d msg:%s \n", pVnode->vid, pMsg); + return 0; +} + +int applyCB(struct raft_fsm *fsm, + const struct raft_buffer *buf, + void **result) { + char *msg = (char*)buf->base; + //printf("%s \n", msg); + + // parse msg + char* context; + char* token = strtok_r(msg, ":", &context); + int vid = atoi(token); + + token = strtok_r(NULL, ":", &context); + char *value = token; + + SVnode* tmp_vnodes = (SVnode*)(fsm->data); + vnodeApplyWMsg(&tmp_vnodes[vid], value, NULL); + + return 0; +} + +// Config ------------------------ +#define HOST_LEN 32 +#define MAX_PEERS 10 +typedef struct Address { + char host[HOST_LEN]; + uint32_t port; +} Address; + +uint64_t raftId(Address *addr) { + // test in a single machine, port is unique + // if in multi machines, use host and port + return addr->port; +} + +typedef struct Config { + Address me; + Address peers[MAX_PEERS]; + int peer_count; +} Config; + +Config gConf; + +void printConf(Config *c) { + printf("me: %s:%u \n", c->me.host, c->me.port); + for (int i = 0; i < c->peer_count; ++i) { + printf("peer%d: %s:%u \n", i, c->peers[i].host, c->peers[i].port); + } +} + +// RaftServer ------------------------ +typedef struct RaftServer { + struct uv_loop_s loop; + struct raft_uv_transport transport; + struct raft_io io; + struct raft_fsm fsm; + struct raft raft; + struct raft_configuration conf; +} RaftServer; + +RaftServer gRaftServer; + +static void* startRaftServer(void *param) { + //RaftServer* rs = (RaftServer*)param; + RaftServer* rs = &gRaftServer; + raft_start(&rs->raft); + uv_run(&rs->loop, UV_RUN_DEFAULT); +} + +static const char* state2String(unsigned short state) { + if (state == RAFT_UNAVAILABLE) { + return "RAFT_UNAVAILABLE"; + + } else if (state == RAFT_FOLLOWER) { + return "RAFT_FOLLOWER"; + + } else if (state == RAFT_CANDIDATE) { + return "RAFT_CANDIDATE"; + + } else if (state == RAFT_LEADER) { + return "RAFT_LEADER"; + + } + return "UNKNOWN_RAFT_STATE"; +} + +static void printRaftState(struct raft *r) { + printf("\n"); + printf("my_id: %llu \n", r->id); + printf("address: %s \n", r->address); + printf("current_term: %llu \n", r->current_term); + printf("voted_for: %llu \n", r->voted_for); + printf("role: %s \n", state2String(r->state)); + printf("commit_index: %llu \n", r->commit_index); + printf("last_applied: %llu \n", r->last_applied); + printf("last_stored: %llu \n", r->last_stored); + printf("\n"); +} + +// console ----------------------------------------- +#define PROPOSE_VALUE_LEN 128 +static void proposeValue(struct raft *r) { + struct raft_buffer buf; + + // need free + buf.len = PROPOSE_VALUE_LEN; + buf.base = raft_malloc(buf.len); + + // mock ts value + int vid = rand() % VNODE_COUNT; + snprintf(buf.base, buf.len, "%d:value_%ld", vid, time(NULL)); + + printf("propose value: %s \n", (char*)buf.base); + + // need free + struct raft_apply *req = raft_malloc(sizeof(struct raft_apply)); + raft_apply(r, req, &buf, 1, NULL); +} + +static void* console(void *param) { + while (1) { + // notice! memory buffer overflow! + char buf[128]; + memset(buf, 0, sizeof(buf)); + fgets(buf, 128, stdin); + if (strlen(buf) == 1) { + continue; + } + buf[strlen(buf)-1] = '\0'; + + // do not use strcmp + if (strcmp(buf, "state") == 0) { + printRaftState(&gRaftServer.raft); + + } else if (strcmp(buf, "put") == 0) { + proposeValue(&gRaftServer.raft); + + } else { + printf("unknown command: [%s], support command: state, put \n", buf); + } + } +} + +// ----------------------------------------- +void usage() { + printf("\n"); + printf("%s my_port peer1_port peer2_port ... \n", exe_name); + printf("\n"); +} + +int main(int argc, char **argv) { + srand(time(NULL)); + + exe_name = argv[0]; + if (argc < 2) { + usage(); + exit(-1); + } + + // read conf from argv + strncpy(gConf.me.host, "127.0.0.1", HOST_LEN); + sscanf(argv[1], "%u", &gConf.me.port); + + gConf.peer_count = 0; + for (int i = 2; i < argc; ++i) { + strncpy(gConf.peers[gConf.peer_count].host, "127.0.0.1", HOST_LEN); + sscanf(argv[i], "%u", &gConf.peers[gConf.peer_count].port); + gConf.peer_count++; + } + printConf(&gConf); + + // mkdir + char dir[128]; + snprintf(dir, sizeof(dir), "./%s_%u", gConf.me.host, gConf.me.port); + + char cmd[128]; + snprintf(cmd, sizeof(cmd), "rm -rf ./%s", dir); + system(cmd); + snprintf(cmd, sizeof(cmd), "mkdir -p ./%s", dir); + system(cmd); + + // init io + uv_loop_init(&gRaftServer.loop); + raft_uv_tcp_init(&gRaftServer.transport, &gRaftServer.loop); + raft_uv_init(&gRaftServer.io, &gRaftServer.loop, dir, &gRaftServer.transport); + + // init fsm + gRaftServer.fsm.apply = applyCB; + gRaftServer.fsm.data = vnodes; + for (int i = 0; i < VNODE_COUNT; ++i) { + vnodes[i].vid = i; + } + + // init raft instance with io and fsm + char address_buf[128]; + snprintf(address_buf, sizeof(address_buf), "%s:%u", gConf.me.host, gConf.me.port); + + // test in a single machine, port is unique + uint64_t raft_id = raftId(&gConf.me); + raft_init(&gRaftServer.raft, &gRaftServer.io, &gRaftServer.fsm, raft_id, address_buf); + //raft_init(&gRaftServer.raft, &gRaftServer.io, &gRaftServer.fsm, 11, "127.0.0.1:9000"); + + // init cluster configuration + struct raft_configuration conf; + raft_configuration_init(&conf); + raft_configuration_add(&conf, raftId(&gConf.me), address_buf, RAFT_VOTER); + for (int i = 0; i < gConf.peer_count; ++i) { + char address_buf[128]; + snprintf(address_buf, sizeof(address_buf), "%s:%u", gConf.peers[i].host, gConf.peers[i].port); + raft_configuration_add(&conf, raftId(&gConf.peers[i]), address_buf, RAFT_VOTER); + } + raft_bootstrap(&gRaftServer.raft, &conf); + + // start raft server and loop + pthread_t tid; + pthread_create(&tid, NULL, startRaftServer, &gRaftServer); + + // simulate console + pthread_t tid2; + pthread_create(&tid2, NULL, console, NULL); + + while (1) { + sleep(10); + } + + return 0; +}