From 3f3fb4a27168c8d42262a53833ebda0f3ee20734 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 28 Dec 2021 17:44:01 +0800 Subject: [PATCH] add node by manual operation --- contrib/test/craft/common.h | 1 + contrib/test/craft/raftMain.c | 76 +++++++++++++++++++++++++++------ contrib/test/craft/raftServer.c | 26 ++++++++--- 3 files changed, 85 insertions(+), 18 deletions(-) diff --git a/contrib/test/craft/common.h b/contrib/test/craft/common.h index 1e94ee8bca..282e282543 100644 --- a/contrib/test/craft/common.h +++ b/contrib/test/craft/common.h @@ -20,6 +20,7 @@ typedef struct { } Addr; typedef struct { + int voter; Addr me; Addr peers[MAX_PEERS]; int peersCount; diff --git a/contrib/test/craft/raftMain.c b/contrib/test/craft/raftMain.c index 52e0b694dc..bae083cf94 100644 --- a/contrib/test/craft/raftMain.c +++ b/contrib/test/craft/raftMain.c @@ -104,7 +104,7 @@ const char* state2String(unsigned short state) { void printRaftConfiguration(struct raft_configuration *c) { printf("configuration: \n"); for (int i = 0; i < c->n; ++i) { - printf("%llu -- %d -- %s\n", c->servers->id, c->servers->role, c->servers->address); + printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address); } } @@ -119,11 +119,9 @@ void printRaftState(struct raft *r) { printf("last_applied: %llu \n", r->last_applied); printf("last_stored: %llu \n", r->last_stored); - /* printf("configuration_index: %llu \n", r->configuration_index); printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index); printRaftConfiguration(&r->configuration); - */ printf("----------------------------\n"); } @@ -164,6 +162,18 @@ void getValue(const char *key) { } } +void raft_change_cb_add(struct raft_change *req, int status) { + printf("raft_change_cb_add status:%d ... \n", status); +} + +void raft_change_cb_assign(struct raft_change *req, int status) { + printf("raft_change_cb_assign status:%d ... \n", status); +} + +void raft_change_cb_remove(struct raft_change *req, int status) { + printf("raft_change_cb_remove status:%d ... \n", status); +} + void console(SRaftServer *pRaftServer) { while (1) { char cmd_buf[COMMAND_LEN]; @@ -193,30 +203,59 @@ void console(SRaftServer *pRaftServer) { parseCommand(cmd_buf, cmd, param1, param2, TOKEN_LEN); if (strcmp(cmd, "addnode") == 0) { - printf("not support \n"); + //printf("not support \n"); - /* char host[HOST_LEN]; uint32_t port; parseAddr(param1, host, HOST_LEN, &port); uint64_t rid = raftId(host, port); struct raft_change *req = raft_malloc(sizeof(*req)); - int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL); + int r = raft_add(&pRaftServer->raft, req, rid, param1, raft_change_cb_add); if (r != 0) { - printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft)); + printf("raft_add error: %s \n", raft_errmsg(&pRaftServer->raft)); } printf("add node: %lu %s \n", rid, param1); struct raft_change *req2 = raft_malloc(sizeof(*req2)); - r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL); + r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, raft_change_cb_assign); if (r != 0) { - printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft)); + printf("raft_assign error: %s \n", raft_errmsg(&pRaftServer->raft)); } - */ + printf("raft_assign: %s %d \n", param1, RAFT_VOTER); + + } else if (strcmp(cmd, "activate") == 0) { + char host[HOST_LEN]; + uint32_t port; + parseAddr(param1, host, HOST_LEN, &port); + uint64_t rid = raftId(host, port); + + + struct raft_change *req2 = raft_malloc(sizeof(*req2)); + int r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, raft_change_cb_assign); + if (r != 0) { + printf("raft_assign error: %s \n", raft_errmsg(&pRaftServer->raft)); + } + printf("raft_assign: %s %d \n", param1, RAFT_VOTER); + + + + } else if (strcmp(cmd, "dropnode") == 0) { - printf("not support \n"); + char host[HOST_LEN]; + uint32_t port; + parseAddr(param1, host, HOST_LEN, &port); + uint64_t rid = raftId(host, port); + + struct raft_change *req = raft_malloc(sizeof(*req)); + int r = raft_remove(&pRaftServer->raft, req, rid, raft_change_cb_remove); + if (r != 0) { + printf("raft_remove: %s \n", raft_errmsg(&pRaftServer->raft)); + } + printf("drop node: %lu %s \n", rid, param1); + + } else if (strcmp(cmd, "put") == 0) { char buf[256]; @@ -234,6 +273,7 @@ void console(SRaftServer *pRaftServer) { } else if (strcmp(cmd, "help") == 0) { printf("addnode \"127.0.0.1:8888\" \n"); + printf("activate \"127.0.0.1:8888\" \n"); printf("dropnode \"127.0.0.1:8888\" \n"); printf("put key value \n"); printf("get key \n"); @@ -256,7 +296,9 @@ void *startConsoleFunc(void *param) { // Config --------------------------------- void usage() { printf("\nusage: \n"); - printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10000 --dir=./data --voter \n", exe_name); + printf("%s --me=127.0.0.1:10001 --dir=./data \n", exe_name); + printf("%s --me=127.0.0.1:10002 --dir=./data \n", exe_name); printf("\n"); printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name); printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name); @@ -271,13 +313,15 @@ void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { option_index = 0; static struct option long_options[] = { {"help", no_argument, NULL, 'h'}, + {"voter", no_argument, NULL, 'v'}, {"peers", required_argument, NULL, 'p'}, {"me", required_argument, NULL, 'm'}, {"dir", required_argument, NULL, 'd'}, {NULL, 0, NULL, 0} }; - while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) { + pConf->voter = 0; + while ((option_value = getopt_long(argc, argv, "hvp:m:d:", long_options, &option_index)) != -1) { switch (option_value) { case 'm': { parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port); @@ -295,6 +339,10 @@ void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { break; } + case 'v': { + pConf->voter = 1; + break; + } case 'd': { snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg); @@ -338,6 +386,8 @@ int main(int argc, char **argv) { exit(-1); } + signal(SIGPIPE, SIG_IGN); + SRaftServerConfig conf; parseConf(argc, argv, &conf); printConf(&conf); diff --git a/contrib/test/craft/raftServer.c b/contrib/test/craft/raftServer.c index 6f4dbc1997..ffec22e646 100644 --- a/contrib/test/craft/raftServer.c +++ b/contrib/test/craft/raftServer.c @@ -85,29 +85,45 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, pRaftServer->fsm = pFsm; ret = uv_loop_init(&pRaftServer->loop); - if (!ret) { + if (ret != 0) { fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + assert(0); } ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop); - if (!ret) { + if (ret != 0) { fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + assert(0); } ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, pRaftServer->dir, &pRaftServer->transport); - if (!ret) { + if (ret != 0) { fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + assert(0); } ret = raft_init(&pRaftServer->raft, &pRaftServer->io, pRaftServer->fsm, pRaftServer->raftId, pRaftServer->address); - if (!ret) { + if (ret != 0) { fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + assert(0); } struct raft_configuration conf; raft_configuration_init(&conf); - raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER); + + if (pConf->voter == 0) { + raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_SPARE); + + } else { + raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER); + + } + + + printf("add myself: %llu - %s \n", pRaftServer->raftId, pRaftServer->address); + + for (int i = 0; i < pConf->peersCount; ++i) { const Addr *pAddr = &pConf->peers[i]; raft_id rid = raftId(pAddr->host, pAddr->port);