From 82f898753451e8feeacac99af4d0e1a056c55eef Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 22 Dec 2021 17:45:52 +0800 Subject: [PATCH 1/3] add raftServer code --- contrib/test/craft/raftMain.c | 107 ++++++++++++++++++++++++++++++++ contrib/test/craft/raftServer.c | 20 ++++++ contrib/test/craft/raftServer.h | 42 +++++++++++++ 3 files changed, 169 insertions(+) create mode 100644 contrib/test/craft/raftMain.c create mode 100644 contrib/test/craft/raftServer.c create mode 100644 contrib/test/craft/raftServer.h diff --git a/contrib/test/craft/raftMain.c b/contrib/test/craft/raftMain.c new file mode 100644 index 0000000000..6e9f3b5dfb --- /dev/null +++ b/contrib/test/craft/raftMain.c @@ -0,0 +1,107 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "raftServer.h" + +#define COMMAND_LEN 128 + +const char *exe_name; + +void *startServerFunc(void *param) { + SRaftServer *pServer = (SRaftServer*)param; + int32_t r = raftServerStart(pServer); + assert(r == 0); + + return NULL; +} + +// Console --------------------------------- +void console(SRaftServer *pRaftServer) { + while (1) { + char cmd_buf[COMMAND_LEN]; + memset(cmd_buf, 0, sizeof(cmd_buf)); + char *ret = fgets(cmd_buf, COMMAND_LEN, stdin); + if (!ret) { + exit(-1); + } + + int pos = strlen(cmd_buf); + if(cmd_buf[pos - 1] == '\n') { + cmd_buf[pos - 1] = '\0'; + } + + if (strncmp(cmd_buf, "", COMMAND_LEN) == 0) { + continue; + } + + printf("cmd_buf: [%s] \n", cmd_buf); + } +} + +void *startConsoleFunc(void *param) { + SRaftServer *pServer = (SRaftServer*)param; + console(pServer); + return NULL; +} + +// Config --------------------------------- +#define DIR_LEN 128 +#define HOST_LEN 128 +typedef struct SRaftServerConfig { + char host[HOST_LEN]; + uint32_t port; + char dir[DIR_LEN]; +} SRaftServerConfig; + +void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { + snprintf(pConf->dir, sizeof(pConf->dir), "%s", argv[1]); + sscanf(argv[2], "%u", &pConf->port); + snprintf(pConf->dir, sizeof(pConf->dir), "%s", argv[3]); +} + +// ----------------------------------------- +void usage() { + printf("\n"); + printf("usage: %s host port dir \n", exe_name); + printf("eg : %s 127.0.0.1 10000 ./data \n", exe_name); + printf("\n"); +} + +int main(int argc, char **argv) { + srand(time(NULL)); + int32_t ret; + + exe_name = argv[0]; + if (argc != 4) { + usage(); + exit(-1); + } + + SRaftServerConfig conf; + parseConf(argc, argv, &conf); + + struct raft_fsm fsm; + initFsm(&fsm); + + SRaftServer raftServer; + ret = raftServerInit(&raftServer, &conf, &fsm); + + pthread_t tidRaftServer; + pthread_create(&tidRaftServer, NULL, startServerFunc, &raftServer); + + pthread_t tidConsole; + pthread_create(&tidConsole, NULL, startConsoleFunc, &raftServer); + + while (1) { + sleep(10); + } + + return 0; +} diff --git a/contrib/test/craft/raftServer.c b/contrib/test/craft/raftServer.c new file mode 100644 index 0000000000..5633b211ac --- /dev/null +++ b/contrib/test/craft/raftServer.c @@ -0,0 +1,20 @@ +#include "raftServer.h" + +int32_t raftServerInit(SRaftServer *pRaftServer, struct SRaftServerConfig *pConf, struct raft_fsm *pFsm) { + + return 0; +} + +int32_t raftServerStart(SRaftServer *pRaftServer) { + +} + + +void raftServerClose(SRaftServer *pRaftServer) { + +} + +int32_t initFsm(struct raft_fsm *fsm) { + + return 0; +} diff --git a/contrib/test/craft/raftServer.h b/contrib/test/craft/raftServer.h new file mode 100644 index 0000000000..9a717260de --- /dev/null +++ b/contrib/test/craft/raftServer.h @@ -0,0 +1,42 @@ +#ifndef TDENGINE_RAFT_SERVER_H +#define TDENGINE_RAFT_SERVER_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "raft.h" +#include "raft/uv.h" + + + +#define DIR_LEN 128 +#define ADDRESS_LEN 128 +typedef struct { + char dir[DIR_LEN]; /* Data dir of UV I/O backend */ + char address[ADDRESS_LEN]; /* Raft instance address */ + raft_id raftId; /* For vote */ + struct raft_fsm *fsm; /* Sample application FSM */ + + struct raft raft; /* Raft instance */ + struct raft_io io; /* UV I/O backend */ + struct uv_loop_s loop; /* UV loop */ + struct raft_uv_transport transport; /* UV I/O backend transport */ +} SRaftServer; + +struct SRaftServerConfig; +int32_t raftServerInit(SRaftServer *pRaftServer, struct SRaftServerConfig *pConf, struct raft_fsm *pFsm); +int32_t raftServerStart(SRaftServer *pRaftServer); +void raftServerClose(SRaftServer *pRaftServer); + + +int initFsm(struct raft_fsm *fsm); + + + + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_RAFT_SERVER_H From 6dc92d703d3c7b3f5e5dc8e4524a01210abcd3cf Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 22 Dec 2021 20:40:18 +0800 Subject: [PATCH 2/3] add raft code --- contrib/test/craft/clear.sh | 1 + contrib/test/craft/common.h | 17 +++++++++ contrib/test/craft/raftMain.c | 25 ++++++++----- contrib/test/craft/raftServer.c | 65 ++++++++++++++++++++++++++++++++- contrib/test/craft/raftServer.h | 14 ++++--- 5 files changed, 105 insertions(+), 17 deletions(-) create mode 100644 contrib/test/craft/common.h diff --git a/contrib/test/craft/clear.sh b/contrib/test/craft/clear.sh index 6412656d77..398b3088f2 100644 --- a/contrib/test/craft/clear.sh +++ b/contrib/test/craft/clear.sh @@ -1,3 +1,4 @@ #!/bin/bash rm -rf 127.0.0.1* +rm -rf ./data diff --git a/contrib/test/craft/common.h b/contrib/test/craft/common.h new file mode 100644 index 0000000000..670e5fb927 --- /dev/null +++ b/contrib/test/craft/common.h @@ -0,0 +1,17 @@ +#ifndef TDENGINE_COMMON_H +#define TDENGINE_COMMON_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define COMMAND_LEN 256 +#define DIR_LEN 128 +#define HOST_LEN 128 +#define ADDRESS_LEN (HOST_LEN + 16) + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_COMMON_H diff --git a/contrib/test/craft/raftMain.c b/contrib/test/craft/raftMain.c index 6e9f3b5dfb..9d00cb4f70 100644 --- a/contrib/test/craft/raftMain.c +++ b/contrib/test/craft/raftMain.c @@ -1,6 +1,4 @@ #include -#include -#include #include #include #include @@ -8,9 +6,10 @@ #include #include #include +#include +#include #include "raftServer.h" - -#define COMMAND_LEN 128 +#include "common.h" const char *exe_name; @@ -52,8 +51,6 @@ void *startConsoleFunc(void *param) { } // Config --------------------------------- -#define DIR_LEN 128 -#define HOST_LEN 128 typedef struct SRaftServerConfig { char host[HOST_LEN]; uint32_t port; @@ -61,16 +58,22 @@ typedef struct SRaftServerConfig { } SRaftServerConfig; void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { - snprintf(pConf->dir, sizeof(pConf->dir), "%s", argv[1]); + snprintf(pConf->host, sizeof(pConf->host), "%s", argv[1]); sscanf(argv[2], "%u", &pConf->port); snprintf(pConf->dir, sizeof(pConf->dir), "%s", argv[3]); } +void printConf(SRaftServerConfig *pConf) { + printf("conf: %s:%u %s \n", pConf->host, pConf->port, pConf->dir); +} + // ----------------------------------------- void usage() { printf("\n"); printf("usage: %s host port dir \n", exe_name); - printf("eg : %s 127.0.0.1 10000 ./data \n", exe_name); + printf("\n"); + printf("eg: \n"); + printf("%s 127.0.0.1 10000 ./data \n", exe_name); printf("\n"); } @@ -86,12 +89,14 @@ int main(int argc, char **argv) { SRaftServerConfig conf; parseConf(argc, argv, &conf); - + printConf(&conf); + struct raft_fsm fsm; initFsm(&fsm); SRaftServer raftServer; - ret = raftServerInit(&raftServer, &conf, &fsm); + ret = raftServerInit(&raftServer, conf.host, conf.port, conf.dir, &fsm); + assert(ret == 0); pthread_t tidRaftServer; pthread_create(&tidRaftServer, NULL, startServerFunc, &raftServer); diff --git a/contrib/test/craft/raftServer.c b/contrib/test/craft/raftServer.c index 5633b211ac..4a4d6cb7a3 100644 --- a/contrib/test/craft/raftServer.c +++ b/contrib/test/craft/raftServer.c @@ -1,12 +1,65 @@ +#include +#include "common.h" #include "raftServer.h" -int32_t raftServerInit(SRaftServer *pRaftServer, struct SRaftServerConfig *pConf, struct raft_fsm *pFsm) { +uint64_t raftId(const char *host, uint32_t port) { + uint32_t host_uint32 = (uint32_t)inet_addr(host); + assert(host_uint32 != (uint32_t)-1); + uint64_t code = ((uint64_t)host_uint32) << 32 | port; + return code; +} + +int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port, const char *dir, struct raft_fsm *pFsm) { + int ret; + + char cmd_buf[COMMAND_LEN]; + snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", dir); + system(cmd_buf); + + snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", host); + pRaftServer->port = port; + snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", host, port); + strncpy(pRaftServer->dir, dir, sizeof(pRaftServer->dir)); + + pRaftServer->raftId = raftId(pRaftServer->host, pRaftServer->port); + pRaftServer->fsm = pFsm; + + ret = uv_loop_init(&pRaftServer->loop); + if (!ret) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + } + + ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop); + if (!ret) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + } + + ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, dir, &pRaftServer->transport); + if (!ret) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + } + + ret = raft_init(&pRaftServer->raft, &pRaftServer->io, pRaftServer->fsm, pRaftServer->raftId, pRaftServer->address); + if (!ret) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + } + + struct raft_configuration conf; + raft_configuration_init(&conf); + raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER); + raft_bootstrap(&pRaftServer->raft, &conf); return 0; } int32_t raftServerStart(SRaftServer *pRaftServer) { + int ret; + ret = raft_start(&pRaftServer->raft); + if (!ret) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + } + uv_run(&pRaftServer->loop, UV_RUN_DEFAULT); } @@ -14,7 +67,15 @@ void raftServerClose(SRaftServer *pRaftServer) { } -int32_t initFsm(struct raft_fsm *fsm) { +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) { + char *msg = (char*)buf->base; + printf("%s \n", msg); + + return 0; +} + +int32_t initFsm(struct raft_fsm *fsm) { + fsm->apply = fsmApplyCb; return 0; } diff --git a/contrib/test/craft/raftServer.h b/contrib/test/craft/raftServer.h index 9a717260de..44cfa54dfe 100644 --- a/contrib/test/craft/raftServer.h +++ b/contrib/test/craft/raftServer.h @@ -5,15 +5,20 @@ extern "C" { #endif +#include +#include +#include +#include #include "raft.h" #include "raft/uv.h" - - #define DIR_LEN 128 -#define ADDRESS_LEN 128 +#define HOST_LEN 128 +#define ADDRESS_LEN (HOST_LEN + 16) typedef struct { char dir[DIR_LEN]; /* Data dir of UV I/O backend */ + char host[HOST_LEN]; + uint32_t port; char address[ADDRESS_LEN]; /* Raft instance address */ raft_id raftId; /* For vote */ struct raft_fsm *fsm; /* Sample application FSM */ @@ -24,8 +29,7 @@ typedef struct { struct raft_uv_transport transport; /* UV I/O backend transport */ } SRaftServer; -struct SRaftServerConfig; -int32_t raftServerInit(SRaftServer *pRaftServer, struct SRaftServerConfig *pConf, struct raft_fsm *pFsm); +int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port, const char *dir, struct raft_fsm *pFsm); int32_t raftServerStart(SRaftServer *pRaftServer); void raftServerClose(SRaftServer *pRaftServer); From 829fd712f7ffeedef426fecde39c02f4bc773ed2 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 23 Dec 2021 19:49:34 +0800 Subject: [PATCH 3/3] simulate a raft KVStore, for self testing --- contrib/test/craft/common.h | 23 ++- contrib/test/craft/raftMain.c | 297 +++++++++++++++++++++++++++++--- contrib/test/craft/raftServer.c | 97 +++++++++-- contrib/test/craft/raftServer.h | 25 ++- 4 files changed, 402 insertions(+), 40 deletions(-) diff --git a/contrib/test/craft/common.h b/contrib/test/craft/common.h index 670e5fb927..1e94ee8bca 100644 --- a/contrib/test/craft/common.h +++ b/contrib/test/craft/common.h @@ -5,11 +5,28 @@ extern "C" { #endif -#define COMMAND_LEN 256 -#define DIR_LEN 128 -#define HOST_LEN 128 +#include + +#define MAX_PEERS 10 +#define COMMAND_LEN 512 +#define TOKEN_LEN 128 +#define DIR_LEN 256 +#define HOST_LEN 64 #define ADDRESS_LEN (HOST_LEN + 16) +typedef struct { + char host[HOST_LEN]; + uint32_t port; +} Addr; + +typedef struct { + Addr me; + Addr peers[MAX_PEERS]; + int peersCount; + char dir[DIR_LEN]; + char dataDir[DIR_LEN + HOST_LEN * 2]; +} SRaftServerConfig; + #ifdef __cplusplus } #endif diff --git a/contrib/test/craft/raftMain.c b/contrib/test/craft/raftMain.c index 9d00cb4f70..52e0b694dc 100644 --- a/contrib/test/craft/raftMain.c +++ b/contrib/test/craft/raftMain.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -13,6 +14,67 @@ const char *exe_name; +void parseAddr(const char *addr, char *host, int len, uint32_t *port) { + char* tmp = (char*)malloc(strlen(addr) + 1); + strcpy(tmp, addr); + + char* context; + char* separator = ":"; + char* token = strtok_r(tmp, separator, &context); + if (token) { + snprintf(host, len, "%s", token); + } + + token = strtok_r(NULL, separator, &context); + if (token) { + sscanf(token, "%u", port); + } + + free(tmp); +} + +// only parse 3 tokens +int parseCommand(const char* str, char* token1, char* token2, char* token3, int len) +{ + char* tmp = (char*)malloc(strlen(str) + 1); + strcpy(tmp, str); + + char* context; + char* separator = " "; + int n = 0; + + char* token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token1, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token2, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token3, token, len); + n++; + } + +ret: + return n; + free(tmp); +} + void *startServerFunc(void *param) { SRaftServer *pServer = (SRaftServer*)param; int32_t r = raftServerStart(pServer); @@ -22,6 +84,86 @@ void *startServerFunc(void *param) { } // Console --------------------------------- +const char* state2String(unsigned short state) { + if (state == RAFT_UNAVAILABLE) { + return "RAFT_UNAVAILABLE"; + + } else if (state == RAFT_FOLLOWER) { + return "RAFT_FOLLOWER"; + + } else if (state == RAFT_CANDIDATE) { + return "RAFT_CANDIDATE"; + + } else if (state == RAFT_LEADER) { + return "RAFT_LEADER"; + + } + return "UNKNOWN_RAFT_STATE"; +} + +void printRaftConfiguration(struct raft_configuration *c) { + printf("configuration: \n"); + for (int i = 0; i < c->n; ++i) { + printf("%llu -- %d -- %s\n", c->servers->id, c->servers->role, c->servers->address); + } +} + +void printRaftState(struct raft *r) { + printf("----Raft State: -----------\n"); + printf("my_id: %llu \n", r->id); + printf("address: %s \n", r->address); + printf("current_term: %llu \n", r->current_term); + printf("voted_for: %llu \n", r->voted_for); + printf("role: %s \n", state2String(r->state)); + printf("commit_index: %llu \n", r->commit_index); + printf("last_applied: %llu \n", r->last_applied); + printf("last_stored: %llu \n", r->last_stored); + + /* + printf("configuration_index: %llu \n", r->configuration_index); + printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index); + printRaftConfiguration(&r->configuration); + */ + + printf("----------------------------\n"); +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + raft_free(req); + struct raft *r = req->data; + if (status != 0) { + printf("putValueCb: %s \n", raft_errmsg(r)); + } else { + printf("putValueCb: %s \n", "ok"); + } +} + +void putValue(struct raft *r, const char *value) { + struct raft_buffer buf; + + buf.len = TOKEN_LEN;; + buf.base = raft_malloc(buf.len); + snprintf(buf.base, buf.len, "%s", value); + + struct raft_apply *req = raft_malloc(sizeof(struct raft_apply)); + req->data = r; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char*)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +void getValue(const char *key) { + char *ptr = getKV(key); + if (ptr) { + printf("get value: [%s] \n", ptr); + } else { + printf("value not found for key: [%s] \n", key); + } +} + void console(SRaftServer *pRaftServer) { while (1) { char cmd_buf[COMMAND_LEN]; @@ -40,7 +182,68 @@ void console(SRaftServer *pRaftServer) { continue; } - printf("cmd_buf: [%s] \n", cmd_buf); + char cmd[TOKEN_LEN]; + memset(cmd, 0, sizeof(cmd)); + + char param1[TOKEN_LEN]; + memset(param1, 0, sizeof(param1)); + + char param2[TOKEN_LEN]; + memset(param2, 0, sizeof(param2)); + + parseCommand(cmd_buf, cmd, param1, param2, TOKEN_LEN); + if (strcmp(cmd, "addnode") == 0) { + printf("not support \n"); + + /* + char host[HOST_LEN]; + uint32_t port; + parseAddr(param1, host, HOST_LEN, &port); + uint64_t rid = raftId(host, port); + + struct raft_change *req = raft_malloc(sizeof(*req)); + int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL); + if (r != 0) { + printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft)); + } + printf("add node: %lu %s \n", rid, param1); + + struct raft_change *req2 = raft_malloc(sizeof(*req2)); + r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL); + if (r != 0) { + printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft)); + } + */ + + } else if (strcmp(cmd, "dropnode") == 0) { + printf("not support \n"); + + } else if (strcmp(cmd, "put") == 0) { + char buf[256]; + snprintf(buf, sizeof(buf), "%s--%s", param1, param2); + putValue(&pRaftServer->raft, buf); + + } else if (strcmp(cmd, "get") == 0) { + getValue(param1); + + } else if (strcmp(cmd, "state") == 0) { + printRaftState(&pRaftServer->raft); + + } else if (strcmp(cmd, "snapshot") == 0) { + printf("not support \n"); + + } else if (strcmp(cmd, "help") == 0) { + printf("addnode \"127.0.0.1:8888\" \n"); + printf("dropnode \"127.0.0.1:8888\" \n"); + printf("put key value \n"); + printf("get key \n"); + printf("state \n"); + + } else { + printf("unknown command: [%s], type \"help\" to see help \n", cmd); + } + + //printf("cmd_buf: [%s] \n", cmd_buf); } } @@ -51,38 +254,86 @@ void *startConsoleFunc(void *param) { } // Config --------------------------------- -typedef struct SRaftServerConfig { - char host[HOST_LEN]; - uint32_t port; - char dir[DIR_LEN]; -} SRaftServerConfig; +void usage() { + printf("\nusage: \n"); + printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name); + printf("\n"); + printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10002 --peers=127.0.0.1:10000,127.0.0.1:10001 --dir=./data \n", exe_name); + printf("\n"); +} void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { - snprintf(pConf->host, sizeof(pConf->host), "%s", argv[1]); - sscanf(argv[2], "%u", &pConf->port); - snprintf(pConf->dir, sizeof(pConf->dir), "%s", argv[3]); + memset(pConf, 0, sizeof(*pConf)); + + int option_index, option_value; + option_index = 0; + static struct option long_options[] = { + {"help", no_argument, NULL, 'h'}, + {"peers", required_argument, NULL, 'p'}, + {"me", required_argument, NULL, 'm'}, + {"dir", required_argument, NULL, 'd'}, + {NULL, 0, NULL, 0} + }; + + while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) { + switch (option_value) { + case 'm': { + parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port); + break; + } + + case 'p': { + char tokens[MAX_PEERS][MAX_TOKEN_LEN]; + int peerCount = splitString(optarg, ",", tokens, MAX_PEERS); + pConf->peersCount = peerCount; + for (int i = 0; i < peerCount; ++i) { + Addr *pAddr = &pConf->peers[i]; + parseAddr(tokens[i], pAddr->host, sizeof(pAddr->host), &pAddr->port); + } + break; + } + + + case 'd': { + snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg); + break; + } + + case 'h': { + usage(); + exit(-1); + } + + default: { + usage(); + exit(-1); + } + } + } + snprintf(pConf->dataDir, sizeof(pConf->dataDir), "%s/%s:%u", pConf->dir, pConf->me.host, pConf->me.port); } void printConf(SRaftServerConfig *pConf) { - printf("conf: %s:%u %s \n", pConf->host, pConf->port, pConf->dir); + printf("\nconf: \n"); + printf("me: %s:%u \n", pConf->me.host, pConf->me.port); + printf("peersCount: %d \n", pConf->peersCount); + for (int i = 0; i < pConf->peersCount; ++i) { + Addr *pAddr = &pConf->peers[i]; + printf("peer%d: %s:%u \n", i, pAddr->host, pAddr->port); + } + printf("dataDir: %s \n\n", pConf->dataDir); + } -// ----------------------------------------- -void usage() { - printf("\n"); - printf("usage: %s host port dir \n", exe_name); - printf("\n"); - printf("eg: \n"); - printf("%s 127.0.0.1 10000 ./data \n", exe_name); - printf("\n"); -} int main(int argc, char **argv) { srand(time(NULL)); int32_t ret; exe_name = argv[0]; - if (argc != 4) { + if (argc < 3) { usage(); exit(-1); } @@ -91,11 +342,15 @@ int main(int argc, char **argv) { parseConf(argc, argv, &conf); printConf(&conf); + char cmd_buf[COMMAND_LEN]; + snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", conf.dataDir); + system(cmd_buf); + struct raft_fsm fsm; initFsm(&fsm); SRaftServer raftServer; - ret = raftServerInit(&raftServer, conf.host, conf.port, conf.dir, &fsm); + ret = raftServerInit(&raftServer, &conf, &fsm); assert(ret == 0); pthread_t tidRaftServer; diff --git a/contrib/test/craft/raftServer.c b/contrib/test/craft/raftServer.c index 4a4d6cb7a3..6f4dbc1997 100644 --- a/contrib/test/craft/raftServer.c +++ b/contrib/test/craft/raftServer.c @@ -2,6 +2,70 @@ #include "common.h" #include "raftServer.h" +char *keys; +char *values; + +void initStore() { + keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); + values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); + writeIndex = 0; +} + +void destroyStore() { + free(keys); + free(values); +} + +void putKV(const char *key, const char *value) { + if (writeIndex < MAX_RECORD_COUNT) { + strncpy(&keys[writeIndex], key, MAX_KV_LEN); + strncpy(&values[writeIndex], value, MAX_KV_LEN); + writeIndex++; + } +} + +char *getKV(const char *key) { + for (int i = 0; i < MAX_RECORD_COUNT; ++i) { + if (strcmp(&keys[i], key) == 0) { + return &values[i]; + } + } + return NULL; +} + + +int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr) +{ + if (n_arr <= 0) { + return -1; + } + + char* tmp = (char*)malloc(strlen(str) + 1); + strcpy(tmp, str); + char* context; + int n = 0; + + char* token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + strncpy(arr[n], token, MAX_TOKEN_LEN); + n++; + + while (1) { + token = strtok_r(NULL, separator, &context); + if (!token || n >= n_arr) { + goto ret; + } + strncpy(arr[n], token, MAX_TOKEN_LEN); + n++; + } + +ret: + free(tmp); + return n; +} + uint64_t raftId(const char *host, uint32_t port) { uint32_t host_uint32 = (uint32_t)inet_addr(host); assert(host_uint32 != (uint32_t)-1); @@ -9,17 +73,13 @@ uint64_t raftId(const char *host, uint32_t port) { return code; } -int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port, const char *dir, struct raft_fsm *pFsm) { +int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm) { int ret; - char cmd_buf[COMMAND_LEN]; - snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", dir); - system(cmd_buf); - - snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", host); - pRaftServer->port = port; - snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", host, port); - strncpy(pRaftServer->dir, dir, sizeof(pRaftServer->dir)); + snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host); + pRaftServer->port = pConf->me.port; + snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port); + strncpy(pRaftServer->dir, pConf->dataDir, sizeof(pRaftServer->dir)); pRaftServer->raftId = raftId(pRaftServer->host, pRaftServer->port); pRaftServer->fsm = pFsm; @@ -34,7 +94,7 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); } - ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, dir, &pRaftServer->transport); + ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, pRaftServer->dir, &pRaftServer->transport); if (!ret) { fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); } @@ -47,6 +107,16 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port struct raft_configuration conf; raft_configuration_init(&conf); raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER); + printf("add myself: %llu - %s \n", pRaftServer->raftId, pRaftServer->address); + for (int i = 0; i < pConf->peersCount; ++i) { + const Addr *pAddr = &pConf->peers[i]; + raft_id rid = raftId(pAddr->host, pAddr->port); + char addrBuf[ADDRESS_LEN]; + snprintf(addrBuf, sizeof(addrBuf), "%s:%u", pAddr->host, pAddr->port); + raft_configuration_add(&conf, rid, addrBuf, RAFT_VOTER); + printf("add peers: %llu - %s \n", rid, addrBuf); + } + raft_bootstrap(&pRaftServer->raft, &conf); return 0; @@ -70,12 +140,17 @@ void raftServerClose(SRaftServer *pRaftServer) { int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) { char *msg = (char*)buf->base; - printf("%s \n", msg); + printf("fsm apply: %s \n", msg); + + char arr[2][MAX_TOKEN_LEN]; + splitString(msg, "--", arr, 2); + putKV(arr[0], arr[1]); return 0; } int32_t initFsm(struct raft_fsm *fsm) { + initStore(); fsm->apply = fsmApplyCb; return 0; } diff --git a/contrib/test/craft/raftServer.h b/contrib/test/craft/raftServer.h index 44cfa54dfe..5fccde6bf2 100644 --- a/contrib/test/craft/raftServer.h +++ b/contrib/test/craft/raftServer.h @@ -11,12 +11,23 @@ extern "C" { #include #include "raft.h" #include "raft/uv.h" +#include "common.h" + + +// simulate a db store, just for test +#define MAX_KV_LEN 100 +#define MAX_RECORD_COUNT 500 +char *keys; +char *values; +int writeIndex; + +void initStore(); +void destroyStore(); +void putKV(const char *key, const char *value); +char *getKV(const char *key); -#define DIR_LEN 128 -#define HOST_LEN 128 -#define ADDRESS_LEN (HOST_LEN + 16) typedef struct { - char dir[DIR_LEN]; /* Data dir of UV I/O backend */ + char dir[DIR_LEN + HOST_LEN * 2]; /* Data dir of UV I/O backend */ char host[HOST_LEN]; uint32_t port; char address[ADDRESS_LEN]; /* Raft instance address */ @@ -29,7 +40,11 @@ typedef struct { struct raft_uv_transport transport; /* UV I/O backend transport */ } SRaftServer; -int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port, const char *dir, struct raft_fsm *pFsm); +#define MAX_TOKEN_LEN 32 +int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr); + +uint64_t raftId(const char *host, uint32_t port); +int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm); int32_t raftServerStart(SRaftServer *pRaftServer); void raftServerClose(SRaftServer *pRaftServer);