diff --git a/contrib/test/craft/raftServer.h b/contrib/test/craft/raftServer.h index 5fccde6bf2..f4087cf1a9 100644 --- a/contrib/test/craft/raftServer.h +++ b/contrib/test/craft/raftServer.h @@ -48,10 +48,11 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, int32_t raftServerStart(SRaftServer *pRaftServer); void raftServerClose(SRaftServer *pRaftServer); - int initFsm(struct raft_fsm *fsm); - +const char* state2String(unsigned short state); +void printRaftConfiguration(struct raft_configuration *c); +void printRaftState(struct raft *r); #ifdef __cplusplus diff --git a/contrib/test/traft/CMakeLists.txt b/contrib/test/traft/CMakeLists.txt index e29fea04f1..d165c6d4dc 100644 --- a/contrib/test/traft/CMakeLists.txt +++ b/contrib/test/traft/CMakeLists.txt @@ -1,7 +1,3 @@ -add_executable(raftMain "") -target_sources(raftMain - PRIVATE - "raftMain.c" - "raftServer.c" -) -target_link_libraries(raftMain PUBLIC traft lz4 uv_a) +add_subdirectory(rebalance_leader) +add_subdirectory(make_cluster) + diff --git a/contrib/test/traft/make_cluster/CMakeLists.txt b/contrib/test/traft/make_cluster/CMakeLists.txt new file mode 100644 index 0000000000..afd19d5435 --- /dev/null +++ b/contrib/test/traft/make_cluster/CMakeLists.txt @@ -0,0 +1,11 @@ +add_executable(makeCluster "") +target_sources(makeCluster + PRIVATE + "raftMain.c" + "raftServer.c" + "config.c" + "console.c" + "simpleHash.c" + "util.c" +) +target_link_libraries(makeCluster PUBLIC traft lz4 uv_a) diff --git a/contrib/test/traft/clear.sh b/contrib/test/traft/make_cluster/clear.sh similarity index 100% rename from contrib/test/traft/clear.sh rename to contrib/test/traft/make_cluster/clear.sh diff --git a/contrib/test/traft/make_cluster/common.h b/contrib/test/traft/make_cluster/common.h new file mode 100644 index 0000000000..df7422033a --- /dev/null +++ b/contrib/test/traft/make_cluster/common.h @@ -0,0 +1,23 @@ +#ifndef TRAFT_COMMON_H +#define TRAFT_COMMON_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +#define COMMAND_LEN 512 +#define MAX_CMD_COUNT 10 +#define TOKEN_LEN 128 +#define MAX_PEERS_COUNT 19 + +#define HOST_LEN 64 +#define ADDRESS_LEN (HOST_LEN * 2) +#define BASE_DIR_LEN 128 + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/contrib/test/traft/make_cluster/config.c b/contrib/test/traft/make_cluster/config.c new file mode 100644 index 0000000000..3b96839fd9 --- /dev/null +++ b/contrib/test/traft/make_cluster/config.c @@ -0,0 +1,64 @@ +#include "config.h" +#include +#include +#include + +void addrToString(const char *host, uint16_t port, char *addr, int len) { snprintf(addr, len, "%s:%hu", host, port); } + +void parseAddr(const char *addr, char *host, int len, uint16_t *port) { + char *tmp = (char *)malloc(strlen(addr) + 1); + strcpy(tmp, addr); + + char *context; + char *separator = ":"; + char *token = strtok_r(tmp, separator, &context); + if (token) { + snprintf(host, len, "%s", token); + } + + token = strtok_r(NULL, separator, &context); + if (token) { + sscanf(token, "%hu", port); + } + + free(tmp); +} + +int parseConf(int argc, char **argv, RaftServerConfig *pConf) { + memset(pConf, 0, sizeof(*pConf)); + + int option_index, option_value; + option_index = 0; + static struct option long_options[] = {{"help", no_argument, NULL, 'h'}, + {"addr", required_argument, NULL, 'a'}, + {"dir", required_argument, NULL, 'd'}, + {NULL, 0, NULL, 0}}; + + while ((option_value = getopt_long(argc, argv, "ha:d:", long_options, &option_index)) != -1) { + switch (option_value) { + case 'a': { + parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port); + break; + } + + case 'd': { + snprintf(pConf->baseDir, sizeof(pConf->baseDir), "%s", optarg); + break; + } + + case 'h': { + return -2; + } + + default: { return -2; } + } + } + + return 0; +} + +void printConf(RaftServerConfig *pConf) { + printf("\n---printConf: \n"); + printf("me: [%s:%hu] \n", pConf->me.host, pConf->me.port); + printf("dataDir: [%s] \n\n", pConf->baseDir); +} diff --git a/contrib/test/traft/make_cluster/config.h b/contrib/test/traft/make_cluster/config.h new file mode 100644 index 0000000000..13c43d0d28 --- /dev/null +++ b/contrib/test/traft/make_cluster/config.h @@ -0,0 +1,31 @@ +#ifndef TRAFT_CONFIG_H +#define TRAFT_CONFIG_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include "common.h" + +typedef struct { + char host[HOST_LEN]; + uint16_t port; +} Addr; + +typedef struct { + Addr me; + char baseDir[BASE_DIR_LEN]; +} RaftServerConfig; + +void addrToString(const char *host, uint16_t port, char *addr, int len); +void parseAddr(const char *addr, char *host, int len, uint16_t *port); +int parseConf(int argc, char **argv, RaftServerConfig *pConf); +void printConf(RaftServerConfig *pConf); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/contrib/test/traft/make_cluster/console.c b/contrib/test/traft/make_cluster/console.c new file mode 100644 index 0000000000..b00550c681 --- /dev/null +++ b/contrib/test/traft/make_cluster/console.c @@ -0,0 +1,202 @@ +#include "console.h" +#include +#include +#include +#include "raftServer.h" +#include "util.h" + +void printHelp() { + printf("---------------------\n"); + printf("help: \n\n"); + printf("create a vgroup with 3 replicas: \n"); + printf("create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002 \n"); + printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10002 \n"); + printf("create vnode voter vid 100 peers 127.0.0.1:10000 127.0.0.1:10001 \n"); + printf("\n"); + printf("create a vgroup with only one replica: \n"); + printf("create vnode voter vid 200 \n"); + printf("\n"); + printf("add vnode into vgroup: \n"); + printf("create vnode spare vid 100 ---- run at 127.0.0.1:10003\n"); + printf("join vnode vid 100 addr 127.0.0.1:10003 ---- run at leader of vgroup 100\n"); + printf("\n"); + printf("run \n"); + printf("put 0 key value \n"); + printf("get 0 key \n"); + printf("---------------------\n"); +} + +void console(RaftServer *pRaftServer) { + while (1) { + int ret; + char cmdBuf[COMMAND_LEN]; + memset(cmdBuf, 0, sizeof(cmdBuf)); + printf("(console)> "); + char *retp = fgets(cmdBuf, COMMAND_LEN, stdin); + if (!retp) { + exit(-1); + } + + int pos = strlen(cmdBuf); + if (cmdBuf[pos - 1] == '\n') { + cmdBuf[pos - 1] = '\0'; + } + + if (strncmp(cmdBuf, "", COMMAND_LEN) == 0) { + continue; + } + + char cmds[MAX_CMD_COUNT][TOKEN_LEN]; + memset(cmds, 0, sizeof(cmds)); + + int cmdCount; + cmdCount = splitString(cmdBuf, " ", cmds, MAX_CMD_COUNT); + + if (strcmp(cmds[0], "create") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[3], "vid") == 0) { + uint16_t vid; + sscanf(cmds[4], "%hu", &vid); + + if (strcmp(cmds[2], "voter") == 0) { + char peers[MAX_PEERS_COUNT][ADDRESS_LEN]; + memset(peers, 0, sizeof(peers)); + uint32_t peersCount = 0; + + if (strcmp(cmds[5], "peers") == 0 && cmdCount > 6) { + // create vnode voter vid 100 peers 127.0.0.1:10001 127.0.0.1:10002 + for (int i = 6; i < cmdCount; ++i) { + snprintf(peers[i - 6], ADDRESS_LEN, "%s", cmds[i]); + peersCount++; + } + } else { + // create vnode voter vid 200 + } + ret = addRaftVoter(pRaftServer, peers, peersCount, vid); + if (ret == 0) { + printf("create vnode voter ok \n"); + } else { + printf("create vnode voter error \n"); + } + } else if (strcmp(cmds[2], "spare") == 0) { + ret = addRaftSpare(pRaftServer, vid); + if (ret == 0) { + printf("create vnode spare ok \n"); + } else { + printf("create vnode spare error \n"); + } + } else { + printHelp(); + } + + } else if (strcmp(cmds[0], "join") == 0 && strcmp(cmds[1], "vnode") == 0 && strcmp(cmds[2], "vid") == 0 && + strcmp(cmds[4], "addr") == 0 && cmdCount == 6) { + // join vnode vid 100 addr 127.0.0.1:10004 + + char * address = cmds[5]; + char host[64]; + uint16_t port; + parseAddr(address, host, sizeof(host), &port); + + uint16_t vid; + sscanf(cmds[3], "%hu", &vid); + + HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid); + if (*pp == NULL) { + printf("vid:%hu not found \n", vid); + break; + } + RaftInstance *pRaftInstance = (*pp)->data; + + uint64_t destRaftId = encodeRaftId(host, port, vid); + + struct raft_change *req = raft_malloc(sizeof(*req)); + RaftJoin * pRaftJoin = raft_malloc(sizeof(*pRaftJoin)); + pRaftJoin->r = &pRaftInstance->raft; + pRaftJoin->joinId = destRaftId; + req->data = pRaftJoin; + ret = raft_add(&pRaftInstance->raft, req, destRaftId, address, raftChangeAddCb); + if (ret != 0) { + printf("raft_add error: %s \n", raft_errmsg(&pRaftInstance->raft)); + } + + } else if (strcmp(cmds[0], "dropnode") == 0) { + } else if (strcmp(cmds[0], "state") == 0) { + pRaftServer->raftInstances.print(&pRaftServer->raftInstances); + for (size_t i = 0; i < pRaftServer->raftInstances.length; ++i) { + HashNode *ptr = pRaftServer->raftInstances.table[i]; + if (ptr != NULL) { + while (ptr != NULL) { + RaftInstance *pRaftInstance = ptr->data; + printf("instance vid:%hu raftId:%llu \n", ptr->vgroupId, pRaftInstance->raftId); + printRaftState(&pRaftInstance->raft); + printf("\n"); + ptr = ptr->next; + } + printf("\n"); + } + } + + } else if (strcmp(cmds[0], "put") == 0 && cmdCount == 4) { + uint16_t vid; + sscanf(cmds[1], "%hu", &vid); + char * key = cmds[2]; + char * value = cmds[3]; + HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid); + if (*pp == NULL) { + printf("vid:%hu not found \n", vid); + break; + } + RaftInstance *pRaftInstance = (*pp)->data; + + char *raftValue = malloc(TOKEN_LEN * 2 + 3); + snprintf(raftValue, TOKEN_LEN * 2 + 3, "%s--%s", key, value); + putValue(&pRaftInstance->raft, raftValue); + free(raftValue); + + } else if (strcmp(cmds[0], "run") == 0) { + pthread_t tidRaftServer; + pthread_create(&tidRaftServer, NULL, startServerFunc, pRaftServer); + + } else if (strcmp(cmds[0], "get") == 0 && cmdCount == 3) { + uint16_t vid; + sscanf(cmds[1], "%hu", &vid); + char * key = cmds[2]; + HashNode **pp = pRaftServer->raftInstances.find(&pRaftServer->raftInstances, vid); + if (*pp == NULL) { + printf("vid:%hu not found \n", vid); + break; + } + RaftInstance * pRaftInstance = (*pp)->data; + SimpleHash * pKV = pRaftInstance->fsm.data; + SimpleHashNode **ppNode = pKV->find_cstr(pKV, key); + if (*ppNode == NULL) { + printf("key:%s not found \n", key); + } else { + printf("find key:%s value:%s \n", key, (char *)((*ppNode)->data)); + } + + } else if (strcmp(cmds[0], "transfer") == 0) { + } else if (strcmp(cmds[0], "state") == 0) { + } else if (strcmp(cmds[0], "snapshot") == 0) { + } else if (strcmp(cmds[0], "exit") == 0) { + exit(0); + + } else if (strcmp(cmds[0], "quit") == 0) { + exit(0); + + } else if (strcmp(cmds[0], "help") == 0) { + printHelp(); + + } else { + printf("unknown command: %s \n", cmdBuf); + printHelp(); + } + + /* + printf("cmdBuf: [%s] \n", cmdBuf); + printf("cmdCount : %d \n", cmdCount); + for (int i = 0; i < MAX_CMD_COUNT; ++i) { + printf("cmd%d : %s \n", i, cmds[i]); + } + */ + } +} diff --git a/contrib/test/traft/make_cluster/console.h b/contrib/test/traft/make_cluster/console.h new file mode 100644 index 0000000000..f9ed12baf5 --- /dev/null +++ b/contrib/test/traft/make_cluster/console.h @@ -0,0 +1,19 @@ +#ifndef TRAFT_CONSOLE_H +#define TRAFT_CONSOLE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include "common.h" +#include "raftServer.h" + +void console(RaftServer *pRaftServer); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/contrib/test/traft/make_cluster/raftMain.c b/contrib/test/traft/make_cluster/raftMain.c new file mode 100644 index 0000000000..e25636de91 --- /dev/null +++ b/contrib/test/traft/make_cluster/raftMain.c @@ -0,0 +1,81 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common.h" +#include "config.h" +#include "console.h" +#include "raftServer.h" +#include "simpleHash.h" +#include "util.h" + +const char *exe_name; + +void *startConsoleFunc(void *param) { + RaftServer *pRaftServer = (RaftServer *)param; + console(pRaftServer); + return NULL; +} + +void usage() { + printf("\nusage: \n"); + printf("%s --addr=127.0.0.1:10000 --dir=./data \n", exe_name); + printf("\n"); +} + +RaftServerConfig gConfig; +RaftServer gRaftServer; + +int main(int argc, char **argv) { + srand(time(NULL)); + int32_t ret; + + exe_name = argv[0]; + if (argc < 3) { + usage(); + exit(-1); + } + + ret = parseConf(argc, argv, &gConfig); + if (ret != 0) { + usage(); + exit(-1); + } + printConf(&gConfig); + + if (!dirOK(gConfig.baseDir)) { + ret = mkdir(gConfig.baseDir, 0775); + if (ret != 0) { + fprintf(stderr, "mkdir error, %s \n", gConfig.baseDir); + exit(-1); + } + } + + ret = raftServerInit(&gRaftServer, &gConfig); + if (ret != 0) { + fprintf(stderr, "raftServerInit error \n"); + exit(-1); + } + + /* + pthread_t tidRaftServer; + pthread_create(&tidRaftServer, NULL, startServerFunc, &gRaftServer); + */ + + pthread_t tidConsole; + pthread_create(&tidConsole, NULL, startConsoleFunc, &gRaftServer); + + while (1) { + sleep(10); + } + + return 0; +} diff --git a/contrib/test/traft/make_cluster/raftServer.c b/contrib/test/traft/make_cluster/raftServer.c new file mode 100644 index 0000000000..bbf67b9420 --- /dev/null +++ b/contrib/test/traft/make_cluster/raftServer.c @@ -0,0 +1,286 @@ +#include "raftServer.h" +#include +#include +#include "common.h" +#include "simpleHash.h" +#include "util.h" + +void *startServerFunc(void *param) { + RaftServer *pRaftServer = (RaftServer *)param; + int32_t r = raftServerStart(pRaftServer); + assert(r == 0); + return NULL; +} + +void raftChangeAssignCb(struct raft_change *req, int status) { + struct raft *r = req->data; + if (status != 0) { + printf("raftChangeAssignCb error: %s \n", raft_errmsg(r)); + } else { + printf("raftChangeAssignCb ok \n"); + } + raft_free(req); +} + +void raftChangeAddCb(struct raft_change *req, int status) { + RaftJoin *pRaftJoin = req->data; + if (status != 0) { + printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r)); + } else { + struct raft_change *req2 = raft_malloc(sizeof(*req2)); + req2->data = pRaftJoin->r; + int ret = raft_assign(pRaftJoin->r, req2, pRaftJoin->joinId, RAFT_VOTER, raftChangeAssignCb); + if (ret != 0) { + printf("raftChangeAddCb error: %s \n", raft_errmsg(pRaftJoin->r)); + } + } + raft_free(req->data); + raft_free(req); +} + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) { + // get fsm data + SimpleHash *sh = pFsm->data; + + // get commit value + char *msg = (char *)buf->base; + printf("fsm apply: [%s] \n", msg); + char arr[2][TOKEN_LEN]; + int r = splitString(msg, "--", arr, 2); + assert(r == 2); + + // do the value on fsm + sh->insert_cstr(sh, arr[0], arr[1]); + + raft_free(buf->base); + return 0; +} + +void putValueCb(struct raft_apply *req, int status, void *result) { + struct raft *r = req->data; + if (status != 0) { + printf("putValueCb error: %s \n", raft_errmsg(r)); + } else { + printf("putValueCb: %s \n", "ok"); + } + raft_free(req); +} + +void putValue(struct raft *r, const char *value) { + struct raft_buffer buf; + + buf.len = strlen(value) + 1; + buf.base = raft_malloc(buf.len); + snprintf(buf.base, buf.len, "%s", value); + + struct raft_apply *req = raft_malloc(sizeof(*req)); + req->data = r; + int ret = raft_apply(r, req, &buf, 1, putValueCb); + if (ret == 0) { + printf("put %s \n", (char *)buf.base); + } else { + printf("put error: %s \n", raft_errmsg(r)); + } +} + +const char *state2String(unsigned short state) { + if (state == RAFT_UNAVAILABLE) { + return "RAFT_UNAVAILABLE"; + + } else if (state == RAFT_FOLLOWER) { + return "RAFT_FOLLOWER"; + + } else if (state == RAFT_CANDIDATE) { + return "RAFT_CANDIDATE"; + + } else if (state == RAFT_LEADER) { + return "RAFT_LEADER"; + } + return "UNKNOWN_RAFT_STATE"; +} + +void printRaftConfiguration(struct raft_configuration *c) { + printf("configuration: \n"); + for (int i = 0; i < c->n; ++i) { + printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address); + } +} + +void printRaftState(struct raft *r) { + printf("----Raft State: -----------\n"); + printf("mem_addr: %p \n", r); + printf("my_id: %llu \n", r->id); + printf("address: %s \n", r->address); + printf("current_term: %llu \n", r->current_term); + printf("voted_for: %llu \n", r->voted_for); + printf("role: %s \n", state2String(r->state)); + printf("commit_index: %llu \n", r->commit_index); + printf("last_applied: %llu \n", r->last_applied); + printf("last_stored: %llu \n", r->last_stored); + + printf("configuration_index: %llu \n", r->configuration_index); + printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index); + printRaftConfiguration(&r->configuration); + + printf("----------------------------\n"); +} + +int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid) { + int ret; + + RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance)); + assert(pRaftInstance != NULL); + + // init raftId + pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid); + + // init dir + snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir, + pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId); + + if (!dirOK(pRaftInstance->dir)) { + ret = mkdir(pRaftInstance->dir, 0775); + if (ret != 0) { + fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir); + assert(0); + } + } + + // init fsm + pRaftInstance->fsm.data = newSimpleHash(2); + pRaftInstance->fsm.apply = fsmApplyCb; + + // init io + ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport); + if (ret != 0) { + fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // init raft + ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId, + pRaftServer->address); + if (ret != 0) { + fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // init raft_configuration + struct raft_configuration conf; + raft_configuration_init(&conf); + raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_VOTER); + for (int i = 0; i < peersCount; ++i) { + char * peerAddress = peers[i]; + char host[64]; + uint16_t port; + parseAddr(peerAddress, host, sizeof(host), &port); + uint64_t raftId = encodeRaftId(host, port, vid); + raft_configuration_add(&conf, raftId, peers[i], RAFT_VOTER); + } + raft_bootstrap(&pRaftInstance->raft, &conf); + + // start raft + ret = raft_start(&pRaftInstance->raft); + if (ret != 0) { + fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // add raft instance into raft server + pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance); + + return 0; +} + +int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid) { + int ret; + + RaftInstance *pRaftInstance = malloc(sizeof(*pRaftInstance)); + assert(pRaftInstance != NULL); + + // init raftId + pRaftInstance->raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid); + + // init dir + snprintf(pRaftInstance->dir, sizeof(pRaftInstance->dir), "%s/%s_%hu_%hu_%llu", pRaftServer->baseDir, + pRaftServer->host, pRaftServer->port, vid, pRaftInstance->raftId); + ret = mkdir(pRaftInstance->dir, 0775); + if (ret != 0) { + fprintf(stderr, "mkdir error, %s \n", pRaftInstance->dir); + assert(0); + } + + // init fsm + pRaftInstance->fsm.data = newSimpleHash(2); + pRaftInstance->fsm.apply = fsmApplyCb; + + // init io + ret = raft_uv_init(&pRaftInstance->io, &pRaftServer->loop, pRaftInstance->dir, &pRaftServer->transport); + if (ret != 0) { + fprintf(stderr, "raft_uv_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // init raft + ret = raft_init(&pRaftInstance->raft, &pRaftInstance->io, &pRaftInstance->fsm, pRaftInstance->raftId, + pRaftServer->address); + if (ret != 0) { + fprintf(stderr, "raft_init error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // init raft_configuration + struct raft_configuration conf; + raft_configuration_init(&conf); + raft_configuration_add(&conf, pRaftInstance->raftId, pRaftServer->address, RAFT_SPARE); + raft_bootstrap(&pRaftInstance->raft, &conf); + + // start raft + ret = raft_start(&pRaftInstance->raft); + if (ret != 0) { + fprintf(stderr, "raft_start error, %s \n", raft_errmsg(&pRaftInstance->raft)); + assert(0); + } + + // add raft instance into raft server + pRaftServer->raftInstances.insert(&pRaftServer->raftInstances, vid, pRaftInstance); + + return 0; +} + +int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf) { + int ret; + + // init host, port, address, dir + snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host); + pRaftServer->port = pConf->me.port; + snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port); + snprintf(pRaftServer->baseDir, sizeof(pRaftServer->baseDir), "%s", pConf->baseDir); + + // init loop + ret = uv_loop_init(&pRaftServer->loop); + if (ret != 0) { + fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret)); + assert(0); + } + + // init network + ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop); + if (ret != 0) { + fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret); + assert(0); + } + + // init raft instance container + initIdHash(&pRaftServer->raftInstances, 2); + + return 0; +} + +int32_t raftServerStart(RaftServer *pRaftServer) { + // start loop + uv_run(&pRaftServer->loop, UV_RUN_DEFAULT); + return 0; +} + +void raftServerStop(RaftServer *pRaftServer) {} diff --git a/contrib/test/traft/make_cluster/raftServer.h b/contrib/test/traft/make_cluster/raftServer.h new file mode 100644 index 0000000000..b6dbddb2b7 --- /dev/null +++ b/contrib/test/traft/make_cluster/raftServer.h @@ -0,0 +1,66 @@ +#ifndef TDENGINE_RAFT_SERVER_H +#define TDENGINE_RAFT_SERVER_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include "common.h" +#include "config.h" +#include "raft.h" +#include "raft/uv.h" +#include "simpleHash.h" + +typedef struct RaftJoin { + struct raft *r; + raft_id joinId; +} RaftJoin; + +typedef struct { + raft_id raftId; + char dir[BASE_DIR_LEN * 2]; + struct raft_fsm fsm; + struct raft_io io; + struct raft raft; +} RaftInstance; + +typedef struct { + char host[HOST_LEN]; + uint16_t port; + char address[ADDRESS_LEN]; /* Raft instance address */ + char baseDir[BASE_DIR_LEN]; /* Raft instance address */ + + struct uv_loop_s loop; /* UV loop */ + struct raft_uv_transport transport; /* UV I/O backend transport */ + + IdHash raftInstances; /* multi raft instances. traft use IdHash to manager multi vgroup inside, here we can use IdHash + too. */ +} RaftServer; + +void * startServerFunc(void *param); +int32_t addRaftVoter(RaftServer *pRaftServer, char peers[][ADDRESS_LEN], uint32_t peersCount, uint16_t vid); +int32_t addRaftSpare(RaftServer *pRaftServer, uint16_t vid); + +int32_t raftServerInit(RaftServer *pRaftServer, const RaftServerConfig *pConf); +int32_t raftServerStart(RaftServer *pRaftServer); +void raftServerStop(RaftServer *pRaftServer); + +int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result); +void putValueCb(struct raft_apply *req, int status, void *result); +void putValue(struct raft *r, const char *value); + +void raftChangeAddCb(struct raft_change *req, int status); + +const char *state2String(unsigned short state); +void printRaftConfiguration(struct raft_configuration *c); +void printRaftState(struct raft *r); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_RAFT_SERVER_H diff --git a/contrib/test/traft/make_cluster/simpleHash.c b/contrib/test/traft/make_cluster/simpleHash.c new file mode 100644 index 0000000000..6694843874 --- /dev/null +++ b/contrib/test/traft/make_cluster/simpleHash.c @@ -0,0 +1,218 @@ +#include "simpleHash.h" + +uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed) { + // Similar to murmur hash + const uint32_t m = 0xc6a4a793; + const uint32_t r = 24; + const char* limit = data + n; + uint32_t h = seed ^ (n * m); + + // Pick up four bytes at a time + while (data + 4 <= limit) { + // uint32_t w = DecodeFixed32(data); + uint32_t w; + memcpy(&w, data, 4); + + data += 4; + h += w; + h *= m; + h ^= (h >> 16); + } + + // Pick up remaining bytes + switch (limit - data) { + case 3: + h += (unsigned char)(data[2]) << 16; + do { + } while (0); + case 2: + h += (unsigned char)(data[1]) << 8; + do { + } while (0); + case 1: + h += (unsigned char)(data[0]); + h *= m; + h ^= (h >> r); + break; + } + return h; +} + +int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data) { + return insertSimpleHash(ths, key, strlen(key) + 1, data, strlen(data) + 1); +} + +int removeCStrSimpleHash(struct SimpleHash* ths, char* key) { return removeSimpleHash(ths, key, strlen(key) + 1); } + +SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key) { + return findSimpleHash(ths, key, strlen(key) + 1); +} + +int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen) { + SimpleHashNode** pp = ths->find(ths, key, keyLen); + if (*pp != NULL) { + fprintf(stderr, "insertSimpleHash, already has key \n"); + return -1; + } + + SimpleHashNode* node = malloc(sizeof(*node)); + node->hashCode = ths->hashFunc(key, keyLen); + node->key = malloc(keyLen); + node->keyLen = keyLen; + memcpy(node->key, key, keyLen); + node->data = malloc(dataLen); + node->dataLen = dataLen; + memcpy(node->data, data, dataLen); + node->next = NULL; + + // printf("insertSimpleHash: <%s, %ld, %s, %ld, %u> \n", node->key, node->keyLen, node->data, node->dataLen, + // node->hashCode); + + size_t index = node->hashCode & (ths->length - 1); + + SimpleHashNode* ptr = ths->table[index]; + if (ptr != NULL) { + node->next = ptr; + ths->table[index] = node; + + } else { + ths->table[index] = node; + } + ths->elems++; + if (ths->elems > 2 * ths->length) { + ths->resize(ths); + } + + return 0; +} + +int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) { + SimpleHashNode** pp = ths->find(ths, key, keyLen); + if (*pp == NULL) { + fprintf(stderr, "removeSimpleHash, key not exist \n"); + return -1; + } + + SimpleHashNode* del = *pp; + *pp = del->next; + free(del->key); + free(del->data); + free(del); + ths->elems--; + + return 0; +} + +SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen) { + uint32_t hashCode = ths->hashFunc(key, keyLen); + // size_t index = hashCode % ths->length; + size_t index = hashCode & (ths->length - 1); + + // printf("findSimpleHash: %s %ld %u \n", key, keyLen, hashCode); + + SimpleHashNode** pp = &(ths->table[index]); + while (*pp != NULL && ((*pp)->hashCode != hashCode || memcmp(key, (*pp)->key, keyLen) != 0)) { + pp = &((*pp)->next); + } + + return pp; +} + +void printCStrSimpleHash(struct SimpleHash* ths) { + printf("\n--- printCStrSimpleHash: elems:%d length:%d \n", ths->elems, ths->length); + for (size_t i = 0; i < ths->length; ++i) { + SimpleHashNode* ptr = ths->table[i]; + if (ptr != NULL) { + printf("%zu: ", i); + while (ptr != NULL) { + printf("<%u, %s, %ld, %s, %ld> ", ptr->hashCode, (char*)ptr->key, ptr->keyLen, (char*)ptr->data, ptr->dataLen); + ptr = ptr->next; + } + printf("\n"); + } + } + printf("---------------\n"); +} + +void destroySimpleHash(struct SimpleHash* ths) { + for (size_t i = 0; i < ths->length; ++i) { + SimpleHashNode* ptr = ths->table[i]; + while (ptr != NULL) { + SimpleHashNode* tmp = ptr; + ptr = ptr->next; + free(tmp->key); + free(tmp->data); + free(tmp); + } + } + + ths->length = 0; + ths->elems = 0; + free(ths->table); + free(ths); +} + +void resizeSimpleHash(struct SimpleHash* ths) { + uint32_t new_length = ths->length; + while (new_length < ths->elems) { + new_length *= 2; + } + + printf("resizeSimpleHash: %p from %u to %u \n", ths, ths->length, new_length); + + SimpleHashNode** new_table = malloc(new_length * sizeof(SimpleHashNode*)); + memset(new_table, 0, new_length * sizeof(SimpleHashNode*)); + + uint32_t count = 0; + for (uint32_t i = 0; i < ths->length; i++) { + if (ths->table[i] == NULL) { + continue; + } + + SimpleHashNode* it = ths->table[i]; + while (it != NULL) { + SimpleHashNode* move_node = it; + it = it->next; + + // move move_node + move_node->next = NULL; + size_t index = move_node->hashCode & (new_length - 1); + + SimpleHashNode* ptr = new_table[index]; + if (ptr != NULL) { + move_node->next = ptr; + new_table[index] = move_node; + } else { + new_table[index] = move_node; + } + count++; + } + } + + assert(ths->elems == count); + free(ths->table); + ths->table = new_table; + ths->length = new_length; +} + +uint32_t simpleHashFunc(const char* key, size_t keyLen) { return mySimpleHash(key, keyLen, 1); } + +struct SimpleHash* newSimpleHash(size_t length) { + struct SimpleHash* ths = malloc(sizeof(*ths)); + + ths->length = length; + ths->elems = 0; + ths->table = malloc(length * sizeof(SimpleHashNode*)); + memset(ths->table, 0, length * sizeof(SimpleHashNode*)); + + ths->insert = insertSimpleHash; + ths->remove = removeSimpleHash; + ths->find = findSimpleHash; + ths->insert_cstr = insertCStrSimpleHash; + ths->remove_cstr = removeCStrSimpleHash; + ths->find_cstr = findCStrSimpleHash; + ths->print_cstr = printCStrSimpleHash; + ths->destroy = destroySimpleHash; + ths->resize = resizeSimpleHash; + ths->hashFunc = simpleHashFunc; +} diff --git a/contrib/test/traft/make_cluster/simpleHash.h b/contrib/test/traft/make_cluster/simpleHash.h new file mode 100644 index 0000000000..c6fcd93888 --- /dev/null +++ b/contrib/test/traft/make_cluster/simpleHash.h @@ -0,0 +1,61 @@ +#ifndef __SIMPLE_HASH_H__ +#define __SIMPLE_HASH_H__ + +#include +#include +#include +#include +#include + +uint32_t mySimpleHash(const char* data, size_t n, uint32_t seed); + +typedef struct SimpleHashNode { + uint32_t hashCode; + void* key; + size_t keyLen; + void* data; + size_t dataLen; + struct SimpleHashNode* next; +} SimpleHashNode; + +typedef struct SimpleHash { + // public: + + int (*insert)(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen); + int (*remove)(struct SimpleHash* ths, char* key, size_t keyLen); + SimpleHashNode** (*find)(struct SimpleHash* ths, char* key, size_t keyLen); + + // wrapper + int (*insert_cstr)(struct SimpleHash* ths, char* key, char* data); + int (*remove_cstr)(struct SimpleHash* ths, char* key); + SimpleHashNode** (*find_cstr)(struct SimpleHash* ths, char* key); + + void (*print_cstr)(struct SimpleHash* ths); + void (*destroy)(struct SimpleHash* ths); + + uint32_t length; + uint32_t elems; + + // private: + void (*resize)(struct SimpleHash* ths); + uint32_t (*hashFunc)(const char* key, size_t keyLen); + + SimpleHashNode** table; + +} SimpleHash; + +int insertCStrSimpleHash(struct SimpleHash* ths, char* key, char* data); +int removeCStrSimpleHash(struct SimpleHash* ths, char* key); +SimpleHashNode** findCStrSimpleHash(struct SimpleHash* ths, char* key); +void printCStrSimpleHash(struct SimpleHash* ths); + +int insertSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen, char* data, size_t dataLen); +int removeSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen); +SimpleHashNode** findSimpleHash(struct SimpleHash* ths, char* key, size_t keyLen); +void destroySimpleHash(struct SimpleHash* ths); +void resizeSimpleHash(struct SimpleHash* ths); +uint32_t simpleHashFunc(const char* key, size_t keyLen); + +struct SimpleHash* newSimpleHash(size_t length); + +#endif diff --git a/contrib/test/traft/make_cluster/util.c b/contrib/test/traft/make_cluster/util.c new file mode 100644 index 0000000000..ff704f3660 --- /dev/null +++ b/contrib/test/traft/make_cluster/util.c @@ -0,0 +1,45 @@ +#include "util.h" +#include +#include +#include + +int dirOK(const char *path) { + DIR *dir = opendir(path); + if (dir != NULL) { + closedir(dir); + return 1; + } else { + return 0; + } +} + +int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr) { + if (n_arr <= 0) { + return -1; + } + + char *tmp = (char *)malloc(strlen(str) + 1); + strcpy(tmp, str); + char *context; + int n = 0; + + char *token = strtok_r(tmp, separator, &context); + if (!token) { + goto ret; + } + strncpy(arr[n], token, TOKEN_LEN); + n++; + + while (1) { + token = strtok_r(NULL, separator, &context); + if (!token || n >= n_arr) { + goto ret; + } + strncpy(arr[n], token, TOKEN_LEN); + n++; + } + +ret: + free(tmp); + return n; +} diff --git a/contrib/test/traft/make_cluster/util.h b/contrib/test/traft/make_cluster/util.h new file mode 100644 index 0000000000..fb4ccb9c5c --- /dev/null +++ b/contrib/test/traft/make_cluster/util.h @@ -0,0 +1,17 @@ +#ifndef TRAFT_UTIL_H +#define TRAFT_UTIL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "common.h" + +int dirOK(const char *path); +int splitString(const char *str, char *separator, char (*arr)[TOKEN_LEN], int n_arr); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/contrib/test/traft/rebalance_leader/CMakeLists.txt b/contrib/test/traft/rebalance_leader/CMakeLists.txt new file mode 100644 index 0000000000..92640bdd80 --- /dev/null +++ b/contrib/test/traft/rebalance_leader/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(rebalanceLeader "") +target_sources(rebalanceLeader + PRIVATE + "raftMain.c" + "raftServer.c" +) +target_link_libraries(rebalanceLeader PUBLIC traft lz4 uv_a) diff --git a/contrib/test/traft/rebalance_leader/clear.sh b/contrib/test/traft/rebalance_leader/clear.sh new file mode 100644 index 0000000000..398b3088f2 --- /dev/null +++ b/contrib/test/traft/rebalance_leader/clear.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +rm -rf 127.0.0.1* +rm -rf ./data diff --git a/contrib/test/traft/common.h b/contrib/test/traft/rebalance_leader/common.h similarity index 100% rename from contrib/test/traft/common.h rename to contrib/test/traft/rebalance_leader/common.h diff --git a/contrib/test/traft/raftMain.c b/contrib/test/traft/rebalance_leader/raftMain.c similarity index 94% rename from contrib/test/traft/raftMain.c rename to contrib/test/traft/rebalance_leader/raftMain.c index 24ad93856c..70dc191997 100644 --- a/contrib/test/traft/raftMain.c +++ b/contrib/test/traft/rebalance_leader/raftMain.c @@ -60,9 +60,9 @@ void raftTransferCb(struct raft_transfer *req) { SRaftServer *pRaftServer = req->data; raft_free(req); - printf("raftTransferCb: \n"); + //printf("raftTransferCb: \n"); updateLeaderStates(pRaftServer); - printLeaderCount(); + //printLeaderCount(); int myLeaderCount; for (int i = 0; i < NODE_COUNT; ++i) { @@ -71,12 +71,13 @@ void raftTransferCb(struct raft_transfer *req) { } } - printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT); + //printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT); if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) { struct raft *r; for (int j = 0; j < pRaftServer->instanceCount; ++j) { if (pRaftServer->instance[j].raft.state == RAFT_LEADER) { r = &pRaftServer->instance[j].raft; + break; } } @@ -87,17 +88,25 @@ void raftTransferCb(struct raft_transfer *req) { int minIndex = -1; int minLeaderCount = myLeaderCount; for (int j = 0; j < NODE_COUNT; ++j) { - if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue; + if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) { + continue; + } + if (leaderStates[j].leaderCount <= minLeaderCount) { + minLeaderCount = leaderStates[j].leaderCount; minIndex = j; } } + char myHost[48]; uint16_t myPort; uint16_t myVid; decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid); - + + + //printf("raftTransferCb transfer leader: vid[%u] choose: index:%d, leaderStates[%d].address:%s, leaderStates[%d].leaderCount:%d \n", minIndex, minIndex, leaderStates[minIndex].address, minIndex, leaderStates[minIndex].leaderCount); + char *destAddress = leaderStates[minIndex].address; char tokens[MAX_PEERS][MAX_TOKEN_LEN]; @@ -106,6 +115,9 @@ void raftTransferCb(struct raft_transfer *req) { uint16_t destPort = atoi(tokens[1]); destRaftId = encodeRaftId(destHost, destPort, myVid); + printf("\nraftTransferCb transfer leader: vgroupId:%u from:%s:%u --> to:%s:%u ", myVid, myHost, myPort, destHost, destPort); + fflush(stdout); + raft_transfer(r, transfer, destRaftId, raftTransferCb); } @@ -252,7 +264,6 @@ const char* state2String(unsigned short state) { void printRaftState2(struct raft *r) { - char leaderAddress[128]; memset(leaderAddress, 0, sizeof(leaderAddress)); @@ -350,6 +361,7 @@ void console(SRaftServer *pRaftServer) { while (1) { char cmd_buf[COMMAND_LEN]; memset(cmd_buf, 0, sizeof(cmd_buf)); + printf("(console)> "); char *ret = fgets(cmd_buf, COMMAND_LEN, stdin); if (!ret) { exit(-1); @@ -403,7 +415,10 @@ void console(SRaftServer *pRaftServer) { } else if (strcmp(cmd, "dropnode") == 0) { printf("not support \n"); - } else if (strcmp(cmd, "rebalance") == 0) { + } else if (strcmp(cmd, "quit") == 0 || strcmp(cmd, "exit") == 0) { + exit(0); + + } else if (strcmp(cmd, "rebalance") == 0 && strcmp(param1, "leader") == 0) { /* updateLeaderStates(pRaftServer); @@ -511,10 +526,14 @@ void console(SRaftServer *pRaftServer) { printRaftState(&pRaftServer->instance[i].raft); } - } else if (strcmp(cmd, "state2") == 0) { + } else if (strcmp(cmd, "leader") == 0 && strcmp(param1, "state") == 0) { + updateLeaderStates(pRaftServer); + printf("\n--------------------------------------------\n"); + printLeaderCount(); for (int i = 0; i < pRaftServer->instanceCount; ++i) { printRaftState2(&pRaftServer->instance[i].raft); } + printf("--------------------------------------------\n"); } else if (strcmp(cmd, "snapshot") == 0) { printf("not support \n"); diff --git a/contrib/test/traft/raftServer.c b/contrib/test/traft/rebalance_leader/raftServer.c similarity index 92% rename from contrib/test/traft/raftServer.c rename to contrib/test/traft/rebalance_leader/raftServer.c index 94de49cd0f..165d3c9023 100644 --- a/contrib/test/traft/raftServer.c +++ b/contrib/test/traft/rebalance_leader/raftServer.c @@ -3,32 +3,34 @@ #include "common.h" #include "raftServer.h" -char *keys; -char *values; +//char *keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);; +//char *values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); + + +char keys[MAX_KV_LEN][MAX_RECORD_COUNT]; +char values[MAX_KV_LEN][MAX_RECORD_COUNT]; +int writeIndex = 0; void initStore() { - keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); - values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN); - writeIndex = 0; } void destroyStore() { - free(keys); - free(values); + //free(keys); + //free(values); } void putKV(const char *key, const char *value) { if (writeIndex < MAX_RECORD_COUNT) { - strncpy(&keys[writeIndex], key, MAX_KV_LEN); - strncpy(&values[writeIndex], value, MAX_KV_LEN); + strncpy(keys[writeIndex], key, MAX_KV_LEN); + strncpy(values[writeIndex], value, MAX_KV_LEN); writeIndex++; } } char *getKV(const char *key) { for (int i = 0; i < MAX_RECORD_COUNT; ++i) { - if (strcmp(&keys[i], key) == 0) { - return &values[i]; + if (strcmp(keys[i], key) == 0) { + return values[i]; } } return NULL; diff --git a/contrib/test/traft/raftServer.h b/contrib/test/traft/rebalance_leader/raftServer.h similarity index 93% rename from contrib/test/traft/raftServer.h rename to contrib/test/traft/rebalance_leader/raftServer.h index b1f62caac5..5ea43985c9 100644 --- a/contrib/test/traft/raftServer.h +++ b/contrib/test/traft/rebalance_leader/raftServer.h @@ -15,11 +15,13 @@ extern "C" { // simulate a db store, just for test -#define MAX_KV_LEN 100 -#define MAX_RECORD_COUNT 500 -char *keys; -char *values; -int writeIndex; +#define MAX_KV_LEN 20 +#define MAX_RECORD_COUNT 16 + + +//char *keys; +//char *values; +//int writeIndex; void initStore(); void destroyStore(); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f69f46a601..a59326fa7c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -26,6 +26,7 @@ extern "C" { #include "tarray.h" #include "tcoding.h" #include "tdataformat.h" +#include "thash.h" #include "tlist.h" /* ------------------------ MESSAGE DEFINITIONS ------------------------ */ @@ -132,6 +133,73 @@ typedef enum _mgmt_table { #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) +typedef struct SKlv { + int32_t keyLen; + int32_t valueLen; + void* key; + void* value; +} SKlv; + +static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKlv->keyLen); + tlen += taosEncodeFixedI32(buf, pKlv->valueLen); + tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); + tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { + buf = taosDecodeFixedI32(buf, &pKlv->keyLen); + buf = taosDecodeFixedI32(buf, &pKlv->valueLen); + buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); + buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); + return buf; +} +typedef struct SClientHbKey { + int32_t connId; + int32_t hbType; +} SClientHbKey; + +static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKey->connId); + tlen += taosEncodeFixedI32(buf, pKey->hbType); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { + buf = taosDecodeFixedI32(buf, &pKey->connId); + buf = taosDecodeFixedI32(buf, &pKey->hbType); + return buf; +} + +typedef struct SClientHbReq { + SClientHbKey connKey; + SHashObj* info; // hash +} SClientHbReq; + +typedef struct SClientHbBatchReq { + int64_t reqId; + SArray* reqs; // SArray +} SClientHbBatchReq; + +int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); +void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq); + +typedef struct SClientHbRsp { + SClientHbKey connKey; + int32_t status; + int32_t bodyLen; + void* body; +} SClientHbRsp; + +typedef struct SClientHbBatchRsp { + int64_t reqId; + int64_t rspId; + SArray* rsps; // SArray +} SClientHbBatchRsp; + typedef struct SBuildTableMetaInput { int32_t vgId; char* dbName; @@ -1123,7 +1191,7 @@ typedef struct { int32_t topicNum; int64_t consumerId; char* consumerGroup; - char* topicName[]; + SArray* topicNames; // SArray } SCMSubscribeReq; static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -1131,8 +1199,9 @@ static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribe tlen += taosEncodeFixedI32(buf, pReq->topicNum); tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->consumerGroup); - for(int i = 0; i < pReq->topicNum; i++) { - tlen += taosEncodeString(buf, pReq->topicName[i]); + + for (int i = 0; i < pReq->topicNum; i++) { + tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i)); } return tlen; } @@ -1141,8 +1210,11 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq buf = taosDecodeFixedI32(buf, &pReq->topicNum); buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeString(buf, &pReq->consumerGroup); - for(int i = 0; i < pReq->topicNum; i++) { - buf = taosDecodeString(buf, &pReq->topicName[i]); + pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*)); + for (int i = 0; i < pReq->topicNum; i++) { + char* name = NULL; + buf = taosDecodeString(buf, &name); + taosArrayPush(pReq->topicNames, &name); } return buf; } @@ -1154,14 +1226,14 @@ typedef struct SMqSubTopic { } SMqSubTopic; typedef struct { - int32_t topicNum; + int32_t topicNum; SMqSubTopic topics[]; } SCMSubscribeRsp; static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pRsp->topicNum); - for(int i = 0; i < pRsp->topicNum; i++) { + for (int i = 0; i < pRsp->topicNum; i++) { tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId); tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId); tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet); @@ -1171,7 +1243,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribe static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { buf = taosDecodeFixedI32(buf, &pRsp->topicNum); - for(int i = 0; i < pRsp->topicNum; i++) { + for (int i = 0; i < pRsp->topicNum; i++) { buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId); buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId); buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet); diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 3916898829..af14e52fc0 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -72,7 +72,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, int32_t forceUpdate, SArray** pVgroupList); +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList); int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); diff --git a/include/util/tcoding.h b/include/util/tcoding.h index e1edf0d792..226856901f 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -370,10 +370,33 @@ static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) { return POINTER_SHIFT(buf, size); } +// ---- binary +static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int valueLen) { + int tlen = 0; + + if (buf != NULL) { + memcpy(*buf, value, valueLen); + *buf = POINTER_SHIFT(*buf, valueLen); + } + tlen += (int)valueLen; + + return tlen; +} + +static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int valueLen) { + uint64_t size = 0; + + *value = malloc((size_t)valueLen); + if (*value == NULL) return NULL; + memcpy(*value, buf, (size_t)size); + + return POINTER_SHIFT(buf, size); +} + #endif #ifdef __cplusplus } #endif -#endif /*_TD_UTIL_CODING_H*/ \ No newline at end of file +#endif /*_TD_UTIL_CODING_H*/ diff --git a/include/util/thash.h b/include/util/thash.h index a736fc26af..4558162ac5 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -210,6 +210,14 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); */ int32_t taosHashGetKey(void *data, void** key, size_t* keyLen); + +/** + * Get the corresponding data length for a given data in hash table + * @param data + * @return + */ +int32_t taosHashGetDataLen(void *data); + /** * return the payload data with the specified key(reference number added) * diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h new file mode 100644 index 0000000000..73adb41308 --- /dev/null +++ b/source/client/inc/clientHb.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "os.h" +#include "tarray.h" +#include "thash.h" +#include "tmsg.h" + +typedef enum { + mq = 0, + // type can be added here + // + HEARTBEAT_TYPE_MAX +} EHbType; + +typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); +typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param); + +typedef struct SClientHbMgr { + int8_t inited; + int32_t reportInterval; // unit ms + int32_t stats; + SRWLatch lock; + SHashObj* activeInfo; // hash + SHashObj* getInfoFuncs; // hash + FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; + // input queue +} SClientHbMgr; + +static SClientHbMgr clientHbMgr = {0}; + +int hbMgrInit(); +void hbMgrCleanUp(); +int hbHandleRsp(void* hbMsg); + + +int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func); + + +int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); + diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c new file mode 100644 index 0000000000..7daa1204d0 --- /dev/null +++ b/source/client/src/clientHb.c @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "clientHb.h" + +static int32_t mqHbRspHandle(SClientHbRsp* pReq) { + return 0; +} + +uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { + return 0; +} + +static void hbMgrInitMqHbFunc() { + clientHbMgr.handle[mq] = mqHbRspHandle; +} + +int hbMgrInit() { + //init once + int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); + if (old == 1) return 0; + + //init config + clientHbMgr.reportInterval = 1500; + + //init stat + clientHbMgr.stats = 0; + + //init lock + taosInitRWLatch(&clientHbMgr.lock); + + //init handle funcs + hbMgrInitMqHbFunc(); + + //init hash info + clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + //init getInfoFunc + clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + return 0; +} + +void hbMgrCleanUp() { + int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); + if (old == 0) return; + + taosHashCleanup(clientHbMgr.activeInfo); + taosHashCleanup(clientHbMgr.getInfoFuncs); +} + +int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { + + return 0; +} + +int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { + //lock + + //find req by connection id + SClientHbReq* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + ASSERT(data != NULL); + taosHashPut(data->info, key, keyLen, value, valueLen); + + //unlock + return 0; +} diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 782eb7d902..9673937811 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -140,7 +140,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj* (*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlLen = sqlLen; - tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); + tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); return TSDB_CODE_SUCCESS; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 39b43d1602..0ef96e657d 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -101,13 +101,13 @@ TEST(testCase, show_user_Test) { assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "show users"); - TAOS_ROW pRow = NULL; + TAOS_ROW pRow = NULL; TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); char str[512] = {0}; - while((pRow = taos_fetch_row(pRes)) != NULL) { + while ((pRow = taos_fetch_row(pRes)) != NULL) { int32_t code = taos_print_row(str, pRow, pFields, numOfFields); printf("%s\n", str); } @@ -134,13 +134,13 @@ TEST(testCase, show_db_Test) { assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "show databases"); - TAOS_ROW pRow = NULL; + TAOS_ROW pRow = NULL; TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); char str[512] = {0}; - while((pRow = taos_fetch_row(pRes)) != NULL) { + while ((pRow = taos_fetch_row(pRes)) != NULL) { int32_t code = taos_print_row(str, pRow, pFields, numOfFields); printf("%s\n", str); } @@ -152,7 +152,7 @@ TEST(testCase, create_db_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "create database abc1"); + TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); } @@ -228,33 +228,33 @@ TEST(testCase, use_db_test) { taos_close(pConn); } -//TEST(testCase, drop_db_test) { -//// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -//// assert(pConn != NULL); -//// -//// showDB(pConn); -//// -//// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); -//// if (taos_errno(pRes) != 0) { -//// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); -//// } -//// taos_free_result(pRes); -//// -//// showDB(pConn); -//// -//// pRes = taos_query(pConn, "create database abc1"); -//// if (taos_errno(pRes) != 0) { -//// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); -//// } -//// taos_free_result(pRes); -//// taos_close(pConn); +// TEST(testCase, drop_db_test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// showDB(pConn); +// +// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// showDB(pConn); +// +// pRes = taos_query(pConn, "create database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// taos_close(pConn); //} - TEST(testCase, create_stable_Test) { +TEST(testCase, create_stable_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "create database abc1"); + TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); } @@ -281,128 +281,211 @@ TEST(testCase, use_db_test) { taos_close(pConn); } -//TEST(testCase, create_table_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)"); -// taos_free_result(pRes); -// -// taos_close(pConn); -//} +TEST(testCase, create_table_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); -//TEST(testCase, create_ctable_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, show_stable_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "show stables"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); -//TEST(testCase, show_vgroup_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "show vgroups"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// -// taos_close(pConn); -//} + pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)"); + taos_free_result(pRes); -//TEST(testCase, drop_stable_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create database abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in creating db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in using db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "drop stable st1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop stable, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} + taos_close(pConn); +} -//TEST(testCase, create_topic_Test) { +TEST(testCase, create_ctable_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_stable_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "show stables"); + if (taos_errno(pRes) != 0) { + printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_vgroup_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "show vgroups"); + if (taos_errno(pRes) != 0) { + printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, create_multiple_tables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s", taos_errstr(pRes)); + taos_free_result(pRes); + taos_close(pConn); + return; + } + + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + taos_free_result(pRes); + pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); + if (taos_errno(pRes) != 0) { + printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + + for (int32_t i = 0; i < 20; ++i) { + char sql[512] = {0}; + snprintf(sql, tListLen(sql), + "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, + (i + 1) * 30, (i + 2) * 40); + TAOS_RES* pres = taos_query(pConn, sql); + if (taos_errno(pres) != 0) { + printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); + } + taos_free_result(pres); + } + + taos_close(pConn); +} + +TEST(testCase, show_table_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "show tables"); + if (taos_errno(pRes) != 0) { + printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, drop_stable_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create database abc1"); + if (taos_errno(pRes) != 0) { + printf("error in creating db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in using db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop stable st1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop stable, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +// TEST(testCase, create_topic_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); // @@ -435,95 +518,12 @@ TEST(testCase, use_db_test) { // tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql)); // taos_close(pConn); //} - -//TEST(testCase, show_table_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "show tables"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} - -TEST(testCase, create_multiple_tables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to use db, reason:%s", taos_errstr(pRes)); - taos_free_result(pRes); - taos_close(pConn); - return; - } - - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - taos_free_result(pRes); - pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); - if (taos_errno(pRes) != 0) { - printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - -// for(int32_t i = 0; i < 1000; ++i) { -// char sql[512] = {0}; -// snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); -// TAOS_RES* pres = taos_query(pConn, sql); -// if (taos_errno(pres) != 0) { -// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); -// } -// taos_free_result(pres); -// } - - taos_close(pConn); -} - TEST(testCase, generated_request_id_test) { - SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - for(int32_t i = 0; i < 50000; ++i) { + for (int32_t i = 0; i < 50000; ++i) { uint64_t v = generateRequestId(); - void* result = taosHashGet(phash, &v, sizeof(v)); + void* result = taosHashGet(phash, &v, sizeof(v)); if (result != nullptr) { printf("0x%lx, index:%d\n", v, i); } @@ -534,7 +534,7 @@ TEST(testCase, generated_request_id_test) { taosHashCleanup(phash); } -//TEST(testCase, projection_query_tables) { +// TEST(testCase, projection_query_tables) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // ASSERT_EQ(pConn, nullptr); // @@ -561,4 +561,3 @@ TEST(testCase, generated_request_id_test) { // taos_free_result(pRes); // taos_close(pConn); //} - diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 9ddadc9ba6..d9043861e7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -910,7 +910,7 @@ static void doInitGlobalConfig(void) { cfg.option = "tsdbDebugFlag"; cfg.ptr = &tsdbDebugFlag; cfg.valType = TAOS_CFG_VTYPE_INT32; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG; cfg.minValue = 0; cfg.maxValue = 255; cfg.ptrLength = 0; @@ -927,6 +927,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); + cfg.option = "ctgDebugFlag"; + cfg.ptr = &ctgDebugFlag; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 0; + cfg.maxValue = 255; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosAddConfigOption(cfg); + cfg.option = "enableRecordSql"; cfg.ptr = &tsTscEnableRecordSql; cfg.valType = TAOS_CFG_VTYPE_INT8; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c1048f8482..5cbb42c1e6 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -27,6 +27,40 @@ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" +int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { + int tlen = 0; + tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); + + void *pIter = NULL; + void *data; + SKlv klv; + data = taosHashIterate(pReq->info, pIter); + while (data != NULL) { + taosHashGetKey(data, &klv.key, (size_t *)&klv.keyLen); + klv.valueLen = taosHashGetDataLen(data); + klv.value = data; + taosEncodeSKlv(buf, &klv); + + data = taosHashIterate(pReq->info, pIter); + } + return tlen; +} + +void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) { + ASSERT(pReq->info != NULL); + buf = taosDecodeSClientHbKey(buf, &pReq->connKey); + + // TODO: error handling + if (pReq->info == NULL) { + pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } + SKlv klv; + buf = taosDecodeSKlv(buf, &klv); + taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); + + return buf; +} + int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tlen = 0; @@ -148,4 +182,4 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) { } return buf; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7aa818c7bb..828eb4a5b7 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -127,8 +127,8 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { goto CONSUME_DECODE_OVER; } - int32_t size = sizeof(SMqConsumerObj); - SSdbRow *pRow = sdbAllocRow(size); + int32_t size = sizeof(SMqConsumerObj); + SSdbRow *pRow = sdbAllocRow(size); if (pRow == NULL) goto CONSUME_DECODE_OVER; SMqConsumerObj *pConsumer = sdbGetRowObj(pRow); @@ -155,7 +155,6 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER); } - CONSUME_DECODE_OVER: if (terrno != 0) { mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); @@ -209,6 +208,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { tDeserializeSCMSubscribeReq(msgStr, pSubscribe); int64_t consumerId = pSubscribe->consumerId; char *consumerGroup = pSubscribe->consumerGroup; + int32_t cgroupLen = strlen(consumerGroup); SArray *newSub = NULL; int newTopicNum = pSubscribe->topicNum; @@ -216,13 +216,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); } for (int i = 0; i < newTopicNum; i++) { - char *topic = pSubscribe->topicName[i]; + char *newTopicName = taosArrayGetP(newSub, i); SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic)); if (pConsumerTopic == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; // TODO: free return -1; } + strcpy(pConsumerTopic->name, newTopicName); pConsumerTopic->vgroups = tdListNew(sizeof(int64_t)); taosArrayPush(newSub, pConsumerTopic); free(pConsumerTopic); @@ -239,7 +240,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - strcpy(pConsumer->cgroup, pSubscribe->consumerGroup); + pConsumer->consumerId = consumerId; + strcpy(pConsumer->cgroup, consumerGroup); } else { oldSub = pConsumer->topics; @@ -260,6 +262,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { j++; } else if (j >= oldTopicNum) { pNewTopic = taosArrayGet(newSub, i); + i++; } else { pNewTopic = taosArrayGet(newSub, i); pOldTopic = taosArrayGet(oldSub, j); @@ -292,7 +295,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName); ASSERT(pTopic != NULL); - SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup)); + SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); while ((pn = tdListNext(&iter)) != NULL) { int32_t vgId = *(int64_t *)pn->data; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); @@ -302,8 +305,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { continue; } // acquire and get epset - void *pMqVgSetReq = - mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, pSubscribe->consumerId, pSubscribe->consumerGroup); + void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup); // TODO:serialize if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -321,7 +323,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { return -1; } } - taosHashRemove(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup)); + taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen); + mndReleaseTopic(pMnode, pTopic); } else if (pNewTopic != NULL) { ASSERT(pOldTopic == NULL); @@ -330,7 +333,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); ASSERT(pTopic != NULL); - SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup)); + SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); if (pGroup == NULL) { // add new group pGroup = malloc(sizeof(SMqCGroup)); @@ -346,18 +349,20 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } pGroup->status = 0; // add into cgroups - taosHashPut(pTopic->cgroups, pSubscribe->consumerGroup, strlen(pSubscribe->consumerGroup), pGroup, - sizeof(SMqCGroup)); + taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup)); } // put the consumer into list // rebalance will be triggered by timer - tdListAppend(pGroup->consumerIds, &pSubscribe->consumerId); + tdListAppend(pGroup->consumerIds, &consumerId); SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic); sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY); // TODO: error handling mndTransAppendRedolog(pTrans, pTopicRaw); + + mndReleaseTopic(pMnode, pTopic); + } else { ASSERT(0); } @@ -376,11 +381,13 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); + mndReleaseConsumer(pMnode, pConsumer); return -1; } // TODO: free memory mndTransDrop(pTrans); + mndReleaseConsumer(pMnode, pConsumer); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index c31b91c3fc..57b7c16c39 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -58,25 +58,36 @@ int32_t mndInitTopic(SMnode *pMnode) { void mndCleanupTopic(SMnode *pMnode) {} SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); - if (pRaw == NULL) goto WTF; + if (pRaw == NULL) goto TOPIC_ENCODE_OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, WTF); - SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, WTF); - SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, WTF); - SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, WTF); - SDB_SET_INT64(pRaw, dataPos, pTopic->uid, WTF); - SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, WTF); - SDB_SET_INT32(pRaw, dataPos, pTopic->version, WTF); - SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, WTF); - SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, WTF); + SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TABLE_FNAME_LEN, TOPIC_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, pTopic->db, TSDB_DB_FNAME_LEN, TOPIC_ENCODE_OVER); + SDB_SET_INT64(pRaw, dataPos, pTopic->createTime, TOPIC_ENCODE_OVER); + SDB_SET_INT64(pRaw, dataPos, pTopic->updateTime, TOPIC_ENCODE_OVER); + SDB_SET_INT64(pRaw, dataPos, pTopic->uid, TOPIC_ENCODE_OVER); + SDB_SET_INT64(pRaw, dataPos, pTopic->dbUid, TOPIC_ENCODE_OVER); + SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER); + SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); - SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, WTF); - SDB_SET_DATALEN(pRaw, dataPos, WTF); + SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); + SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER); -WTF: + terrno = TSDB_CODE_SUCCESS; + +TOPIC_ENCODE_OVER: + if (terrno != TSDB_CODE_SUCCESS) { + mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("topic:%s, encode to raw:%p, row:%p", pTopic->name, pRaw, pTopic); return pRaw; } @@ -90,8 +101,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { goto TOPIC_DECODE_OVER; } - int32_t size = sizeof(SMqTopicObj); - SSdbRow *pRow = sdbAllocRow(size); + int32_t size = sizeof(SMqTopicObj); + SSdbRow *pRow = sdbAllocRow(size); if (pRow == NULL) goto TOPIC_DECODE_OVER; SMqTopicObj *pTopic = sdbGetRowObj(pRow); @@ -115,10 +126,10 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER) - terrno = 0; + terrno = TSDB_CODE_SUCCESS; TOPIC_DECODE_OVER: - if (terrno != 0) { + if (terrno != TSDB_CODE_SUCCESS) { mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr()); tfree(pRow); return NULL; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 3b26f8b9ef..79c8e4ea63 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -302,6 +302,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName while (pIter) { vgInfo = pIter; if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) { + taosHashCancelIterate(dbInfo->vgInfo, pIter); break; } @@ -310,23 +311,14 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName } if (NULL == vgInfo) { - ctgError("no hash range found for hashvalue[%u]", hashValue); - - void *pIter1 = taosHashIterate(dbInfo->vgInfo, NULL); - while (pIter1) { - vgInfo = pIter1; - ctgError("valid range:[%d, %d], vgId:%d", vgInfo->hashBegin, vgInfo->hashEnd, vgInfo->vgId); - pIter1 = taosHashIterate(dbInfo->vgInfo, pIter1); - } - + ctgError("no hash range found for hash value [%u], numOfVgId:%d", hashValue, taosHashGetSize(dbInfo->vgInfo)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } *pVgroup = *vgInfo; _return: - - CTG_RET(TSDB_CODE_SUCCESS); + CTG_RET(code); } int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta) { @@ -425,33 +417,30 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out CTG_RET(code); } - -int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) { +int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, bool forceUpdate, SDBVgroupInfo** dbInfo) { bool inCache = false; - if (0 == forceUpdate) { + if (!forceUpdate) { CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache)); - if (inCache) { return TSDB_CODE_SUCCESS; } + + ctgDebug("failed to get DB vgroupInfo from cache, dbName:%s, load it from mnode, update:%d", dbName, forceUpdate); } SUseDbOutput DbOut = {0}; SBuildUseDBInput input = {0}; - strncpy(input.db, dbName, sizeof(input.db)); - input.db[sizeof(input.db) - 1] = 0; + tstrncpy(input.db, dbName, tListLen(input.db)); input.vgVersion = CTG_DEFAULT_INVALID_VERSION; while (true) { CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); - CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup)); - CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache)); if (!inCache) { - ctgWarn("get db vgroup from cache failed, db:%s", dbName); + ctgWarn("get DB vgroupInfo from cache failed, dbName:%s", dbName); continue; } @@ -574,14 +563,15 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SArray** vgroupList) { +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, bool forceUpdate, SArray** vgroupList) { if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } SDBVgroupInfo* db = NULL; - int32_t code = 0; SVgroupInfo *vgInfo = NULL; + + int32_t code = 0; SArray *vgList = NULL; CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db)); @@ -777,7 +767,6 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, tNameGetFullDbName(pTableName, db); CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo)); - CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup)); _return: diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index e944fb01bd..abee162255 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -43,7 +43,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou char dbFname[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(&name, dbFname); - catalogGetDBVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, 0, &array); + catalogGetDBVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, dbFname, false, &array); SVgroupInfo* info = taosArrayGet(array, 0); pShowReq->head.vgId = htonl(info->vgId); @@ -626,11 +626,12 @@ static int32_t doBuildSingleTableBatchReq(SName* pTableName, SArray* pColumns, S int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) { SArray* pBufArray = NULL; + int32_t code = 0; // it is a sql statement to create a normal table if (pCreateTable->childTableInfo == NULL) { assert(taosArrayGetSize(pCreateTable->colInfo.pColumns) > 0 && pCreateTable->colInfo.pTagColumns == NULL); - int32_t code = doCheckForCreateTable(pCreateTable, pMsgBuf); + code = doCheckForCreateTable(pCreateTable, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -659,7 +660,10 @@ int32_t doCheckAndBuildCreateTableReq(SCreateTableSql* pCreateTable, SParseBasic destroyCreateTbReqBatch(&tbatch); } else { // it is a child table, created according to a super table - doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray); + code = doCheckAndBuildCreateCTableReq(pCreateTable, pCtx, pMsgBuf, &pBufArray); + if (code != 0) { + return code; + } } SVnodeModifOpStmtInfo* pStmtInfo = calloc(1, sizeof(SVnodeModifOpStmtInfo)); diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 85a8d9e047..3971f90ac4 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -44,15 +44,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { } if (!isDqlSqlStatement(&info)) { -// bool toVnode = false; if (info.type == TSDB_SQL_CREATE_TABLE) { -// SCreateTableSql* pCreateSql = info.pCreateTableInfo; -// if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) { -// toVnode = true; -// } -// } - -// if (toVnode) { SVnodeModifOpStmtInfo * pModifStmtInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); if (pModifStmtInfo == NULL) { return terrno; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 38a823de2c..f5a49e782c 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -273,19 +273,20 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } - -int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { - if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { - qError("taosHashPut failed"); +int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { + if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) { + qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + qDebug("add one task, taskId:0x%"PRIx64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), + pJob->queryId); return TSDB_CODE_SUCCESS; } int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { - qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); + qError("remove task taskId:0x%"PRIx64" from execTasks failed, reqId:0x%"PRIx64, task->taskId, job->queryId); return TSDB_CODE_SUCCESS; } @@ -583,9 +584,12 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } + int32_t s = taosHashGetSize((*job)->execTasks); + assert(s != 0); + SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); if (NULL == task || NULL == (*task)) { - qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId); + qError("failed to get task, taskId:%"PRIx64" not exist, reqId:0x%"PRIx64, pParam->taskId, (*job)->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -819,12 +823,13 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schSetTaskCandidateAddrs(job, task)); if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) { - SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); + SCH_TASK_ERR_LOG("no valid candidate node for task:%"PRIx64, task->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType)); + // NOTE: race condition: the task should be put into the hash table before send msg to server SCH_ERR_RET(schPushTaskToExecList(job, task)); + SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType)); task->status = JOB_TASK_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; @@ -975,7 +980,7 @@ _return: } int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) { - if (NULL == transport || /* NULL == nodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { + if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index f90b157558..5e056d213e 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -19,8 +19,9 @@ #include "taos.h" #include "tdef.h" -#define EXT_SIZE 1024 - +// the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT +#define MAX_WARNING_REF_COUNT 10000 +#define EXT_SIZE 1024 #define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR) #define DO_FREE_HASH_NODE(_n) \ @@ -799,6 +800,11 @@ FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) { return 0; } +FORCE_INLINE int32_t taosHashGetDataLen(void *data) { + SHashNode * node = GET_HASH_PNODE(data); + return node->keyLen; +} + FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) { SHashNode * node = GET_HASH_PNODE(data); return node->keyLen; @@ -902,8 +908,24 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { if (pNode) { SHashEntry *pe = pHashObj->hashList[slot]; - pNode->count++; - data = GET_HASH_NODE_DATA(pNode); + + uint16_t prevRef = atomic_load_16(&pNode->count); + uint16_t afterRef = atomic_add_fetch_16(&pNode->count, 1); + + // the reference count value is overflow, which will cause the delete node operation immediately. + if (prevRef > afterRef) { + uError("hash entry ref count overflow, prev ref:%d, current ref:%d", prevRef, afterRef); + // restore the value + atomic_sub_fetch_16(&pNode->count, 1); + data = NULL; + } else { + data = GET_HASH_NODE_DATA(pNode); + } + + if (afterRef >= MAX_WARNING_REF_COUNT) { + uWarn("hash entry ref count is abnormally high: %d", afterRef); + } + if (pHashObj->type == HASH_ENTRY_LOCK) { taosWUnLockLatch(&pe->latch); } @@ -911,7 +933,6 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return data; - } void taosHashCancelIterate(SHashObj *pHashObj, void *p) {