diff --git a/contrib/test/craft/clear.sh b/contrib/test/craft/clear.sh index 6412656d77..398b3088f2 100644 --- a/contrib/test/craft/clear.sh +++ b/contrib/test/craft/clear.sh @@ -1,3 +1,4 @@ #!/bin/bash rm -rf 127.0.0.1* +rm -rf ./data diff --git a/contrib/test/craft/common.h b/contrib/test/craft/common.h new file mode 100644 index 0000000000..670e5fb927 --- /dev/null +++ b/contrib/test/craft/common.h @@ -0,0 +1,17 @@ +#ifndef TDENGINE_COMMON_H +#define TDENGINE_COMMON_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define COMMAND_LEN 256 +#define DIR_LEN 128 +#define HOST_LEN 128 +#define ADDRESS_LEN (HOST_LEN + 16) + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_COMMON_H diff --git a/contrib/test/craft/raftMain.c b/contrib/test/craft/raftMain.c index 6e9f3b5dfb..9d00cb4f70 100644 --- a/contrib/test/craft/raftMain.c +++ b/contrib/test/craft/raftMain.c @@ -1,6 +1,4 @@ #include -#include -#include #include #include #include @@ -8,9 +6,10 @@ #include #include #include +#include +#include #include "raftServer.h" - -#define COMMAND_LEN 128 +#include "common.h" const char *exe_name; @@ -52,8 +51,6 @@ void *startConsoleFunc(void *param) { } // Config --------------------------------- -#define DIR_LEN 128 -#define HOST_LEN 128 typedef struct SRaftServerConfig { char host[HOST_LEN]; uint32_t port; @@ -61,16 +58,22 @@ typedef struct SRaftServerConfig { } SRaftServerConfig; void parseConf(int argc, char **argv, SRaftServerConfig *pConf) { - snprintf(pConf->dir, sizeof(pConf->dir), "%s", argv[1]); + snprintf(pConf->host, sizeof(pConf->host), "%s", argv[1]); sscanf(argv[2], "%u", &pConf->port); snprintf(pConf->dir, sizeof(pConf->dir), "%s", argv[3]); } +void printConf(SRaftServerConfig *pConf) { + printf("conf: %s:%u %s \n", pConf->host, pConf->port, pConf->dir); +} + // ----------------------------------------- void usage() { printf("\n"); printf("usage: %s host port dir \n", exe_name); - printf("eg : %s 127.0.0.1 10000 ./data \n", exe_name); + printf("\n"); + printf("eg: \n"); + printf("%s 127.0.0.1 10000 ./data \n", exe_name); printf("\n"); } @@ -86,12 +89,14 @@ int main(int argc, char **argv) { SRaftServerConfig conf; parseConf(argc, argv, &conf); - + printConf(&conf); + struct raft_fsm fsm; initFsm(&fsm); SRaftServer raftServer; - ret = raftServerInit(&raftServer, &conf, &fsm); + ret = raftServerInit(&raftServer, conf.host, conf.port, conf.dir, &fsm); + assert(ret == 0); pthread_t tidRaftServer; pthread_create(&tidRaftServer, NULL, startServerFunc, &raftServer); diff --git a/contrib/test/craft/raftServer.c b/contrib/test/craft/raftServer.c index 5633b211ac..4a4d6cb7a3 100644 --- a/contrib/test/craft/raftServer.c +++ b/contrib/test/craft/raftServer.c @@ -1,12 +1,65 @@ +#include +#include "common.h" #include "raftServer.h" -int32_t raftServerInit(SRaftServer *pRaftServer, struct SRaftServerConfig *pConf, struct raft_fsm *pFsm) { +uint64_t raftId(const char *host, uint32_t port) { + uint32_t host_uint32 = (uint32_t)inet_addr(host); + assert(host_uint32 != (uint32_t)-1); + uint64_t code = ((uint64_t)host_uint32) << 32 | port; + return code; +} + +int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port, const char *dir, struct raft_fsm *pFsm) { + int ret; + + char cmd_buf[COMMAND_LEN]; + snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", dir); + system(cmd_buf); + + snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", host); + pRaftServer->port = port; + snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", host, port); + strncpy(pRaftServer->dir, dir, sizeof(pRaftServer->dir)); + + pRaftServer->raftId = raftId(pRaftServer->host, pRaftServer->port); + pRaftServer->fsm = pFsm; + + ret = uv_loop_init(&pRaftServer->loop); + if (!ret) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + } + + ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop); + if (!ret) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + } + + ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, dir, &pRaftServer->transport); + if (!ret) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + } + + ret = raft_init(&pRaftServer->raft, &pRaftServer->io, pRaftServer->fsm, pRaftServer->raftId, pRaftServer->address); + if (!ret) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + } + + struct raft_configuration conf; + raft_configuration_init(&conf); + raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER); + raft_bootstrap(&pRaftServer->raft, &conf); return 0; } int32_t raftServerStart(SRaftServer *pRaftServer) { + int ret; + ret = raft_start(&pRaftServer->raft); + if (!ret) { + fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft)); + } + uv_run(&pRaftServer->loop, UV_RUN_DEFAULT); } @@ -14,7 +67,15 @@ void raftServerClose(SRaftServer *pRaftServer) { } -int32_t initFsm(struct raft_fsm *fsm) { +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) { + char *msg = (char*)buf->base; + printf("%s \n", msg); + + return 0; +} + +int32_t initFsm(struct raft_fsm *fsm) { + fsm->apply = fsmApplyCb; return 0; } diff --git a/contrib/test/craft/raftServer.h b/contrib/test/craft/raftServer.h index 9a717260de..44cfa54dfe 100644 --- a/contrib/test/craft/raftServer.h +++ b/contrib/test/craft/raftServer.h @@ -5,15 +5,20 @@ extern "C" { #endif +#include +#include +#include +#include #include "raft.h" #include "raft/uv.h" - - #define DIR_LEN 128 -#define ADDRESS_LEN 128 +#define HOST_LEN 128 +#define ADDRESS_LEN (HOST_LEN + 16) typedef struct { char dir[DIR_LEN]; /* Data dir of UV I/O backend */ + char host[HOST_LEN]; + uint32_t port; char address[ADDRESS_LEN]; /* Raft instance address */ raft_id raftId; /* For vote */ struct raft_fsm *fsm; /* Sample application FSM */ @@ -24,8 +29,7 @@ typedef struct { struct raft_uv_transport transport; /* UV I/O backend transport */ } SRaftServer; -struct SRaftServerConfig; -int32_t raftServerInit(SRaftServer *pRaftServer, struct SRaftServerConfig *pConf, struct raft_fsm *pFsm); +int32_t raftServerInit(SRaftServer *pRaftServer, const char *host, uint32_t port, const char *dir, struct raft_fsm *pFsm); int32_t raftServerStart(SRaftServer *pRaftServer); void raftServerClose(SRaftServer *pRaftServer);