TD-12666 add vnode into vgroup one by one
This commit is contained in:
parent
c5cd0766ed
commit
b76938130e
|
@ -48,10 +48,11 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf,
|
||||||
int32_t raftServerStart(SRaftServer *pRaftServer);
|
int32_t raftServerStart(SRaftServer *pRaftServer);
|
||||||
void raftServerClose(SRaftServer *pRaftServer);
|
void raftServerClose(SRaftServer *pRaftServer);
|
||||||
|
|
||||||
|
|
||||||
int initFsm(struct raft_fsm *fsm);
|
int initFsm(struct raft_fsm *fsm);
|
||||||
|
|
||||||
|
const char* state2String(unsigned short state);
|
||||||
|
void printRaftConfiguration(struct raft_configuration *c);
|
||||||
|
void printRaftState(struct raft *r);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -1,7 +1,3 @@
|
||||||
add_executable(raftMain "")
|
add_subdirectory(rebalance_leader)
|
||||||
target_sources(raftMain
|
add_subdirectory(make_cluster)
|
||||||
PRIVATE
|
|
||||||
"raftMain.c"
|
|
||||||
"raftServer.c"
|
|
||||||
)
|
|
||||||
target_link_libraries(raftMain PUBLIC traft lz4 uv_a)
|
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
add_executable(makeCluster "")
|
||||||
|
target_sources(makeCluster
|
||||||
|
PRIVATE
|
||||||
|
"raftMain.c"
|
||||||
|
"raftServer.c"
|
||||||
|
"config.c"
|
||||||
|
"console.c"
|
||||||
|
"simpleHash.c"
|
||||||
|
"util.c"
|
||||||
|
)
|
||||||
|
target_link_libraries(makeCluster PUBLIC traft lz4 uv_a)
|
|
@ -0,0 +1,23 @@
|
||||||
|
#ifndef TRAFT_COMMON_H
|
||||||
|
#define TRAFT_COMMON_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
|
#define COMMAND_LEN 512
|
||||||
|
#define MAX_CMD_COUNT 10
|
||||||
|
#define TOKEN_LEN 128
|
||||||
|
#define MAX_PEERS_COUNT 19
|
||||||
|
|
||||||
|
#define HOST_LEN 64
|
||||||
|
#define ADDRESS_LEN (HOST_LEN * 2)
|
||||||
|
#define BASE_DIR_LEN 128
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,64 @@
|
||||||
|
#include "config.h"
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
void addrToString(const char *host, uint16_t port, char *addr, int len) { snprintf(addr, len, "%s:%hu", host, port); }
|
||||||
|
|
||||||
|
void parseAddr(const char *addr, char *host, int len, uint16_t *port) {
|
||||||
|
char *tmp = (char *)malloc(strlen(addr) + 1);
|
||||||
|
strcpy(tmp, addr);
|
||||||
|
|
||||||
|
char *context;
|
||||||
|
char *separator = ":";
|
||||||
|
char *token = strtok_r(tmp, separator, &context);
|
||||||
|
if (token) {
|
||||||
|
snprintf(host, len, "%s", token);
|
||||||
|
}
|
||||||
|
|
||||||
|
token = strtok_r(NULL, separator, &context);
|
||||||
|
if (token) {
|
||||||
|
sscanf(token, "%hu", port);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(tmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
int parseConf(int argc, char **argv, RaftServerConfig *pConf) {
|
||||||
|
memset(pConf, 0, sizeof(*pConf));
|
||||||
|
|
||||||
|
int option_index, option_value;
|
||||||
|
option_index = 0;
|
||||||
|
static struct option long_options[] = {{"help", no_argument, NULL, 'h'},
|
||||||
|
{"addr", required_argument, NULL, 'a'},
|
||||||
|
{"dir", required_argument, NULL, 'd'},
|
||||||
|
{NULL, 0, NULL, 0}};
|
||||||
|
|
||||||
|
while ((option_value = getopt_long(argc, argv, "ha:d:", long_options, &option_index)) != -1) {
|
||||||
|
switch (option_value) {
|
||||||
|
case 'a': {
|
||||||
|
parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'd': {
|
||||||
|
snprintf(pConf->baseDir, sizeof(pConf->baseDir), "%s", optarg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'h': {
|
||||||
|
return -2;
|
||||||
|
}
|
||||||
|
|
||||||
|
default: { return -2; }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void printConf(RaftServerConfig *pConf) {
|
||||||
|
printf("\n---printConf: \n");
|
||||||
|
printf("me: [%s:%hu] \n", pConf->me.host, pConf->me.port);
|
||||||
|
printf("dataDir: [%s] \n\n", pConf->baseDir);
|
||||||
|
}
|
|
@ -0,0 +1,31 @@
|
||||||
|
#ifndef TRAFT_CONFIG_H
|
||||||
|
#define TRAFT_CONFIG_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include <getopt.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include "common.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char host[HOST_LEN];
|
||||||
|
uint16_t port;
|
||||||
|
} Addr;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
Addr me;
|
||||||
|
char baseDir[BASE_DIR_LEN];
|
||||||
|
} RaftServerConfig;
|
||||||
|
|
||||||
|
void addrToString(const char *host, uint16_t port, char *addr, int len);
|
||||||
|
void parseAddr(const char *addr, char *host, int len, uint16_t *port);
|
||||||
|
int parseConf(int argc, char **argv, RaftServerConfig *pConf);
|
||||||
|
void printConf(RaftServerConfig *pConf);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,202 @@
|
||||||
|
#include "console.h"
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include "raftServer.h"
|
||||||
|
#include "util.h"
|
||||||
|
|
||||||
|
void printHelp() {
|
||||||
|
printf("---------------------\n");
|
||||||
|
printf("help: \n\n");
|
||||||
|
printf("create a vgroup with 3 replicas: \n");
|
||||||
|
printf("create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002 \n");
|
||||||
|
printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10002 \n");
|
||||||
|
printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10001 \n");
|
||||||
|
printf("\n");
|
||||||
|
printf("create a vgroup with only one replica: \n");
|
||||||
|
printf("create vnode voter vid 200 \n");
|
||||||
|
printf("\n");
|
||||||
|
printf("add vnode into vgroup: \n");
|
||||||
|
printf("create vnode spare vid 100 ---- run at 127.0.0.1:10003\n");
|
||||||
|
printf("join vnode vid 100 addr 127.0.0.1:10003 ---- run at leader of vgroup 100\n");
|
||||||
|
printf("\n");
|
||||||
|
printf("run \n");
|
||||||
|
printf("put 0 key value \n");
|
||||||
|
printf("get 0 key \n");
|
||||||
|
printf("---------------------\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
void console(RaftServer *pRaftServer) {
|
||||||
|
while (1) {
|
||||||
|
int ret;
|
||||||
|
char cmdBuf[COMMAND_LEN];
|
||||||
|
memset(cmdBuf, 0, sizeof(cmdBuf));
|
||||||
|
printf("(console)> ");
|
||||||
|
char *retp = fgets(cmdBuf, COMMAND_LEN, stdin);
|
||||||
|
if (!retp) {
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int pos = strlen(cmdBuf);
|
||||||
|
if (cmdBuf[pos - 1] == '\n') {
|
||||||
|
cmdBuf[pos - 1] = '\0';
|
||||||
|
}
|
||||||
|
|
||||||
|
if (strncmp(cmdBuf, "", COMMAND_LEN) == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
char cmds[MAX_CMD_COUNT][TOKEN_LEN];
|
||||||
|
memset(cmds, 0, sizeof(cmds));
|
||||||
|
|
||||||
|
int cmdCount;
|
||||||
|
cmdCount = splitString(cmdBuf, " ", cmds, MAX_CMD_COUNT);
|
||||||
|
|
||||||
|
if (strcmp(cmds[0], "create") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[3], "vid") == 0) {
|
||||||
|
uint16_t vid;
|
||||||
|
sscanf(cmds[4], "%hu", &vid);
|
||||||
|
|
||||||
|
if (strcmp(cmds[2], "voter") == 0) {
|
||||||
|
char peers[MAX_PEERS_COUNT][ADDRESS_LEN];
|
||||||
|
memset(peers, 0, sizeof(peers));
|
||||||
|
uint32_t peersCount = 0;
|
||||||
|
|
||||||
|
if (strcmp(cmds[5], "peers") == 0 && cmdCount > 6) {
|
||||||
|
// create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002
|
||||||
|
for (int i = 6; i < cmdCount; ++i) {
|
||||||
|
snprintf(peers[i - 6], ADDRESS_LEN, "%s", cmds[i]);
|
||||||
|
peersCount++;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// create vnode voter vid 200
|
||||||
|
}
|
||||||
|
ret = addRaftVoter(pRaftServer, peers, peersCount, vid);
|
||||||
|
if (ret == 0) {
|
||||||
|
printf("create vnode voter ok \n");
|
||||||
|
} else {
|
||||||
|
printf("create vnode voter error \n");
|
||||||
|
}
|
||||||
|
} else if (strcmp(cmds[2], "spare") == 0) {
|
||||||
|
ret = addRaftSpare(pRaftServer, vid);
|
||||||
|
if (ret == 0) {
|
||||||
|
printf("create vnode spare ok \n");
|
||||||
|
} else {
|
||||||
|
printf("create vnode spare error \n");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
printHelp();
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (strcmp(cmds[0], "join") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[2], "vid") == 0 &&
|
||||||
|
strcmp(cmds[4], "addr") == 0 && cmdCount == 6) {
|
||||||
|
// join vnode vid 100 addr 127.0.0.1:10004
|
||||||
|
|
||||||
|
char * address = cmds[5];
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
parseAddr(address, host, sizeof(host), &port);
|
||||||
|
|
||||||
|
uint16_t vid;
|
||||||
|
sscanf(cmds[3], "%hu", &vid);
|
||||||
|
|
||||||
|
HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid);
|
||||||
|
if (*pp == NULL) {
|
||||||
|
printf("vid:%hu not found \n", vid);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
RaftInstance *pRaftInstance = (*pp)->data;
|
||||||
|
|
||||||
|
uint64_t destRaftId = encodeRaftId(host, port, vid);
|
||||||
|
|
||||||
|
struct raft_change *req = raft_malloc(sizeof(*req));
|
||||||
|
RaftJoin * pRaftJoin = raft_malloc(sizeof(*pRaftJoin));
|
||||||
|
pRaftJoin->r = &pRaftInstance->raft;
|
||||||
|
pRaftJoin->joinId = destRaftId;
|
||||||
|
req->data = pRaftJoin;
|
||||||
|
ret = raft_add(&pRaftInstance->raft, req, destRaftId, address, raftChangeAddCb);
|
||||||
|
if (ret != 0) {
|
||||||
|
printf("raft_add error: %s \n", raft_errmsg(&pRaftInstance->raft));
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (strcmp(cmds[0], "dropnode") == 0) {
|
||||||
|
} else if (strcmp(cmds[0], "state") == 0) {
|
||||||
|
pRaftServer->raftInstances.print(&pRaftServer->raftInstances);
|
||||||
|
for (size_t i = 0; i < pRaftServer->raftInstances.length; ++i) {
|
||||||
|
HashNode *ptr = pRaftServer->raftInstances.table[i];
|
||||||
|
if (ptr != NULL) {
|
||||||
|
while (ptr != NULL) {
|
||||||
|
RaftInstance *pRaftInstance = ptr->data;
|
||||||
|
printf("instance vid:%hu raftId:%llu \n", ptr->vgroupId, pRaftInstance->raftId);
|
||||||
|
printRaftState(&pRaftInstance->raft);
|
||||||
|
printf("\n");
|
||||||
|
ptr = ptr->next;
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (strcmp(cmds[0], "put") == 0 && cmdCount == 4) {
|
||||||
|
uint16_t vid;
|
||||||
|
sscanf(cmds[1], "%hu", &vid);
|
||||||
|
char * key = cmds[2];
|
||||||
|
char * value = cmds[3];
|
||||||
|
HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid);
|
||||||
|
if (*pp == NULL) {
|
||||||
|
printf("vid:%hu not found \n", vid);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
RaftInstance *pRaftInstance = (*pp)->data;
|
||||||
|
|
||||||
|
char *raftValue = malloc(TOKEN_LEN * 2 + 3);
|
||||||
|
snprintf(raftValue, TOKEN_LEN * 2 + 3, "%s--%s", key, value);
|
||||||
|
putValue(&pRaftInstance->raft, raftValue);
|
||||||
|
free(raftValue);
|
||||||
|
|
||||||
|
} else if (strcmp(cmds[0], "run") == 0) {
|
||||||
|
pthread_t tidRaftServer;
|
||||||
|
pthread_create(&tidRaftServer, NULL, startServerFunc, pRaftServer);
|
||||||
|
|
||||||
|
} else if (strcmp(cmds[0], "get") == 0 && cmdCount == 3) {
|
||||||
|
uint16_t vid;
|
||||||
|
sscanf(cmds[1], "%hu", &vid);
|
||||||
|
char * key = cmds[2];
|
||||||
|
HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid);
|
||||||
|
if (*pp == NULL) {
|
||||||
|
printf("vid:%hu not found \n", vid);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
RaftInstance * pRaftInstance = (*pp)->data;
|
||||||
|
SimpleHash * pKV = pRaftInstance->fsm.data;
|
||||||
|
SimpleHashNode **ppNode = pKV->find_cstr(pKV, key);
|
||||||
|
if (*ppNode == NULL) {
|
||||||
|
printf("key:%s not found \n", key);
|
||||||
|
} else {
|
||||||
|
printf("find key:%s value:%s \n", key, (char *)((*ppNode)->data));
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (strcmp(cmds[0], "transfer") == 0) {
|
||||||
|
} else if (strcmp(cmds[0], "state") == 0) {
|
||||||
|
} else if (strcmp(cmds[0], "snapshot") == 0) {
|
||||||
|
} else if (strcmp(cmds[0], "exit") == 0) {
|
||||||
|
exit(0);
|
||||||
|
|
||||||
|
} else if (strcmp(cmds[0], "quit") == 0) {
|
||||||
|
exit(0);
|
||||||
|
|
||||||
|
} else if (strcmp(cmds[0], "help") == 0) {
|
||||||
|
printHelp();
|
||||||
|
|
||||||
|
} else {
|
||||||
|
printf("unknown command: %s \n", cmdBuf);
|
||||||
|
printHelp();
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
printf("cmdBuf: [%s] \n", cmdBuf);
|
||||||
|
printf("cmdCount : %d \n", cmdCount);
|
||||||
|
for (int i = 0; i < MAX_CMD_COUNT; ++i) {
|
||||||
|
printf("cmd%d : %s \n", i, cmds[i]);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,19 @@
|
||||||
|
#ifndef TRAFT_CONSOLE_H
|
||||||
|
#define TRAFT_CONSOLE_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include <getopt.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include "common.h"
|
||||||
|
#include "raftServer.h"
|
||||||
|
|
||||||
|
void console(RaftServer *pRaftServer);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,81 @@
|
||||||
|
#include <assert.h>
|
||||||
|
#include <getopt.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <raft.h>
|
||||||
|
#include <raft/uv.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include "common.h"
|
||||||
|
#include "config.h"
|
||||||
|
#include "console.h"
|
||||||
|
#include "raftServer.h"
|
||||||
|
#include "simpleHash.h"
|
||||||
|
#include "util.h"
|
||||||
|
|
||||||
|
const char *exe_name;
|
||||||
|
|
||||||
|
void *startConsoleFunc(void *param) {
|
||||||
|
RaftServer *pRaftServer = (RaftServer *)param;
|
||||||
|
console(pRaftServer);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void usage() {
|
||||||
|
printf("\nusage: \n");
|
||||||
|
printf("%s --addr=127.0.0.1:10000 --dir=./data \n", exe_name);
|
||||||
|
printf("\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
RaftServerConfig gConfig;
|
||||||
|
RaftServer gRaftServer;
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
srand(time(NULL));
|
||||||
|
int32_t ret;
|
||||||
|
|
||||||
|
exe_name = argv[0];
|
||||||
|
if (argc < 3) {
|
||||||
|
usage();
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = parseConf(argc, argv, &gConfig);
|
||||||
|
if (ret != 0) {
|
||||||
|
usage();
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
printConf(&gConfig);
|
||||||
|
|
||||||
|
if (!dirOK(gConfig.baseDir)) {
|
||||||
|
ret = mkdir(gConfig.baseDir, 0775);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "mkdir error, %s \n", gConfig.baseDir);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = raftServerInit(&gRaftServer, &gConfig);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "raftServerInit error \n");
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
pthread_t tidRaftServer;
|
||||||
|
pthread_create(&tidRaftServer, NULL, startServerFunc, &gRaftServer);
|
||||||
|
*/
|
||||||
|
|
||||||
|
pthread_t tidConsole;
|
||||||
|
pthread_create(&tidConsole, NULL, startConsoleFunc, &gRaftServer);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
sleep(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,286 @@
|
||||||
|
#include "raftServer.h"
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include "common.h"
|
||||||
|
#include "simpleHash.h"
|
||||||
|
#include "util.h"
|
||||||
|
|
||||||
|
void *startServerFunc(void *param) {
|
||||||
|
RaftServer *pRaftServer = (RaftServer *)param;
|
||||||
|
int32_t r = raftServerStart(pRaftServer);
|
||||||
|
assert(r == 0);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftChangeAssignCb(struct raft_change *req, int status) {
|
||||||
|
struct raft *r = req->data;
|
||||||
|
if (status != 0) {
|
||||||
|
printf("raftChangeAssignCb error: %s \n", raft_errmsg(r));
|
||||||
|
} else {
|
||||||
|
printf("raftChangeAssignCb ok \n");
|
||||||
|
}
|
||||||
|
raft_free(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftChangeAddCb(struct raft_change *req, int status) {
|
||||||
|
RaftJoin *pRaftJoin = req->data;
|
||||||
|
if (status != 0) {
|
||||||
|
printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r));
|
||||||
|
} else {
|
||||||
|
struct raft_change *req2 = raft_malloc(sizeof(*req2));
|
||||||
|
req2->data = pRaftJoin->r;
|
||||||
|
int ret = raft_assign(pRaftJoin->r, req2, pRaftJoin->joinId, RAFT_VOTER, raftChangeAssignCb);
|
||||||
|
if (ret != 0) {
|
||||||
|
printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
raft_free(req->data);
|
||||||
|
raft_free(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) {
|
||||||
|
// get fsm data
|
||||||
|
SimpleHash *sh = pFsm->data;
|
||||||
|
|
||||||
|
// get commit value
|
||||||
|
char *msg = (char *)buf->base;
|
||||||
|
printf("fsm apply: [%s] \n", msg);
|
||||||
|
char arr[2][TOKEN_LEN];
|
||||||
|
int r = splitString(msg, "--", arr, 2);
|
||||||
|
assert(r == 2);
|
||||||
|
|
||||||
|
// do the value on fsm
|
||||||
|
sh->insert_cstr(sh, arr[0], arr[1]);
|
||||||
|
|
||||||
|
raft_free(buf->base);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void putValueCb(struct raft_apply *req, int status, void *result) {
|
||||||
|
struct raft *r = req->data;
|
||||||
|
if (status != 0) {
|
||||||
|
printf("putValueCb error: %s \n", raft_errmsg(r));
|
||||||
|
} else {
|
||||||
|
printf("putValueCb: %s \n", "ok");
|
||||||
|
}
|
||||||
|
raft_free(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
void putValue(struct raft *r, const char *value) {
|
||||||
|
struct raft_buffer buf;
|
||||||
|
|
||||||
|
buf.len = strlen(value) + 1;
|
||||||
|
buf.base = raft_malloc(buf.len);
|
||||||
|
snprintf(buf.base, buf.len, "%s", value);
|
||||||
|
|
||||||
|
struct raft_apply *req = raft_malloc(sizeof(*req));
|
||||||
|
req->data = r;
|
||||||
|
int ret = raft_apply(r, req, &buf, 1, putValueCb);
|
||||||
|
if (ret == 0) {
|
||||||
|
printf("put %s \n", (char *)buf.base);
|
||||||
|
} else {
|
||||||
|
printf("put error: %s \n", raft_errmsg(r));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const char *state2String(unsigned short state) {
|
||||||
|
if (state == RAFT_UNAVAILABLE) {
|
||||||
|
return "RAFT_UNAVAILABLE";
|
||||||
|
|
||||||
|
} else if (state == RAFT_FOLLOWER) {
|
||||||
|
return "RAFT_FOLLOWER";
|
||||||
|
|
||||||
|
} else if (state == RAFT_CANDIDATE) {
|
||||||
|
return "RAFT_CANDIDATE";
|
||||||
|
|
||||||
|
} else if (state == RAFT_LEADER) {
|
||||||
|
return "RAFT_LEADER";
|
||||||
|
}
|
||||||
|
return "UNKNOWN_RAFT_STATE";
|
||||||
|
}
|
||||||
|
|
||||||
|
void printRaftConfiguration(struct raft_configuration *c) {
|
||||||
|
printf("configuration: \n");
|
||||||
|
for (int i = 0; i < c->n; ++i) {
|
||||||
|
printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void printRaftState(struct raft *r) {
|
||||||
|
printf("----Raft State: -----------\n");
|
||||||
|
printf("mem_addr: %p \n", r);
|
||||||
|
printf("my_id: %llu \n", r->id);
|
||||||
|
printf("address: %s \n", r->address);
|
||||||
|
printf("current_term: %llu \n", r->current_term);
|
||||||
|
printf("voted_for: %llu \n", r->voted_for);
|
||||||
|
printf("role: %s \n", state2String(r->state));
|
||||||
|
printf("commit_index: %llu \n", r->commit_index);
|
||||||
|
printf("last_applied: %llu \n", r->last_applied);
|
||||||
|
printf("last_stored: %llu \n", r->last_stored);
|
||||||
|
|
||||||
|
printf("configuration_index: %llu \n", r->configuration_index);
|
||||||
|
printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index);
|
||||||
|
printRaftConfiguration(&r->configuration);
|
||||||
|
|
||||||
|
printf("----------------------------\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid) {
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance));
|
||||||
|
assert(pRaftInstance != NULL);
|
||||||
|
|
||||||
|
// init raftId
|
||||||
|
pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid);
|
||||||
|
|
||||||
|
// init dir
|
||||||
|
snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir,
|
||||||
|
pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId);
|
||||||
|
|
||||||
|
if (!dirOK(pRaftInstance->dir)) {
|
||||||
|
ret = mkdir(pRaftInstance->dir, 0775);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir);
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// init fsm
|
||||||
|
pRaftInstance->fsm.data = newSimpleHash(2);
|
||||||
|
pRaftInstance->fsm.apply = fsmApplyCb;
|
||||||
|
|
||||||
|
// init io
|
||||||
|
ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// init raft
|
||||||
|
ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId,
|
||||||
|
pRaftServer->address);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// init raft_configuration
|
||||||
|
struct raft_configuration conf;
|
||||||
|
raft_configuration_init(&conf);
|
||||||
|
raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_VOTER);
|
||||||
|
for (int i = 0; i < peersCount; ++i) {
|
||||||
|
char * peerAddress = peers[i];
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
parseAddr(peerAddress, host, sizeof(host), &port);
|
||||||
|
uint64_t raftId = encodeRaftId(host, port, vid);
|
||||||
|
raft_configuration_add(&conf, raftId, peers[i], RAFT_VOTER);
|
||||||
|
}
|
||||||
|
raft_bootstrap(&pRaftInstance->raft, &conf);
|
||||||
|
|
||||||
|
// start raft
|
||||||
|
ret = raft_start(&pRaftInstance->raft);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft));
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// add raft instance into raft server
|
||||||
|
pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid) {
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance));
|
||||||
|
assert(pRaftInstance != NULL);
|
||||||
|
|
||||||
|
// init raftId
|
||||||
|
pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid);
|
||||||
|
|
||||||
|
// init dir
|
||||||
|
snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir,
|
||||||
|
pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId);
|
||||||
|
ret = mkdir(pRaftInstance->dir, 0775);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir);
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// init fsm
|
||||||
|
pRaftInstance->fsm.data = newSimpleHash(2);
|
||||||
|
pRaftInstance->fsm.apply = fsmApplyCb;
|
||||||
|
|
||||||
|
// init io
|
||||||
|
ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// init raft
|
||||||
|
ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId,
|
||||||
|
pRaftServer->address);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft));
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// init raft_configuration
|
||||||
|
struct raft_configuration conf;
|
||||||
|
raft_configuration_init(&conf);
|
||||||
|
raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_SPARE);
|
||||||
|
raft_bootstrap(&pRaftInstance->raft, &conf);
|
||||||
|
|
||||||
|
// start raft
|
||||||
|
ret = raft_start(&pRaftInstance->raft);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft));
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// add raft instance into raft server
|
||||||
|
pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf) {
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
// init host, port, address, dir
|
||||||
|
snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host);
|
||||||
|
pRaftServer->port = pConf->me.port;
|
||||||
|
snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port);
|
||||||
|
snprintf(pRaftServer->baseDir, sizeof(pRaftServer->baseDir), "%s", pConf->baseDir);
|
||||||
|
|
||||||
|
// init loop
|
||||||
|
ret = uv_loop_init(&pRaftServer->loop);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret));
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// init network
|
||||||
|
ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret);
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// init raft instance container
|
||||||
|
initIdHash(&pRaftServer->raftInstances, 2);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t raftServerStart(RaftServer *pRaftServer) {
|
||||||
|
// start loop
|
||||||
|
uv_run(&pRaftServer->loop, UV_RUN_DEFAULT);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftServerStop(RaftServer *pRaftServer) {}
|
|
@ -0,0 +1,66 @@
|
||||||
|
#ifndef TDENGINE_RAFT_SERVER_H
|
||||||
|
#define TDENGINE_RAFT_SERVER_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <assert.h>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include "common.h"
|
||||||
|
#include "config.h"
|
||||||
|
#include "raft.h"
|
||||||
|
#include "raft/uv.h"
|
||||||
|
#include "simpleHash.h"
|
||||||
|
|
||||||
|
typedef struct RaftJoin {
|
||||||
|
struct raft *r;
|
||||||
|
raft_id joinId;
|
||||||
|
} RaftJoin;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
raft_id raftId;
|
||||||
|
char dir[BASE_DIR_LEN * 2];
|
||||||
|
struct raft_fsm fsm;
|
||||||
|
struct raft_io io;
|
||||||
|
struct raft raft;
|
||||||
|
} RaftInstance;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char host[HOST_LEN];
|
||||||
|
uint16_t port;
|
||||||
|
char address[ADDRESS_LEN]; /* Raft instance address */
|
||||||
|
char baseDir[BASE_DIR_LEN]; /* Raft instance address */
|
||||||
|
|
||||||
|
struct uv_loop_s loop; /* UV loop */
|
||||||
|
struct raft_uv_transport transport; /* UV I/O backend transport */
|
||||||
|
|
||||||
|
IdHash raftInstances; /* multi raft instances. traft use IdHash to manager multi vgroup inside, here we can use IdHash
|
||||||
|
too. */
|
||||||
|
} RaftServer;
|
||||||
|
|
||||||
|
void * startServerFunc(void *param);
|
||||||
|
int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid);
|
||||||
|
int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid);
|
||||||
|
|
||||||
|
int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf);
|
||||||
|
int32_t raftServerStart(RaftServer *pRaftServer);
|
||||||
|
void raftServerStop(RaftServer *pRaftServer);
|
||||||
|
|
||||||
|
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result);
|
||||||
|
void putValueCb(struct raft_apply *req, int status, void *result);
|
||||||
|
void putValue(struct raft *r, const char *value);
|
||||||
|
|
||||||
|
void raftChangeAddCb(struct raft_change *req, int status);
|
||||||
|
|
||||||
|
const char *state2String(unsigned short state);
|
||||||
|
void printRaftConfiguration(struct raft_configuration *c);
|
||||||
|
void printRaftState(struct raft *r);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // TDENGINE_RAFT_SERVER_H
|
|
@ -0,0 +1,218 @@
|
||||||
|
#include "simpleHash.h"
|
||||||
|
|
||||||
|
uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed) {
|
||||||
|
// Similar to murmur hash
|
||||||
|
const uint32_t m = 0xc6a4a793;
|
||||||
|
const uint32_t r = 24;
|
||||||
|
const char* limit = data + n;
|
||||||
|
uint32_t h = seed ^ (n * m);
|
||||||
|
|
||||||
|
// Pick up four bytes at a time
|
||||||
|
while (data + 4 <= limit) {
|
||||||
|
// uint32_t w = DecodeFixed32(data);
|
||||||
|
uint32_t w;
|
||||||
|
memcpy(&w, data, 4);
|
||||||
|
|
||||||
|
data += 4;
|
||||||
|
h += w;
|
||||||
|
h *= m;
|
||||||
|
h ^= (h >> 16);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pick up remaining bytes
|
||||||
|
switch (limit - data) {
|
||||||
|
case 3:
|
||||||
|
h += (unsigned char)(data[2]) << 16;
|
||||||
|
do {
|
||||||
|
} while (0);
|
||||||
|
case 2:
|
||||||
|
h += (unsigned char)(data[1]) << 8;
|
||||||
|
do {
|
||||||
|
} while (0);
|
||||||
|
case 1:
|
||||||
|
h += (unsigned char)(data[0]);
|
||||||
|
h *= m;
|
||||||
|
h ^= (h >> r);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return h;
|
||||||
|
}
|
||||||
|
|
||||||
|
int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data) {
|
||||||
|
return insertSimpleHash(ths, key, strlen(key) + 1, data, strlen(data) + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int removeCStrSimpleHash(struct SimpleHash* ths, char* key) { return removeSimpleHash(ths, key, strlen(key) + 1); }
|
||||||
|
|
||||||
|
SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key) {
|
||||||
|
return findSimpleHash(ths, key, strlen(key) + 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen) {
|
||||||
|
SimpleHashNode** pp = ths->find(ths, key, keyLen);
|
||||||
|
if (*pp != NULL) {
|
||||||
|
fprintf(stderr, "insertSimpleHash, already has key \n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SimpleHashNode* node = malloc(sizeof(*node));
|
||||||
|
node->hashCode = ths->hashFunc(key, keyLen);
|
||||||
|
node->key = malloc(keyLen);
|
||||||
|
node->keyLen = keyLen;
|
||||||
|
memcpy(node->key, key, keyLen);
|
||||||
|
node->data = malloc(dataLen);
|
||||||
|
node->dataLen = dataLen;
|
||||||
|
memcpy(node->data, data, dataLen);
|
||||||
|
node->next = NULL;
|
||||||
|
|
||||||
|
// printf("insertSimpleHash: <%s, %ld, %s, %ld, %u> \n", node->key, node->keyLen, node->data, node->dataLen,
|
||||||
|
// node->hashCode);
|
||||||
|
|
||||||
|
size_t index = node->hashCode & (ths->length - 1);
|
||||||
|
|
||||||
|
SimpleHashNode* ptr = ths->table[index];
|
||||||
|
if (ptr != NULL) {
|
||||||
|
node->next = ptr;
|
||||||
|
ths->table[index] = node;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
ths->table[index] = node;
|
||||||
|
}
|
||||||
|
ths->elems++;
|
||||||
|
if (ths->elems > 2 * ths->length) {
|
||||||
|
ths->resize(ths);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) {
|
||||||
|
SimpleHashNode** pp = ths->find(ths, key, keyLen);
|
||||||
|
if (*pp == NULL) {
|
||||||
|
fprintf(stderr, "removeSimpleHash, key not exist \n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SimpleHashNode* del = *pp;
|
||||||
|
*pp = del->next;
|
||||||
|
free(del->key);
|
||||||
|
free(del->data);
|
||||||
|
free(del);
|
||||||
|
ths->elems--;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) {
|
||||||
|
uint32_t hashCode = ths->hashFunc(key, keyLen);
|
||||||
|
// size_t index = hashCode % ths->length;
|
||||||
|
size_t index = hashCode & (ths->length - 1);
|
||||||
|
|
||||||
|
// printf("findSimpleHash: %s %ld %u \n", key, keyLen, hashCode);
|
||||||
|
|
||||||
|
SimpleHashNode** pp = &(ths->table[index]);
|
||||||
|
while (*pp != NULL && ((*pp)->hashCode != hashCode || memcmp(key, (*pp)->key, keyLen) != 0)) {
|
||||||
|
pp = &((*pp)->next);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pp;
|
||||||
|
}
|
||||||
|
|
||||||
|
void printCStrSimpleHash(struct SimpleHash* ths) {
|
||||||
|
printf("\n--- printCStrSimpleHash: elems:%d length:%d \n", ths->elems, ths->length);
|
||||||
|
for (size_t i = 0; i < ths->length; ++i) {
|
||||||
|
SimpleHashNode* ptr = ths->table[i];
|
||||||
|
if (ptr != NULL) {
|
||||||
|
printf("%zu: ", i);
|
||||||
|
while (ptr != NULL) {
|
||||||
|
printf("<%u, %s, %ld, %s, %ld> ", ptr->hashCode, (char*)ptr->key, ptr->keyLen, (char*)ptr->data, ptr->dataLen);
|
||||||
|
ptr = ptr->next;
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
printf("---------------\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
void destroySimpleHash(struct SimpleHash* ths) {
|
||||||
|
for (size_t i = 0; i < ths->length; ++i) {
|
||||||
|
SimpleHashNode* ptr = ths->table[i];
|
||||||
|
while (ptr != NULL) {
|
||||||
|
SimpleHashNode* tmp = ptr;
|
||||||
|
ptr = ptr->next;
|
||||||
|
free(tmp->key);
|
||||||
|
free(tmp->data);
|
||||||
|
free(tmp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ths->length = 0;
|
||||||
|
ths->elems = 0;
|
||||||
|
free(ths->table);
|
||||||
|
free(ths);
|
||||||
|
}
|
||||||
|
|
||||||
|
void resizeSimpleHash(struct SimpleHash* ths) {
|
||||||
|
uint32_t new_length = ths->length;
|
||||||
|
while (new_length < ths->elems) {
|
||||||
|
new_length *= 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("resizeSimpleHash: %p from %u to %u \n", ths, ths->length, new_length);
|
||||||
|
|
||||||
|
SimpleHashNode** new_table = malloc(new_length * sizeof(SimpleHashNode*));
|
||||||
|
memset(new_table, 0, new_length * sizeof(SimpleHashNode*));
|
||||||
|
|
||||||
|
uint32_t count = 0;
|
||||||
|
for (uint32_t i = 0; i < ths->length; i++) {
|
||||||
|
if (ths->table[i] == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SimpleHashNode* it = ths->table[i];
|
||||||
|
while (it != NULL) {
|
||||||
|
SimpleHashNode* move_node = it;
|
||||||
|
it = it->next;
|
||||||
|
|
||||||
|
// move move_node
|
||||||
|
move_node->next = NULL;
|
||||||
|
size_t index = move_node->hashCode & (new_length - 1);
|
||||||
|
|
||||||
|
SimpleHashNode* ptr = new_table[index];
|
||||||
|
if (ptr != NULL) {
|
||||||
|
move_node->next = ptr;
|
||||||
|
new_table[index] = move_node;
|
||||||
|
} else {
|
||||||
|
new_table[index] = move_node;
|
||||||
|
}
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(ths->elems == count);
|
||||||
|
free(ths->table);
|
||||||
|
ths->table = new_table;
|
||||||
|
ths->length = new_length;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t simpleHashFunc(const char* key, size_t keyLen) { return mySimpleHash(key, keyLen, 1); }
|
||||||
|
|
||||||
|
struct SimpleHash* newSimpleHash(size_t length) {
|
||||||
|
struct SimpleHash* ths = malloc(sizeof(*ths));
|
||||||
|
|
||||||
|
ths->length = length;
|
||||||
|
ths->elems = 0;
|
||||||
|
ths->table = malloc(length * sizeof(SimpleHashNode*));
|
||||||
|
memset(ths->table, 0, length * sizeof(SimpleHashNode*));
|
||||||
|
|
||||||
|
ths->insert = insertSimpleHash;
|
||||||
|
ths->remove = removeSimpleHash;
|
||||||
|
ths->find = findSimpleHash;
|
||||||
|
ths->insert_cstr = insertCStrSimpleHash;
|
||||||
|
ths->remove_cstr = removeCStrSimpleHash;
|
||||||
|
ths->find_cstr = findCStrSimpleHash;
|
||||||
|
ths->print_cstr = printCStrSimpleHash;
|
||||||
|
ths->destroy = destroySimpleHash;
|
||||||
|
ths->resize = resizeSimpleHash;
|
||||||
|
ths->hashFunc = simpleHashFunc;
|
||||||
|
}
|
|
@ -0,0 +1,61 @@
|
||||||
|
#ifndef __SIMPLE_HASH_H__
|
||||||
|
#define __SIMPLE_HASH_H__
|
||||||
|
|
||||||
|
#include <assert.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed);
|
||||||
|
|
||||||
|
typedef struct SimpleHashNode {
|
||||||
|
uint32_t hashCode;
|
||||||
|
void* key;
|
||||||
|
size_t keyLen;
|
||||||
|
void* data;
|
||||||
|
size_t dataLen;
|
||||||
|
struct SimpleHashNode* next;
|
||||||
|
} SimpleHashNode;
|
||||||
|
|
||||||
|
typedef struct SimpleHash {
|
||||||
|
// public:
|
||||||
|
|
||||||
|
int (*insert)(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen);
|
||||||
|
int (*remove)(struct SimpleHash* ths, char* key, size_t keyLen);
|
||||||
|
SimpleHashNode** (*find)(struct SimpleHash* ths, char* key, size_t keyLen);
|
||||||
|
|
||||||
|
// wrapper
|
||||||
|
int (*insert_cstr)(struct SimpleHash* ths, char* key, char* data);
|
||||||
|
int (*remove_cstr)(struct SimpleHash* ths, char* key);
|
||||||
|
SimpleHashNode** (*find_cstr)(struct SimpleHash* ths, char* key);
|
||||||
|
|
||||||
|
void (*print_cstr)(struct SimpleHash* ths);
|
||||||
|
void (*destroy)(struct SimpleHash* ths);
|
||||||
|
|
||||||
|
uint32_t length;
|
||||||
|
uint32_t elems;
|
||||||
|
|
||||||
|
// private:
|
||||||
|
void (*resize)(struct SimpleHash* ths);
|
||||||
|
uint32_t (*hashFunc)(const char* key, size_t keyLen);
|
||||||
|
|
||||||
|
SimpleHashNode** table;
|
||||||
|
|
||||||
|
} SimpleHash;
|
||||||
|
|
||||||
|
int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data);
|
||||||
|
int removeCStrSimpleHash(struct SimpleHash* ths, char* key);
|
||||||
|
SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key);
|
||||||
|
void printCStrSimpleHash(struct SimpleHash* ths);
|
||||||
|
|
||||||
|
int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen);
|
||||||
|
int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen);
|
||||||
|
SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen);
|
||||||
|
void destroySimpleHash(struct SimpleHash* ths);
|
||||||
|
void resizeSimpleHash(struct SimpleHash* ths);
|
||||||
|
uint32_t simpleHashFunc(const char* key, size_t keyLen);
|
||||||
|
|
||||||
|
struct SimpleHash* newSimpleHash(size_t length);
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,45 @@
|
||||||
|
#include "util.h"
|
||||||
|
#include <dirent.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
int dirOK(const char *path) {
|
||||||
|
DIR *dir = opendir(path);
|
||||||
|
if (dir != NULL) {
|
||||||
|
closedir(dir);
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr) {
|
||||||
|
if (n_arr <= 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *tmp = (char *)malloc(strlen(str) + 1);
|
||||||
|
strcpy(tmp, str);
|
||||||
|
char *context;
|
||||||
|
int n = 0;
|
||||||
|
|
||||||
|
char *token = strtok_r(tmp, separator, &context);
|
||||||
|
if (!token) {
|
||||||
|
goto ret;
|
||||||
|
}
|
||||||
|
strncpy(arr[n], token, TOKEN_LEN);
|
||||||
|
n++;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
token = strtok_r(NULL, separator, &context);
|
||||||
|
if (!token || n >= n_arr) {
|
||||||
|
goto ret;
|
||||||
|
}
|
||||||
|
strncpy(arr[n], token, TOKEN_LEN);
|
||||||
|
n++;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret:
|
||||||
|
free(tmp);
|
||||||
|
return n;
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
#ifndef TRAFT_UTIL_H
|
||||||
|
#define TRAFT_UTIL_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "common.h"
|
||||||
|
|
||||||
|
int dirOK(const char *path);
|
||||||
|
int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,7 @@
|
||||||
|
add_executable(rebalanceLeader "")
|
||||||
|
target_sources(rebalanceLeader
|
||||||
|
PRIVATE
|
||||||
|
"raftMain.c"
|
||||||
|
"raftServer.c"
|
||||||
|
)
|
||||||
|
target_link_libraries(rebalanceLeader PUBLIC traft lz4 uv_a)
|
|
@ -0,0 +1,4 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
rm -rf 127.0.0.1*
|
||||||
|
rm -rf ./data
|
|
@ -60,9 +60,9 @@ void raftTransferCb(struct raft_transfer *req) {
|
||||||
SRaftServer *pRaftServer = req->data;
|
SRaftServer *pRaftServer = req->data;
|
||||||
raft_free(req);
|
raft_free(req);
|
||||||
|
|
||||||
printf("raftTransferCb: \n");
|
//printf("raftTransferCb: \n");
|
||||||
updateLeaderStates(pRaftServer);
|
updateLeaderStates(pRaftServer);
|
||||||
printLeaderCount();
|
//printLeaderCount();
|
||||||
|
|
||||||
int myLeaderCount;
|
int myLeaderCount;
|
||||||
for (int i = 0; i < NODE_COUNT; ++i) {
|
for (int i = 0; i < NODE_COUNT; ++i) {
|
||||||
|
@ -71,12 +71,13 @@ void raftTransferCb(struct raft_transfer *req) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT);
|
//printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT);
|
||||||
if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) {
|
if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) {
|
||||||
struct raft *r;
|
struct raft *r;
|
||||||
for (int j = 0; j < pRaftServer->instanceCount; ++j) {
|
for (int j = 0; j < pRaftServer->instanceCount; ++j) {
|
||||||
if (pRaftServer->instance[j].raft.state == RAFT_LEADER) {
|
if (pRaftServer->instance[j].raft.state == RAFT_LEADER) {
|
||||||
r = &pRaftServer->instance[j].raft;
|
r = &pRaftServer->instance[j].raft;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,17 +88,25 @@ void raftTransferCb(struct raft_transfer *req) {
|
||||||
int minIndex = -1;
|
int minIndex = -1;
|
||||||
int minLeaderCount = myLeaderCount;
|
int minLeaderCount = myLeaderCount;
|
||||||
for (int j = 0; j < NODE_COUNT; ++j) {
|
for (int j = 0; j < NODE_COUNT; ++j) {
|
||||||
if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue;
|
if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (leaderStates[j].leaderCount <= minLeaderCount) {
|
if (leaderStates[j].leaderCount <= minLeaderCount) {
|
||||||
|
minLeaderCount = leaderStates[j].leaderCount;
|
||||||
minIndex = j;
|
minIndex = j;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
char myHost[48];
|
char myHost[48];
|
||||||
uint16_t myPort;
|
uint16_t myPort;
|
||||||
uint16_t myVid;
|
uint16_t myVid;
|
||||||
decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid);
|
decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid);
|
||||||
|
|
||||||
|
|
||||||
|
//printf("raftTransferCb transfer leader: vid[%u] choose: index:%d, leaderStates[%d].address:%s, leaderStates[%d].leaderCount:%d \n", minIndex, minIndex, leaderStates[minIndex].address, minIndex, leaderStates[minIndex].leaderCount);
|
||||||
|
|
||||||
char *destAddress = leaderStates[minIndex].address;
|
char *destAddress = leaderStates[minIndex].address;
|
||||||
|
|
||||||
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
|
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
|
||||||
|
@ -106,6 +115,9 @@ void raftTransferCb(struct raft_transfer *req) {
|
||||||
uint16_t destPort = atoi(tokens[1]);
|
uint16_t destPort = atoi(tokens[1]);
|
||||||
destRaftId = encodeRaftId(destHost, destPort, myVid);
|
destRaftId = encodeRaftId(destHost, destPort, myVid);
|
||||||
|
|
||||||
|
printf("\nraftTransferCb transfer leader: vgroupId:%u from:%s:%u --> to:%s:%u ", myVid, myHost, myPort, destHost, destPort);
|
||||||
|
fflush(stdout);
|
||||||
|
|
||||||
raft_transfer(r, transfer, destRaftId, raftTransferCb);
|
raft_transfer(r, transfer, destRaftId, raftTransferCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,7 +264,6 @@ const char* state2String(unsigned short state) {
|
||||||
|
|
||||||
|
|
||||||
void printRaftState2(struct raft *r) {
|
void printRaftState2(struct raft *r) {
|
||||||
|
|
||||||
char leaderAddress[128];
|
char leaderAddress[128];
|
||||||
memset(leaderAddress, 0, sizeof(leaderAddress));
|
memset(leaderAddress, 0, sizeof(leaderAddress));
|
||||||
|
|
||||||
|
@ -350,6 +361,7 @@ void console(SRaftServer *pRaftServer) {
|
||||||
while (1) {
|
while (1) {
|
||||||
char cmd_buf[COMMAND_LEN];
|
char cmd_buf[COMMAND_LEN];
|
||||||
memset(cmd_buf, 0, sizeof(cmd_buf));
|
memset(cmd_buf, 0, sizeof(cmd_buf));
|
||||||
|
printf("(console)> ");
|
||||||
char *ret = fgets(cmd_buf, COMMAND_LEN, stdin);
|
char *ret = fgets(cmd_buf, COMMAND_LEN, stdin);
|
||||||
if (!ret) {
|
if (!ret) {
|
||||||
exit(-1);
|
exit(-1);
|
||||||
|
@ -403,7 +415,10 @@ void console(SRaftServer *pRaftServer) {
|
||||||
} else if (strcmp(cmd, "dropnode") == 0) {
|
} else if (strcmp(cmd, "dropnode") == 0) {
|
||||||
printf("not support \n");
|
printf("not support \n");
|
||||||
|
|
||||||
} else if (strcmp(cmd, "rebalance") == 0) {
|
} else if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) {
|
||||||
|
exit(0);
|
||||||
|
|
||||||
|
} else if (strcmp(cmd, "rebalance") == 0 && strcmp(param1, "leader") == 0) {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
updateLeaderStates(pRaftServer);
|
updateLeaderStates(pRaftServer);
|
||||||
|
@ -511,10 +526,14 @@ void console(SRaftServer *pRaftServer) {
|
||||||
printRaftState(&pRaftServer->instance[i].raft);
|
printRaftState(&pRaftServer->instance[i].raft);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if (strcmp(cmd, "state2") == 0) {
|
} else if (strcmp(cmd, "leader") == 0 && strcmp(param1, "state") == 0) {
|
||||||
|
updateLeaderStates(pRaftServer);
|
||||||
|
printf("\n--------------------------------------------\n");
|
||||||
|
printLeaderCount();
|
||||||
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
||||||
printRaftState2(&pRaftServer->instance[i].raft);
|
printRaftState2(&pRaftServer->instance[i].raft);
|
||||||
}
|
}
|
||||||
|
printf("--------------------------------------------\n");
|
||||||
|
|
||||||
} else if (strcmp(cmd, "snapshot") == 0) {
|
} else if (strcmp(cmd, "snapshot") == 0) {
|
||||||
printf("not support \n");
|
printf("not support \n");
|
|
@ -3,32 +3,34 @@
|
||||||
#include "common.h"
|
#include "common.h"
|
||||||
#include "raftServer.h"
|
#include "raftServer.h"
|
||||||
|
|
||||||
char *keys;
|
//char *keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);;
|
||||||
char *values;
|
//char *values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
|
||||||
|
|
||||||
|
|
||||||
|
char keys[MAX_KV_LEN][MAX_RECORD_COUNT];
|
||||||
|
char values[MAX_KV_LEN][MAX_RECORD_COUNT];
|
||||||
|
int writeIndex = 0;
|
||||||
|
|
||||||
void initStore() {
|
void initStore() {
|
||||||
keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
|
|
||||||
values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
|
|
||||||
writeIndex = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyStore() {
|
void destroyStore() {
|
||||||
free(keys);
|
//free(keys);
|
||||||
free(values);
|
//free(values);
|
||||||
}
|
}
|
||||||
|
|
||||||
void putKV(const char *key, const char *value) {
|
void putKV(const char *key, const char *value) {
|
||||||
if (writeIndex < MAX_RECORD_COUNT) {
|
if (writeIndex < MAX_RECORD_COUNT) {
|
||||||
strncpy(&keys[writeIndex], key, MAX_KV_LEN);
|
strncpy(keys[writeIndex], key, MAX_KV_LEN);
|
||||||
strncpy(&values[writeIndex], value, MAX_KV_LEN);
|
strncpy(values[writeIndex], value, MAX_KV_LEN);
|
||||||
writeIndex++;
|
writeIndex++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char *getKV(const char *key) {
|
char *getKV(const char *key) {
|
||||||
for (int i = 0; i < MAX_RECORD_COUNT; ++i) {
|
for (int i = 0; i < MAX_RECORD_COUNT; ++i) {
|
||||||
if (strcmp(&keys[i], key) == 0) {
|
if (strcmp(keys[i], key) == 0) {
|
||||||
return &values[i];
|
return values[i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
|
@ -15,11 +15,13 @@ extern "C" {
|
||||||
|
|
||||||
|
|
||||||
// simulate a db store, just for test
|
// simulate a db store, just for test
|
||||||
#define MAX_KV_LEN 100
|
#define MAX_KV_LEN 20
|
||||||
#define MAX_RECORD_COUNT 500
|
#define MAX_RECORD_COUNT 16
|
||||||
char *keys;
|
|
||||||
char *values;
|
|
||||||
int writeIndex;
|
//char *keys;
|
||||||
|
//char *values;
|
||||||
|
//int writeIndex;
|
||||||
|
|
||||||
void initStore();
|
void initStore();
|
||||||
void destroyStore();
|
void destroyStore();
|
Loading…
Reference in New Issue