From 829fd712f7ffeedef426fecde39c02f4bc773ed2 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 23 Dec 2021 19:49:34 +0800 Subject: [PATCH] simulate a raft KVStore, for self testing --- contrib/test/craft/common.h | 23 ++- contrib/test/craft/raftMain.c | 297 +++++++++++++++++++++++++++++--- contrib/test/craft/raftServer.c | 97 +++++++++-- contrib/test/craft/raftServer.h | 25 ++- 4 files changed, 402 insertions(+), 40 deletions(-) diff --git a/contrib/test/craft/common.h b/contrib/test/craft/common.h index 670e5fb927..1e94ee8bca 100644 --- a/contrib/test/craft/common.h +++ b/contrib/test/craft/common.h @@ -5,11 +5,28 @@ extern "C" { #endif -#define COMMAND_LEN 256 -#define DIR_LEN 128 -#define HOST_LEN 128 +#include + +#define MAX_PEERS 10 +#define COMMAND_LEN 512 +#define TOKEN_LEN 128 +#define DIR_LEN 256 +#define HOST_LEN 64 #define ADDRESS_LEN (HOST_LEN + 16) +typedef struct { + char host[HOST_LEN]; + uint32_t port; +} Addr; + +typedef struct { + Addr me; + Addr peers[MAX_PEERS]; + int peersCount; + char dir[DIR_LEN]; + char dataDir[DIR_LEN + HOST_LEN * 2]; +} SRaftServerConfig; + #ifdef __cplusplus } #endif diff --git a/contrib/test/craft/raftMain.c b/contrib/test/craft/raftMain.c index 9d00cb4f70..52e0b694dc 100644 --- a/contrib/test/craft/raftMain.c +++ b/contrib/test/craft/raftMain.c @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -13,6 +14,67 @@ const char *exe_name; +void parseAddr(const char *addr, char *host, int len, uint32_t *port) { + char* tmp = (char*)malloc(strlen(addr) + 1); + strcpy(tmp, addr); + + char* context; + char* separator = ":"; + char* token = strtok_r(tmp, separator, &context); + if (token) { + snprintf(host, len, "%s", token); + } + + token = strtok_r(NULL, separator, &context); + if (token) { + sscanf(token, "%u", port); + } + + free(tmp); +} + +// only parse 3 tokens +int parseCommand(const char* str, char* token1, char* token2, char* token3, int len) +{ + char* tmp = (char*)malloc(strlen(str) + 1); + strcpy(tmp, str); + + char* context; + char* separator = " "; + int n = 0; + + char* token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token1, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token2, token, len); + n++; + } + + token = strtok_r(NULL, separator, &context); + if (!token) { + goto ret; + } + if (strcmp(token, "") != 0) { + strncpy(token3, token, len); + n++; + } + +ret: + return n; + free(tmp); +} + void *startServerFunc(void *param) { SRaftServer *pServer = (SRaftServer*)param; int32_t r = raftServerStart(pServer); @@ -22,6 +84,86 @@ void *startServerFunc(void *param) { } // Console --------------------------------- +const char* state2String(unsigned short state) { + if (state == RAFT_UNAVAILABLE) { + return "RAFT_UNAVAILABLE"; + + } else if (state == RAFT_FOLLOWER) { + return "RAFT_FOLLOWER"; + + } else if (state == RAFT_CANDIDATE) { + return "RAFT_CANDIDATE"; + + } else if (state == RAFT_LEADER) { + return "RAFT_LEADER"; + + } + return "UNKNOWN_RAFT_STATE"; +} + +void printRaftConfiguration(struct raft_configuration *c) { + printf("configuration: \n"); + for (int i = 0; i < c->n; ++i) { + printf("%llu -- %d -- %s\n", c->servers->id, c->servers->role, c->servers->address); + } +} + +void printRaftState(struct raft *r) { + printf("----Raft State: -----------\n"); + printf("my_id: %llu \n", r->id); + printf("address: %s \n", r->address); + printf("current_term: %llu \n", r->current_term); + printf("voted_for: %llu \n", r->voted_for); + printf("role: %s \n", state2String(r->state)); + printf("commit_index: %llu \n", r->commit_index); + printf("last_applied: %llu \n", r->last_applied); + printf("last_stored: %llu \n", r->last_stored); + + /* + printf("configuration_index: %llu \n", r->configuration_index); + printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index); + printRaftConfiguration(&r->configuration); + */ + + printf("----------------------------\n"); +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + raft_free(req); + struct raft *r = req->data; + if (status != 0) { + printf("putValueCb: %s \n", raft_errmsg(r)); + } else { + printf("putValueCb: %s \n", "ok"); + } +} + +void putValue(struct raft *r, const char *value) { + struct raft_buffer buf; + + buf.len = TOKEN_LEN;; + buf.base = raft_malloc(buf.len); + snprintf(buf.base, buf.len, "%s", value); + + struct raft_apply *req = raft_malloc(sizeof(struct raft_apply)); + req->data = r; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char*)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +void getValue(const char *key) { + char *ptr = getKV(key); + if (ptr) { + printf("get value: [%s] \n", ptr); + } else { + printf("value not found for key: [%s] \n", key); + } +} + void console(SRaftServer *pRaftServer) { while (1) { char cmd_buf[COMMAND_LEN]; @@ -40,7 +182,68 @@ void console(SRaftServer *pRaftServer) { continue; } - printf("cmd_buf: [%s] \n", cmd_buf); + char cmd[TOKEN_LEN]; + memset(cmd, 0, sizeof(cmd)); + + char param1[TOKEN_LEN]; + memset(param1, 0, sizeof(param1)); + + char param2[TOKEN_LEN]; + memset(param2, 0, sizeof(param2)); + + parseCommand(cmd_buf, cmd, param1, param2, TOKEN_LEN); + if (strcmp(cmd, "addnode") == 0) { + printf("not support \n"); + + /* + char host[HOST_LEN]; + uint32_t port; + parseAddr(param1, host, HOST_LEN, &port); + uint64_t rid = raftId(host, port); + + struct raft_change *req = raft_malloc(sizeof(*req)); + int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL); + if (r != 0) { + printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft)); + } + printf("add node: %lu %s \n", rid, param1); + + struct raft_change *req2 = raft_malloc(sizeof(*req2)); + r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL); + if (r != 0) { + printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft)); + } + */ + + } else if (strcmp(cmd, "dropnode") == 0) { + printf("not support \n"); + + } else if (strcmp(cmd, "put") == 0) { + char buf[256]; + snprintf(buf, sizeof(buf), "%s--%s", param1, param2); + putValue(&pRaftServer->raft, buf); + + } else if (strcmp(cmd, "get") == 0) { + getValue(param1); + + } else if (strcmp(cmd, "state") == 0) { + printRaftState(&pRaftServer->raft); + + } else if (strcmp(cmd, "snapshot") == 0) { + printf("not support \n"); + + } else if (strcmp(cmd, "help") == 0) { + printf("addnode \"127.0.0.1:8888\" \n"); + printf("dropnode \"127.0.0.1:8888\" \n"); + printf("put key value \n"); + printf("get key \n"); + printf("state \n"); + + } else { + printf("unknown command: [%s], type \"help\" to see help \n", cmd); + } + + //printf("cmd_buf: [%s] \n", cmd_buf); } } @@ -51,38 +254,86 @@ void *startConsoleFunc(void *param) { } // Config --------------------------------- -typedef struct SRaftServerConfig { - char host[HOST_LEN]; - uint32_t port; - char dir[DIR_LEN]; -} SRaftServerConfig; +void usage() { + printf("\nusage: \n"); + printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name); + printf("\n"); + printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10002 --peers=127.0.0.1:10000,127.0.0.1:10001 --dir=./data \n", exe_name); + printf("\n"); +} void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { - snprintf(pConf->host, sizeof(pConf->host), "%s", argv[1]); - sscanf(argv[2], "%u", &pConf->port); - snprintf(pConf->dir, sizeof(pConf->dir), "%s", argv[3]); + memset(pConf, 0, sizeof(*pConf)); + + int option_index, option_value; + option_index = 0; + static struct option long_options[] = { + {"help", no_argument, NULL, 'h'}, + {"peers", required_argument, NULL, 'p'}, + {"me", required_argument, NULL, 'm'}, + {"dir", required_argument, NULL, 'd'}, + {NULL, 0, NULL, 0} + }; + + while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) { + switch (option_value) { + case 'm': { + parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port); + break; + } + + case 'p': { + char tokens[MAX_PEERS][MAX_TOKEN_LEN]; + int peerCount = splitString(optarg, ",", tokens, MAX_PEERS); + pConf->peersCount = peerCount; + for (int i = 0; i < peerCount; ++i) { + Addr *pAddr = &pConf->peers[i]; + parseAddr(tokens[i], pAddr->host, sizeof(pAddr->host), &pAddr->port); + } + break; + } + + + case 'd': { + snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg); + break; + } + + case 'h': { + usage(); + exit(-1); + } + + default: { + usage(); + exit(-1); + } + } + } + snprintf(pConf->dataDir, sizeof(pConf->dataDir), "%s/%s:%u", pConf->dir, pConf->me.host, pConf->me.port); } void printConf(SRaftServerConfig *pConf) { - printf("conf: %s:%u %s \n", pConf->host, pConf->port, pConf->dir); + printf("\nconf: \n"); + printf("me: %s:%u \n", pConf->me.host, pConf->me.port); + printf("peersCount: %d \n", pConf->peersCount); + for (int i = 0; i < pConf->peersCount; ++i) { + Addr *pAddr = &pConf->peers[i]; + printf("peer%d: %s:%u \n", i, pAddr->host, pAddr->port); + } + printf("dataDir: %s \n\n", pConf->dataDir); + } -// ----------------------------------------- -void usage() { - printf("\n"); - printf("usage: %s host port dir \n", exe_name); - printf("\n"); - printf("eg: \n"); - printf("%s 127.0.0.1 10000 ./data \n", exe_name); - printf("\n"); -} int main(int argc, char **argv) { srand(time(NULL)); int32_t ret; exe_name = argv[0]; - if (argc != 4) { + if (argc < 3) { usage(); exit(-1); } @@ -91,11 +342,15 @@ int main(int argc, char **argv) { parseConf(argc, argv, &conf); printConf(&conf); + char cmd_buf[COMMAND_LEN]; + snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", conf.dataDir); + system(cmd_buf); + struct raft_fsm fsm; initFsm(&fsm); SRaftServer raftServer; - ret = raftServerInit(&raftServer, conf.host, conf.port, conf.dir, &fsm); + ret = raftServerInit(&raftServer, &conf, &fsm); assert(ret == 0); pthread_t tidRaftServer; diff --git a/contrib/test/craft/raftServer.c b/contrib/test/craft/raftServer.c index 4a4d6cb7a3..6f4dbc1997 100644 --- a/contrib/test/craft/raftServer.c +++ b/contrib/test/craft/raftServer.c @@ -2,6 +2,70 @@ #include "common.h" #include "raftServer.h" +char *keys; +char *values; + +void initStore() { + keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); + values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); + writeIndex = 0; +} + +void destroyStore() { + free(keys); + free(values); +} + +void putKV(const char *key, const char *value) { + if (writeIndex < MAX_RECORD_COUNT) { + strncpy(&keys[writeIndex], key, MAX_KV_LEN); + strncpy(&values[writeIndex], value, MAX_KV_LEN); + writeIndex++; + } +} + +char *getKV(const char *key) { + for (int i = 0; i < MAX_RECORD_COUNT; ++i) { + if (strcmp(&keys[i], key) == 0) { + return &values[i]; + } + } + return NULL; +} + + +int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr) +{ + if (n_arr <= 0) { + return -1; + } + + char* tmp = (char*)malloc(strlen(str) + 1); + strcpy(tmp, str); + char* context; + int n = 0; + + char* token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + strncpy(arr[n], token, MAX_TOKEN_LEN); + n++; + + while (1) { + token = strtok_r(NULL, separator, &context); + if (!token || n >= n_arr) { + goto ret; + } + strncpy(arr[n], token, MAX_TOKEN_LEN); + n++; + } + +ret: + free(tmp); + return n; +} + uint64_t raftId(const char *host, uint32_t port) { uint32_t host_uint32 = (uint32_t)inet_addr(host); assert(host_uint32 != (uint32_t)-1); @@ -9,17 +73,13 @@ uint64_t raftId(const char *host, uint32_t port) { return code; } -int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port, const char *dir, struct raft_fsm *pFsm) { +int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm) { int ret; - char cmd_buf[COMMAND_LEN]; - snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", dir); - system(cmd_buf); - - snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", host); - pRaftServer->port = port; - snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", host, port); - strncpy(pRaftServer->dir, dir, sizeof(pRaftServer->dir)); + snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host); + pRaftServer->port = pConf->me.port; + snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port); + strncpy(pRaftServer->dir, pConf->dataDir, sizeof(pRaftServer->dir)); pRaftServer->raftId = raftId(pRaftServer->host, pRaftServer->port); pRaftServer->fsm = pFsm; @@ -34,7 +94,7 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); } - ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, dir, &pRaftServer->transport); + ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, pRaftServer->dir, &pRaftServer->transport); if (!ret) { fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); } @@ -47,6 +107,16 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port struct raft_configuration conf; raft_configuration_init(&conf); raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER); + printf("add myself: %llu - %s \n", pRaftServer->raftId, pRaftServer->address); + for (int i = 0; i < pConf->peersCount; ++i) { + const Addr *pAddr = &pConf->peers[i]; + raft_id rid = raftId(pAddr->host, pAddr->port); + char addrBuf[ADDRESS_LEN]; + snprintf(addrBuf, sizeof(addrBuf), "%s:%u", pAddr->host, pAddr->port); + raft_configuration_add(&conf, rid, addrBuf, RAFT_VOTER); + printf("add peers: %llu - %s \n", rid, addrBuf); + } + raft_bootstrap(&pRaftServer->raft, &conf); return 0; @@ -70,12 +140,17 @@ void raftServerClose(SRaftServer *pRaftServer) { int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) { char *msg = (char*)buf->base; - printf("%s \n", msg); + printf("fsm apply: %s \n", msg); + + char arr[2][MAX_TOKEN_LEN]; + splitString(msg, "--", arr, 2); + putKV(arr[0], arr[1]); return 0; } int32_t initFsm(struct raft_fsm *fsm) { + initStore(); fsm->apply = fsmApplyCb; return 0; } diff --git a/contrib/test/craft/raftServer.h b/contrib/test/craft/raftServer.h index 44cfa54dfe..5fccde6bf2 100644 --- a/contrib/test/craft/raftServer.h +++ b/contrib/test/craft/raftServer.h @@ -11,12 +11,23 @@ extern "C" { #include #include "raft.h" #include "raft/uv.h" +#include "common.h" + + +// simulate a db store, just for test +#define MAX_KV_LEN 100 +#define MAX_RECORD_COUNT 500 +char *keys; +char *values; +int writeIndex; + +void initStore(); +void destroyStore(); +void putKV(const char *key, const char *value); +char *getKV(const char *key); -#define DIR_LEN 128 -#define HOST_LEN 128 -#define ADDRESS_LEN (HOST_LEN + 16) typedef struct { - char dir[DIR_LEN]; /* Data dir of UV I/O backend */ + char dir[DIR_LEN + HOST_LEN * 2]; /* Data dir of UV I/O backend */ char host[HOST_LEN]; uint32_t port; char address[ADDRESS_LEN]; /* Raft instance address */ @@ -29,7 +40,11 @@ typedef struct { struct raft_uv_transport transport; /* UV I/O backend transport */ } SRaftServer; -int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port, const char *dir, struct raft_fsm *pFsm); +#define MAX_TOKEN_LEN 32 +int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr); + +uint64_t raftId(const char *host, uint32_t port); +int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm); int32_t raftServerStart(SRaftServer *pRaftServer); void raftServerClose(SRaftServer *pRaftServer);