Merge remote-tracking branch 'origin/3.0' into feature/dnode3
This commit is contained in:
commit
3c0077a38f
|
@ -56,6 +56,12 @@ option(
|
||||||
OFF
|
OFF
|
||||||
)
|
)
|
||||||
|
|
||||||
|
option(
|
||||||
|
BUILD_WITH_TRAFT
|
||||||
|
"If build with traft"
|
||||||
|
OFF
|
||||||
|
)
|
||||||
|
|
||||||
option(
|
option(
|
||||||
BUILD_DEPENDENCY_TESTS
|
BUILD_DEPENDENCY_TESTS
|
||||||
"If build dependency tests"
|
"If build dependency tests"
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
|
||||||
|
# traft
|
||||||
|
ExternalProject_Add(traft
|
||||||
|
GIT_REPOSITORY https://github.com/taosdata/traft.git
|
||||||
|
GIT_TAG for_3.0
|
||||||
|
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/traft"
|
||||||
|
BINARY_DIR "${CMAKE_CONTRIB_DIR}/traft"
|
||||||
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
# https://answers.ros.org/question/333125/how-to-include-external-automakeautoconf-projects-into-ament_cmake/
|
||||||
|
CONFIGURE_COMMAND COMMAND autoreconf -i COMMAND ./configure --enable-example
|
||||||
|
BUILD_COMMAND "$(MAKE)"
|
||||||
|
INSTALL_COMMAND ""
|
||||||
|
TEST_COMMAND ""
|
||||||
|
)
|
|
@ -41,6 +41,12 @@ if(${BUILD_WITH_CRAFT})
|
||||||
SET(BUILD_WITH_UV ON CACHE BOOL "craft need libuv" FORCE)
|
SET(BUILD_WITH_UV ON CACHE BOOL "craft need libuv" FORCE)
|
||||||
endif(${BUILD_WITH_CRAFT})
|
endif(${BUILD_WITH_CRAFT})
|
||||||
|
|
||||||
|
# traft
|
||||||
|
if(${BUILD_WITH_TRAFT})
|
||||||
|
cat("${CMAKE_SUPPORT_DIR}/traft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||||
|
SET(BUILD_WITH_UV ON CACHE BOOL "traft need libuv" FORCE)
|
||||||
|
endif(${BUILD_WITH_TRAFT})
|
||||||
|
|
||||||
#libuv
|
#libuv
|
||||||
if(${BUILD_WITH_UV})
|
if(${BUILD_WITH_UV})
|
||||||
cat("${CMAKE_SUPPORT_DIR}/libuv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
cat("${CMAKE_SUPPORT_DIR}/libuv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
|
||||||
|
@ -173,6 +179,18 @@ if(${BUILD_WITH_CRAFT})
|
||||||
# )
|
# )
|
||||||
endif(${BUILD_WITH_CRAFT})
|
endif(${BUILD_WITH_CRAFT})
|
||||||
|
|
||||||
|
# TRAFT
|
||||||
|
if(${BUILD_WITH_TRAFT})
|
||||||
|
add_library(traft STATIC IMPORTED GLOBAL)
|
||||||
|
set_target_properties(traft PROPERTIES
|
||||||
|
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/traft/.libs/libraft.a"
|
||||||
|
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/traft/include"
|
||||||
|
)
|
||||||
|
# target_link_libraries(craft
|
||||||
|
# INTERFACE pthread
|
||||||
|
# )
|
||||||
|
endif(${BUILD_WITH_TRAFT})
|
||||||
|
|
||||||
# LIBUV
|
# LIBUV
|
||||||
if(${BUILD_WITH_UV})
|
if(${BUILD_WITH_UV})
|
||||||
add_subdirectory(libuv)
|
add_subdirectory(libuv)
|
||||||
|
|
|
@ -19,4 +19,8 @@ if(${BUILD_WITH_CRAFT})
|
||||||
add_subdirectory(craft)
|
add_subdirectory(craft)
|
||||||
endif(${BUILD_WITH_CRAFT})
|
endif(${BUILD_WITH_CRAFT})
|
||||||
|
|
||||||
|
if(${BUILD_WITH_TRAFT})
|
||||||
|
add_subdirectory(traft)
|
||||||
|
endif(${BUILD_WITH_TRAFT})
|
||||||
|
|
||||||
add_subdirectory(tdev)
|
add_subdirectory(tdev)
|
||||||
|
|
|
@ -20,6 +20,7 @@ typedef struct {
|
||||||
} Addr;
|
} Addr;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
int voter;
|
||||||
Addr me;
|
Addr me;
|
||||||
Addr peers[MAX_PEERS];
|
Addr peers[MAX_PEERS];
|
||||||
int peersCount;
|
int peersCount;
|
||||||
|
|
|
@ -104,7 +104,7 @@ const char* state2String(unsigned short state) {
|
||||||
void printRaftConfiguration(struct raft_configuration *c) {
|
void printRaftConfiguration(struct raft_configuration *c) {
|
||||||
printf("configuration: \n");
|
printf("configuration: \n");
|
||||||
for (int i = 0; i < c->n; ++i) {
|
for (int i = 0; i < c->n; ++i) {
|
||||||
printf("%llu -- %d -- %s\n", c->servers->id, c->servers->role, c->servers->address);
|
printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,11 +119,9 @@ void printRaftState(struct raft *r) {
|
||||||
printf("last_applied: %llu \n", r->last_applied);
|
printf("last_applied: %llu \n", r->last_applied);
|
||||||
printf("last_stored: %llu \n", r->last_stored);
|
printf("last_stored: %llu \n", r->last_stored);
|
||||||
|
|
||||||
/*
|
|
||||||
printf("configuration_index: %llu \n", r->configuration_index);
|
printf("configuration_index: %llu \n", r->configuration_index);
|
||||||
printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index);
|
printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index);
|
||||||
printRaftConfiguration(&r->configuration);
|
printRaftConfiguration(&r->configuration);
|
||||||
*/
|
|
||||||
|
|
||||||
printf("----------------------------\n");
|
printf("----------------------------\n");
|
||||||
}
|
}
|
||||||
|
@ -164,6 +162,18 @@ void getValue(const char *key) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void raft_change_cb_add(struct raft_change *req, int status) {
|
||||||
|
printf("raft_change_cb_add status:%d ... \n", status);
|
||||||
|
}
|
||||||
|
|
||||||
|
void raft_change_cb_assign(struct raft_change *req, int status) {
|
||||||
|
printf("raft_change_cb_assign status:%d ... \n", status);
|
||||||
|
}
|
||||||
|
|
||||||
|
void raft_change_cb_remove(struct raft_change *req, int status) {
|
||||||
|
printf("raft_change_cb_remove status:%d ... \n", status);
|
||||||
|
}
|
||||||
|
|
||||||
void console(SRaftServer *pRaftServer) {
|
void console(SRaftServer *pRaftServer) {
|
||||||
while (1) {
|
while (1) {
|
||||||
char cmd_buf[COMMAND_LEN];
|
char cmd_buf[COMMAND_LEN];
|
||||||
|
@ -193,30 +203,59 @@ void console(SRaftServer *pRaftServer) {
|
||||||
|
|
||||||
parseCommand(cmd_buf, cmd, param1, param2, TOKEN_LEN);
|
parseCommand(cmd_buf, cmd, param1, param2, TOKEN_LEN);
|
||||||
if (strcmp(cmd, "addnode") == 0) {
|
if (strcmp(cmd, "addnode") == 0) {
|
||||||
printf("not support \n");
|
//printf("not support \n");
|
||||||
|
|
||||||
/*
|
|
||||||
char host[HOST_LEN];
|
char host[HOST_LEN];
|
||||||
uint32_t port;
|
uint32_t port;
|
||||||
parseAddr(param1, host, HOST_LEN, &port);
|
parseAddr(param1, host, HOST_LEN, &port);
|
||||||
uint64_t rid = raftId(host, port);
|
uint64_t rid = raftId(host, port);
|
||||||
|
|
||||||
struct raft_change *req = raft_malloc(sizeof(*req));
|
struct raft_change *req = raft_malloc(sizeof(*req));
|
||||||
int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL);
|
int r = raft_add(&pRaftServer->raft, req, rid, param1, raft_change_cb_add);
|
||||||
if (r != 0) {
|
if (r != 0) {
|
||||||
printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft));
|
printf("raft_add error: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||||
}
|
}
|
||||||
printf("add node: %lu %s \n", rid, param1);
|
printf("add node: %lu %s \n", rid, param1);
|
||||||
|
|
||||||
struct raft_change *req2 = raft_malloc(sizeof(*req2));
|
struct raft_change *req2 = raft_malloc(sizeof(*req2));
|
||||||
r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL);
|
r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, raft_change_cb_assign);
|
||||||
if (r != 0) {
|
if (r != 0) {
|
||||||
printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft));
|
printf("raft_assign error: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||||
}
|
}
|
||||||
*/
|
printf("raft_assign: %s %d \n", param1, RAFT_VOTER);
|
||||||
|
|
||||||
|
} else if (strcmp(cmd, "activate") == 0) {
|
||||||
|
char host[HOST_LEN];
|
||||||
|
uint32_t port;
|
||||||
|
parseAddr(param1, host, HOST_LEN, &port);
|
||||||
|
uint64_t rid = raftId(host, port);
|
||||||
|
|
||||||
|
|
||||||
|
struct raft_change *req2 = raft_malloc(sizeof(*req2));
|
||||||
|
int r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, raft_change_cb_assign);
|
||||||
|
if (r != 0) {
|
||||||
|
printf("raft_assign error: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||||
|
}
|
||||||
|
printf("raft_assign: %s %d \n", param1, RAFT_VOTER);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
} else if (strcmp(cmd, "dropnode") == 0) {
|
} else if (strcmp(cmd, "dropnode") == 0) {
|
||||||
printf("not support \n");
|
char host[HOST_LEN];
|
||||||
|
uint32_t port;
|
||||||
|
parseAddr(param1, host, HOST_LEN, &port);
|
||||||
|
uint64_t rid = raftId(host, port);
|
||||||
|
|
||||||
|
struct raft_change *req = raft_malloc(sizeof(*req));
|
||||||
|
int r = raft_remove(&pRaftServer->raft, req, rid, raft_change_cb_remove);
|
||||||
|
if (r != 0) {
|
||||||
|
printf("raft_remove: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||||
|
}
|
||||||
|
printf("drop node: %lu %s \n", rid, param1);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
} else if (strcmp(cmd, "put") == 0) {
|
} else if (strcmp(cmd, "put") == 0) {
|
||||||
char buf[256];
|
char buf[256];
|
||||||
|
@ -234,6 +273,7 @@ void console(SRaftServer *pRaftServer) {
|
||||||
|
|
||||||
} else if (strcmp(cmd, "help") == 0) {
|
} else if (strcmp(cmd, "help") == 0) {
|
||||||
printf("addnode \"127.0.0.1:8888\" \n");
|
printf("addnode \"127.0.0.1:8888\" \n");
|
||||||
|
printf("activate \"127.0.0.1:8888\" \n");
|
||||||
printf("dropnode \"127.0.0.1:8888\" \n");
|
printf("dropnode \"127.0.0.1:8888\" \n");
|
||||||
printf("put key value \n");
|
printf("put key value \n");
|
||||||
printf("get key \n");
|
printf("get key \n");
|
||||||
|
@ -256,7 +296,9 @@ void *startConsoleFunc(void *param) {
|
||||||
// Config ---------------------------------
|
// Config ---------------------------------
|
||||||
void usage() {
|
void usage() {
|
||||||
printf("\nusage: \n");
|
printf("\nusage: \n");
|
||||||
printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name);
|
printf("%s --me=127.0.0.1:10000 --dir=./data --voter \n", exe_name);
|
||||||
|
printf("%s --me=127.0.0.1:10001 --dir=./data \n", exe_name);
|
||||||
|
printf("%s --me=127.0.0.1:10002 --dir=./data \n", exe_name);
|
||||||
printf("\n");
|
printf("\n");
|
||||||
printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name);
|
printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name);
|
||||||
printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name);
|
printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name);
|
||||||
|
@ -271,13 +313,15 @@ void parseConf(int argc, char **argv, SRaftServerConfig *pConf) {
|
||||||
option_index = 0;
|
option_index = 0;
|
||||||
static struct option long_options[] = {
|
static struct option long_options[] = {
|
||||||
{"help", no_argument, NULL, 'h'},
|
{"help", no_argument, NULL, 'h'},
|
||||||
|
{"voter", no_argument, NULL, 'v'},
|
||||||
{"peers", required_argument, NULL, 'p'},
|
{"peers", required_argument, NULL, 'p'},
|
||||||
{"me", required_argument, NULL, 'm'},
|
{"me", required_argument, NULL, 'm'},
|
||||||
{"dir", required_argument, NULL, 'd'},
|
{"dir", required_argument, NULL, 'd'},
|
||||||
{NULL, 0, NULL, 0}
|
{NULL, 0, NULL, 0}
|
||||||
};
|
};
|
||||||
|
|
||||||
while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) {
|
pConf->voter = 0;
|
||||||
|
while ((option_value = getopt_long(argc, argv, "hvp:m:d:", long_options, &option_index)) != -1) {
|
||||||
switch (option_value) {
|
switch (option_value) {
|
||||||
case 'm': {
|
case 'm': {
|
||||||
parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port);
|
parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port);
|
||||||
|
@ -295,6 +339,10 @@ void parseConf(int argc, char **argv, SRaftServerConfig *pConf) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case 'v': {
|
||||||
|
pConf->voter = 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case 'd': {
|
case 'd': {
|
||||||
snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg);
|
snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg);
|
||||||
|
@ -338,6 +386,8 @@ int main(int argc, char **argv) {
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
signal(SIGPIPE, SIG_IGN);
|
||||||
|
|
||||||
SRaftServerConfig conf;
|
SRaftServerConfig conf;
|
||||||
parseConf(argc, argv, &conf);
|
parseConf(argc, argv, &conf);
|
||||||
printConf(&conf);
|
printConf(&conf);
|
||||||
|
|
|
@ -85,29 +85,45 @@ int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf,
|
||||||
pRaftServer->fsm = pFsm;
|
pRaftServer->fsm = pFsm;
|
||||||
|
|
||||||
ret = uv_loop_init(&pRaftServer->loop);
|
ret = uv_loop_init(&pRaftServer->loop);
|
||||||
if (!ret) {
|
if (ret != 0) {
|
||||||
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
||||||
|
assert(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
|
ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
|
||||||
if (!ret) {
|
if (ret != 0) {
|
||||||
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
||||||
|
assert(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, pRaftServer->dir, &pRaftServer->transport);
|
ret = raft_uv_init(&pRaftServer->io, &pRaftServer->loop, pRaftServer->dir, &pRaftServer->transport);
|
||||||
if (!ret) {
|
if (ret != 0) {
|
||||||
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
||||||
|
assert(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = raft_init(&pRaftServer->raft, &pRaftServer->io, pRaftServer->fsm, pRaftServer->raftId, pRaftServer->address);
|
ret = raft_init(&pRaftServer->raft, &pRaftServer->io, pRaftServer->fsm, pRaftServer->raftId, pRaftServer->address);
|
||||||
if (!ret) {
|
if (ret != 0) {
|
||||||
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->raft));
|
||||||
|
assert(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct raft_configuration conf;
|
struct raft_configuration conf;
|
||||||
raft_configuration_init(&conf);
|
raft_configuration_init(&conf);
|
||||||
|
|
||||||
|
if (pConf->voter == 0) {
|
||||||
|
raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_SPARE);
|
||||||
|
|
||||||
|
} else {
|
||||||
raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER);
|
raft_configuration_add(&conf, pRaftServer->raftId, pRaftServer->address, RAFT_VOTER);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
printf("add myself: %llu - %s \n", pRaftServer->raftId, pRaftServer->address);
|
printf("add myself: %llu - %s \n", pRaftServer->raftId, pRaftServer->address);
|
||||||
|
|
||||||
|
|
||||||
for (int i = 0; i < pConf->peersCount; ++i) {
|
for (int i = 0; i < pConf->peersCount; ++i) {
|
||||||
const Addr *pAddr = &pConf->peers[i];
|
const Addr *pAddr = &pConf->peers[i];
|
||||||
raft_id rid = raftId(pAddr->host, pAddr->port);
|
raft_id rid = raftId(pAddr->host, pAddr->port);
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
add_executable(raftMain "")
|
||||||
|
target_sources(raftMain
|
||||||
|
PRIVATE
|
||||||
|
"raftMain.c"
|
||||||
|
"raftServer.c"
|
||||||
|
)
|
||||||
|
target_link_libraries(raftMain PUBLIC traft lz4 uv_a)
|
|
@ -0,0 +1,4 @@
|
||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
rm -rf 127.0.0.1*
|
||||||
|
rm -rf ./data
|
|
@ -0,0 +1,36 @@
|
||||||
|
#ifndef TDENGINE_COMMON_H
|
||||||
|
#define TDENGINE_COMMON_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
|
#define MAX_INSTANCE_NUM 100
|
||||||
|
|
||||||
|
#define MAX_PEERS 10
|
||||||
|
#define COMMAND_LEN 1024
|
||||||
|
#define TOKEN_LEN 128
|
||||||
|
#define DIR_LEN 256
|
||||||
|
#define HOST_LEN 64
|
||||||
|
#define ADDRESS_LEN (HOST_LEN + 16)
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char host[HOST_LEN];
|
||||||
|
uint32_t port;
|
||||||
|
} Addr;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
Addr me;
|
||||||
|
Addr peers[MAX_PEERS];
|
||||||
|
int peersCount;
|
||||||
|
char dir[DIR_LEN];
|
||||||
|
char dataDir[DIR_LEN + HOST_LEN * 2];
|
||||||
|
} SRaftServerConfig;
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // TDENGINE_COMMON_H
|
|
@ -0,0 +1,18 @@
|
||||||
|
|
||||||
|
make raftServer
|
||||||
|
|
||||||
|
all:
|
||||||
|
gcc raftMain.c raftServer.c -I ../../traft/include/ ../../traft/.libs/libraft.a -o raftMain -luv -llz4 -lpthread -g
|
||||||
|
clean:
|
||||||
|
rm -f raftMain
|
||||||
|
sh clear.sh
|
||||||
|
|
||||||
|
|
||||||
|
make traft:
|
||||||
|
|
||||||
|
sudo apt-get install libuv1-dev liblz4-dev
|
||||||
|
autoreconf -i
|
||||||
|
./configure --enable-example
|
||||||
|
make
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,659 @@
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <assert.h>
|
||||||
|
#include <getopt.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <getopt.h>
|
||||||
|
#include <raft.h>
|
||||||
|
#include <raft/uv.h>
|
||||||
|
#include "raftServer.h"
|
||||||
|
#include "common.h"
|
||||||
|
|
||||||
|
const char *exe_name;
|
||||||
|
|
||||||
|
typedef struct LeaderState {
|
||||||
|
char address[48];
|
||||||
|
int leaderCount;
|
||||||
|
|
||||||
|
} LeaderState;
|
||||||
|
|
||||||
|
#define NODE_COUNT 3
|
||||||
|
LeaderState leaderStates[NODE_COUNT];
|
||||||
|
|
||||||
|
void printLeaderCount() {
|
||||||
|
for (int i = 0; i < NODE_COUNT; ++i) {
|
||||||
|
printf("%s: leaderCount:%d \n", leaderStates[i].address, leaderStates[i].leaderCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void updateLeaderStates(SRaftServer *pRaftServer) {
|
||||||
|
for (int i = 0; i < pRaftServer->instance[0].raft.configuration.n; ++i) {
|
||||||
|
snprintf(leaderStates[i].address, sizeof(leaderStates[i].address), "%s", pRaftServer->instance[0].raft.configuration.servers[i].address);
|
||||||
|
leaderStates[i].leaderCount = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
||||||
|
struct raft *r = &pRaftServer->instance[i].raft;
|
||||||
|
|
||||||
|
char leaderAddress[128];
|
||||||
|
memset(leaderAddress, 0, sizeof(leaderAddress));
|
||||||
|
|
||||||
|
if (r->state == RAFT_LEADER) {
|
||||||
|
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address);
|
||||||
|
} else if (r->state == RAFT_FOLLOWER) {
|
||||||
|
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int j = 0; j < NODE_COUNT; j++) {
|
||||||
|
if (strcmp(leaderAddress, leaderStates[j].address) == 0) {
|
||||||
|
leaderStates[j].leaderCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void raftTransferCb(struct raft_transfer *req) {
|
||||||
|
SRaftServer *pRaftServer = req->data;
|
||||||
|
raft_free(req);
|
||||||
|
|
||||||
|
printf("raftTransferCb: \n");
|
||||||
|
updateLeaderStates(pRaftServer);
|
||||||
|
printLeaderCount();
|
||||||
|
|
||||||
|
int myLeaderCount;
|
||||||
|
for (int i = 0; i < NODE_COUNT; ++i) {
|
||||||
|
if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) {
|
||||||
|
myLeaderCount = leaderStates[i].leaderCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT);
|
||||||
|
if (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) {
|
||||||
|
struct raft *r;
|
||||||
|
for (int j = 0; j < pRaftServer->instanceCount; ++j) {
|
||||||
|
if (pRaftServer->instance[j].raft.state == RAFT_LEADER) {
|
||||||
|
r = &pRaftServer->instance[j].raft;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct raft_transfer *transfer = raft_malloc(sizeof(*transfer));
|
||||||
|
transfer->data = pRaftServer;
|
||||||
|
|
||||||
|
uint64_t destRaftId;
|
||||||
|
int minIndex = -1;
|
||||||
|
int minLeaderCount = myLeaderCount;
|
||||||
|
for (int j = 0; j < NODE_COUNT; ++j) {
|
||||||
|
if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue;
|
||||||
|
if (leaderStates[j].leaderCount <= minLeaderCount) {
|
||||||
|
minIndex = j;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char myHost[48];
|
||||||
|
uint16_t myPort;
|
||||||
|
uint16_t myVid;
|
||||||
|
decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid);
|
||||||
|
|
||||||
|
char *destAddress = leaderStates[minIndex].address;
|
||||||
|
|
||||||
|
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
|
||||||
|
splitString(destAddress, ":", tokens, 2);
|
||||||
|
char *destHost = tokens[0];
|
||||||
|
uint16_t destPort = atoi(tokens[1]);
|
||||||
|
destRaftId = encodeRaftId(destHost, destPort, myVid);
|
||||||
|
|
||||||
|
raft_transfer(r, transfer, destRaftId, raftTransferCb);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void parseAddr(const char *addr, char *host, int len, uint32_t *port) {
|
||||||
|
char* tmp = (char*)malloc(strlen(addr) + 1);
|
||||||
|
strcpy(tmp, addr);
|
||||||
|
|
||||||
|
char* context;
|
||||||
|
char* separator = ":";
|
||||||
|
char* token = strtok_r(tmp, separator, &context);
|
||||||
|
if (token) {
|
||||||
|
snprintf(host, len, "%s", token);
|
||||||
|
}
|
||||||
|
|
||||||
|
token = strtok_r(NULL, separator, &context);
|
||||||
|
if (token) {
|
||||||
|
sscanf(token, "%u", port);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(tmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
// only parse 3 tokens
|
||||||
|
int parseCommand3(const char* str, char* token1, char* token2, char* token3, int len)
|
||||||
|
{
|
||||||
|
char* tmp = (char*)malloc(strlen(str) + 1);
|
||||||
|
strcpy(tmp, str);
|
||||||
|
|
||||||
|
char* context;
|
||||||
|
char* separator = " ";
|
||||||
|
int n = 0;
|
||||||
|
|
||||||
|
char* token = strtok_r(tmp, separator, &context);
|
||||||
|
if (!token) {
|
||||||
|
goto ret;
|
||||||
|
}
|
||||||
|
if (strcmp(token, "") != 0) {
|
||||||
|
strncpy(token1, token, len);
|
||||||
|
n++;
|
||||||
|
}
|
||||||
|
|
||||||
|
token = strtok_r(NULL, separator, &context);
|
||||||
|
if (!token) {
|
||||||
|
goto ret;
|
||||||
|
}
|
||||||
|
if (strcmp(token, "") != 0) {
|
||||||
|
strncpy(token2, token, len);
|
||||||
|
n++;
|
||||||
|
}
|
||||||
|
|
||||||
|
token = strtok_r(NULL, separator, &context);
|
||||||
|
if (!token) {
|
||||||
|
goto ret;
|
||||||
|
}
|
||||||
|
if (strcmp(token, "") != 0) {
|
||||||
|
strncpy(token3, token, len);
|
||||||
|
n++;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret:
|
||||||
|
return n;
|
||||||
|
free(tmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
// only parse 4 tokens
|
||||||
|
int parseCommand4(const char* str, char* token1, char* token2, char* token3, char *token4, int len)
|
||||||
|
{
|
||||||
|
char* tmp = (char*)malloc(strlen(str) + 1);
|
||||||
|
strcpy(tmp, str);
|
||||||
|
|
||||||
|
char* context;
|
||||||
|
char* separator = " ";
|
||||||
|
int n = 0;
|
||||||
|
|
||||||
|
char* token = strtok_r(tmp, separator, &context);
|
||||||
|
if (!token) {
|
||||||
|
goto ret;
|
||||||
|
}
|
||||||
|
if (strcmp(token, "") != 0) {
|
||||||
|
strncpy(token1, token, len);
|
||||||
|
n++;
|
||||||
|
}
|
||||||
|
|
||||||
|
token = strtok_r(NULL, separator, &context);
|
||||||
|
if (!token) {
|
||||||
|
goto ret;
|
||||||
|
}
|
||||||
|
if (strcmp(token, "") != 0) {
|
||||||
|
strncpy(token2, token, len);
|
||||||
|
n++;
|
||||||
|
}
|
||||||
|
|
||||||
|
token = strtok_r(NULL, separator, &context);
|
||||||
|
if (!token) {
|
||||||
|
goto ret;
|
||||||
|
}
|
||||||
|
if (strcmp(token, "") != 0) {
|
||||||
|
strncpy(token3, token, len);
|
||||||
|
n++;
|
||||||
|
}
|
||||||
|
|
||||||
|
token = strtok_r(NULL, separator, &context);
|
||||||
|
if (!token) {
|
||||||
|
goto ret;
|
||||||
|
}
|
||||||
|
if (strcmp(token, "") != 0) {
|
||||||
|
strncpy(token4, token, len);
|
||||||
|
n++;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret:
|
||||||
|
return n;
|
||||||
|
free(tmp);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *startServerFunc(void *param) {
|
||||||
|
SRaftServer *pServer = (SRaftServer*)param;
|
||||||
|
int32_t r = raftServerStart(pServer);
|
||||||
|
assert(r == 0);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Console ---------------------------------
|
||||||
|
const char* state2String(unsigned short state) {
|
||||||
|
if (state == RAFT_UNAVAILABLE) {
|
||||||
|
return "RAFT_UNAVAILABLE";
|
||||||
|
|
||||||
|
} else if (state == RAFT_FOLLOWER) {
|
||||||
|
return "RAFT_FOLLOWER";
|
||||||
|
|
||||||
|
} else if (state == RAFT_CANDIDATE) {
|
||||||
|
return "RAFT_CANDIDATE";
|
||||||
|
|
||||||
|
} else if (state == RAFT_LEADER) {
|
||||||
|
return "RAFT_LEADER";
|
||||||
|
|
||||||
|
}
|
||||||
|
return "UNKNOWN_RAFT_STATE";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void printRaftState2(struct raft *r) {
|
||||||
|
|
||||||
|
char leaderAddress[128];
|
||||||
|
memset(leaderAddress, 0, sizeof(leaderAddress));
|
||||||
|
|
||||||
|
if (r->state == RAFT_LEADER) {
|
||||||
|
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->address);
|
||||||
|
} else if (r->state == RAFT_FOLLOWER) {
|
||||||
|
snprintf(leaderAddress, sizeof(leaderAddress), "%s", r->follower_state.current_leader.address);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < r->configuration.n; ++i) {
|
||||||
|
char tmpAddress[128];
|
||||||
|
snprintf(tmpAddress, sizeof(tmpAddress), "%s", r->configuration.servers[i].address);
|
||||||
|
|
||||||
|
uint64_t raftId = r->configuration.servers[i].id;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
uint16_t vid;
|
||||||
|
decodeRaftId(raftId, host, 128, &port, &vid);
|
||||||
|
|
||||||
|
char buf[512];
|
||||||
|
memset(buf, 0, sizeof(buf));
|
||||||
|
if (strcmp(tmpAddress, leaderAddress) == 0) {
|
||||||
|
snprintf(buf, sizeof(buf), "<%s:%u-%u-LEADER>\t", host, port, vid);
|
||||||
|
} else {
|
||||||
|
snprintf(buf, sizeof(buf), "<%s:%u-%u-FOLLOWER>\t", host, port, vid);
|
||||||
|
}
|
||||||
|
printf("%s", buf);
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
void printRaftConfiguration(struct raft_configuration *c) {
|
||||||
|
printf("configuration: \n");
|
||||||
|
for (int i = 0; i < c->n; ++i) {
|
||||||
|
printf("%llu -- %d -- %s\n", c->servers[i].id, c->servers[i].role, c->servers[i].address);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void printRaftState(struct raft *r) {
|
||||||
|
printf("----Raft State: -----------\n");
|
||||||
|
printf("mem_addr: %p \n", r);
|
||||||
|
printf("my_id: %llu \n", r->id);
|
||||||
|
printf("address: %s \n", r->address);
|
||||||
|
printf("current_term: %llu \n", r->current_term);
|
||||||
|
printf("voted_for: %llu \n", r->voted_for);
|
||||||
|
printf("role: %s \n", state2String(r->state));
|
||||||
|
printf("commit_index: %llu \n", r->commit_index);
|
||||||
|
printf("last_applied: %llu \n", r->last_applied);
|
||||||
|
printf("last_stored: %llu \n", r->last_stored);
|
||||||
|
|
||||||
|
printf("configuration_index: %llu \n", r->configuration_index);
|
||||||
|
printf("configuration_uncommitted_index: %llu \n", r->configuration_uncommitted_index);
|
||||||
|
printRaftConfiguration(&r->configuration);
|
||||||
|
|
||||||
|
printf("----------------------------\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
void putValueCb(struct raft_apply *req, int status, void *result) {
|
||||||
|
raft_free(req);
|
||||||
|
struct raft *r = req->data;
|
||||||
|
if (status != 0) {
|
||||||
|
printf("putValueCb: %s \n", raft_errmsg(r));
|
||||||
|
} else {
|
||||||
|
printf("putValueCb: %s \n", "ok");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void putValue(struct raft *r, const char *value) {
|
||||||
|
struct raft_buffer buf;
|
||||||
|
|
||||||
|
buf.len = TOKEN_LEN;;
|
||||||
|
buf.base = raft_malloc(buf.len);
|
||||||
|
snprintf(buf.base, buf.len, "%s", value);
|
||||||
|
|
||||||
|
struct raft_apply *req = raft_malloc(sizeof(struct raft_apply));
|
||||||
|
req->data = r;
|
||||||
|
int ret = raft_apply(r, req, &buf, 1, putValueCb);
|
||||||
|
if (ret == 0) {
|
||||||
|
printf("put %s \n", (char*)buf.base);
|
||||||
|
} else {
|
||||||
|
printf("put error: %s \n", raft_errmsg(r));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void getValue(const char *key) {
|
||||||
|
char *ptr = getKV(key);
|
||||||
|
if (ptr) {
|
||||||
|
printf("get value: [%s] \n", ptr);
|
||||||
|
} else {
|
||||||
|
printf("value not found for key: [%s] \n", key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void console(SRaftServer *pRaftServer) {
|
||||||
|
while (1) {
|
||||||
|
char cmd_buf[COMMAND_LEN];
|
||||||
|
memset(cmd_buf, 0, sizeof(cmd_buf));
|
||||||
|
char *ret = fgets(cmd_buf, COMMAND_LEN, stdin);
|
||||||
|
if (!ret) {
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int pos = strlen(cmd_buf);
|
||||||
|
if(cmd_buf[pos - 1] == '\n') {
|
||||||
|
cmd_buf[pos - 1] = '\0';
|
||||||
|
}
|
||||||
|
|
||||||
|
if (strncmp(cmd_buf, "", COMMAND_LEN) == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
char cmd[TOKEN_LEN];
|
||||||
|
memset(cmd, 0, sizeof(cmd));
|
||||||
|
|
||||||
|
char param1[TOKEN_LEN];
|
||||||
|
memset(param1, 0, sizeof(param1));
|
||||||
|
|
||||||
|
char param2[TOKEN_LEN];
|
||||||
|
memset(param2, 0, sizeof(param2));
|
||||||
|
|
||||||
|
char param3[TOKEN_LEN];
|
||||||
|
memset(param2, 0, sizeof(param2));
|
||||||
|
|
||||||
|
parseCommand4(cmd_buf, cmd, param1, param2, param3, TOKEN_LEN);
|
||||||
|
if (strcmp(cmd, "addnode") == 0) {
|
||||||
|
printf("not support \n");
|
||||||
|
|
||||||
|
/*
|
||||||
|
char host[HOST_LEN];
|
||||||
|
uint32_t port;
|
||||||
|
parseAddr(param1, host, HOST_LEN, &port);
|
||||||
|
uint64_t rid = raftId(host, port);
|
||||||
|
|
||||||
|
struct raft_change *req = raft_malloc(sizeof(*req));
|
||||||
|
int r = raft_add(&pRaftServer->raft, req, rid, param1, NULL);
|
||||||
|
if (r != 0) {
|
||||||
|
printf("raft_add: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||||
|
}
|
||||||
|
printf("add node: %lu %s \n", rid, param1);
|
||||||
|
|
||||||
|
struct raft_change *req2 = raft_malloc(sizeof(*req2));
|
||||||
|
r = raft_assign(&pRaftServer->raft, req2, rid, RAFT_VOTER, NULL);
|
||||||
|
if (r != 0) {
|
||||||
|
printf("raft_assign: %s \n", raft_errmsg(&pRaftServer->raft));
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
} else if (strcmp(cmd, "dropnode") == 0) {
|
||||||
|
printf("not support \n");
|
||||||
|
|
||||||
|
} else if (strcmp(cmd, "rebalance") == 0) {
|
||||||
|
|
||||||
|
/*
|
||||||
|
updateLeaderStates(pRaftServer);
|
||||||
|
|
||||||
|
int myLeaderCount;
|
||||||
|
for (int i = 0; i < NODE_COUNT; ++i) {
|
||||||
|
if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) {
|
||||||
|
myLeaderCount = leaderStates[i].leaderCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
while (myLeaderCount > pRaftServer->instanceCount / NODE_COUNT) {
|
||||||
|
printf("myLeaderCount:%d waterLevel:%d \n", myLeaderCount, pRaftServer->instanceCount / NODE_COUNT);
|
||||||
|
|
||||||
|
struct raft *r;
|
||||||
|
for (int j = 0; j < pRaftServer->instanceCount; ++j) {
|
||||||
|
if (pRaftServer->instance[j].raft.state == RAFT_LEADER) {
|
||||||
|
r = &pRaftServer->instance[j].raft;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct raft_transfer *transfer = raft_malloc(sizeof(*transfer));
|
||||||
|
transfer->data = pRaftServer;
|
||||||
|
|
||||||
|
uint64_t destRaftId;
|
||||||
|
int minIndex = -1;
|
||||||
|
int minLeaderCount = myLeaderCount;
|
||||||
|
for (int j = 0; j < NODE_COUNT; ++j) {
|
||||||
|
if (strcmp(leaderStates[j].address, pRaftServer->address) == 0) continue;
|
||||||
|
|
||||||
|
printf("-----leaderStates[%d].leaderCount:%d \n", j, leaderStates[j].leaderCount);
|
||||||
|
if (leaderStates[j].leaderCount <= minLeaderCount) {
|
||||||
|
minIndex = j;
|
||||||
|
printf("++++ assign minIndex : %d \n", minIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("minIndex:%d minLeaderCount:%d \n", minIndex, minLeaderCount);
|
||||||
|
|
||||||
|
char myHost[48];
|
||||||
|
uint16_t myPort;
|
||||||
|
uint16_t myVid;
|
||||||
|
decodeRaftId(r->id, myHost, sizeof(myHost), &myPort, &myVid);
|
||||||
|
|
||||||
|
char *destAddress = leaderStates[minIndex].address;
|
||||||
|
|
||||||
|
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
|
||||||
|
splitString(destAddress, ":", tokens, 2);
|
||||||
|
char *destHost = tokens[0];
|
||||||
|
uint16_t destPort = atoi(tokens[1]);
|
||||||
|
destRaftId = encodeRaftId(destHost, destPort, myVid);
|
||||||
|
|
||||||
|
printf("destHost:%s destPort:%u myVid:%u", destHost, destPort, myVid);
|
||||||
|
raft_transfer(r, transfer, destRaftId, raftTransferCb);
|
||||||
|
sleep(1);
|
||||||
|
|
||||||
|
for (int i = 0; i < NODE_COUNT; ++i) {
|
||||||
|
if (strcmp(pRaftServer->address, leaderStates[i].address) == 0) {
|
||||||
|
myLeaderCount = leaderStates[i].leaderCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
int leaderCount = 0;
|
||||||
|
|
||||||
|
struct raft *firstR;
|
||||||
|
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
||||||
|
struct raft *r = &pRaftServer->instance[i].raft;
|
||||||
|
if (r->state == RAFT_LEADER) {
|
||||||
|
leaderCount++;
|
||||||
|
firstR = r;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (leaderCount > pRaftServer->instanceCount / NODE_COUNT) {
|
||||||
|
struct raft_transfer *transfer = raft_malloc(sizeof(*transfer));
|
||||||
|
transfer->data = pRaftServer;
|
||||||
|
raft_transfer(firstR, transfer, 0, raftTransferCb);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
} else if (strcmp(cmd, "put") == 0) {
|
||||||
|
char buf[256];
|
||||||
|
uint16_t vid;
|
||||||
|
sscanf(param1, "%hu", &vid);
|
||||||
|
snprintf(buf, sizeof(buf), "%s--%s", param2, param3);
|
||||||
|
putValue(&pRaftServer->instance[vid].raft, buf);
|
||||||
|
|
||||||
|
} else if (strcmp(cmd, "get") == 0) {
|
||||||
|
getValue(param1);
|
||||||
|
|
||||||
|
} else if (strcmp(cmd, "transfer") == 0) {
|
||||||
|
uint16_t vid;
|
||||||
|
sscanf(param1, "%hu", &vid);
|
||||||
|
|
||||||
|
struct raft_transfer transfer;
|
||||||
|
raft_transfer(&pRaftServer->instance[vid].raft, &transfer, 0, NULL);
|
||||||
|
|
||||||
|
|
||||||
|
} else if (strcmp(cmd, "state") == 0) {
|
||||||
|
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
||||||
|
printf("instance %d: ", i);
|
||||||
|
printRaftState(&pRaftServer->instance[i].raft);
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (strcmp(cmd, "state2") == 0) {
|
||||||
|
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
||||||
|
printRaftState2(&pRaftServer->instance[i].raft);
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (strcmp(cmd, "snapshot") == 0) {
|
||||||
|
printf("not support \n");
|
||||||
|
|
||||||
|
} else if (strcmp(cmd, "help") == 0) {
|
||||||
|
printf("addnode \"127.0.0.1:8888\" \n");
|
||||||
|
printf("dropnode \"127.0.0.1:8888\" \n");
|
||||||
|
printf("put key value \n");
|
||||||
|
printf("get key \n");
|
||||||
|
printf("state \n");
|
||||||
|
|
||||||
|
} else {
|
||||||
|
printf("unknown command: [%s], type \"help\" to see help \n", cmd);
|
||||||
|
}
|
||||||
|
|
||||||
|
//printf("cmd_buf: [%s] \n", cmd_buf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void *startConsoleFunc(void *param) {
|
||||||
|
SRaftServer *pServer = (SRaftServer*)param;
|
||||||
|
console(pServer);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config ---------------------------------
|
||||||
|
void usage() {
|
||||||
|
printf("\nusage: \n");
|
||||||
|
printf("%s --me=127.0.0.1:10000 --dir=./data \n", exe_name);
|
||||||
|
printf("\n");
|
||||||
|
printf("%s --me=127.0.0.1:10000 --peers=127.0.0.1:10001,127.0.0.1:10002 --dir=./data \n", exe_name);
|
||||||
|
printf("%s --me=127.0.0.1:10001 --peers=127.0.0.1:10000,127.0.0.1:10002 --dir=./data \n", exe_name);
|
||||||
|
printf("%s --me=127.0.0.1:10002 --peers=127.0.0.1:10000,127.0.0.1:10001 --dir=./data \n", exe_name);
|
||||||
|
printf("\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
void parseConf(int argc, char **argv, SRaftServerConfig *pConf) {
|
||||||
|
memset(pConf, 0, sizeof(*pConf));
|
||||||
|
|
||||||
|
int option_index, option_value;
|
||||||
|
option_index = 0;
|
||||||
|
static struct option long_options[] = {
|
||||||
|
{"help", no_argument, NULL, 'h'},
|
||||||
|
{"peers", required_argument, NULL, 'p'},
|
||||||
|
{"me", required_argument, NULL, 'm'},
|
||||||
|
{"dir", required_argument, NULL, 'd'},
|
||||||
|
{NULL, 0, NULL, 0}
|
||||||
|
};
|
||||||
|
|
||||||
|
while ((option_value = getopt_long(argc, argv, "hp:m:d:", long_options, &option_index)) != -1) {
|
||||||
|
switch (option_value) {
|
||||||
|
case 'm': {
|
||||||
|
parseAddr(optarg, pConf->me.host, sizeof(pConf->me.host), &pConf->me.port);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'p': {
|
||||||
|
char tokens[MAX_PEERS][MAX_TOKEN_LEN];
|
||||||
|
int peerCount = splitString(optarg, ",", tokens, MAX_PEERS);
|
||||||
|
pConf->peersCount = peerCount;
|
||||||
|
for (int i = 0; i < peerCount; ++i) {
|
||||||
|
Addr *pAddr = &pConf->peers[i];
|
||||||
|
parseAddr(tokens[i], pAddr->host, sizeof(pAddr->host), &pAddr->port);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
case 'd': {
|
||||||
|
snprintf(pConf->dir, sizeof(pConf->dir), "%s", optarg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'h': {
|
||||||
|
usage();
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
default: {
|
||||||
|
usage();
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
snprintf(pConf->dataDir, sizeof(pConf->dataDir), "%s/%s_%u", pConf->dir, pConf->me.host, pConf->me.port);
|
||||||
|
}
|
||||||
|
|
||||||
|
void printConf(SRaftServerConfig *pConf) {
|
||||||
|
printf("\nconf: \n");
|
||||||
|
printf("me: %s:%u \n", pConf->me.host, pConf->me.port);
|
||||||
|
printf("peersCount: %d \n", pConf->peersCount);
|
||||||
|
for (int i = 0; i < pConf->peersCount; ++i) {
|
||||||
|
Addr *pAddr = &pConf->peers[i];
|
||||||
|
printf("peer%d: %s:%u \n", i, pAddr->host, pAddr->port);
|
||||||
|
}
|
||||||
|
printf("dataDir: %s \n\n", pConf->dataDir);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
srand(time(NULL));
|
||||||
|
int32_t ret;
|
||||||
|
|
||||||
|
exe_name = argv[0];
|
||||||
|
if (argc < 3) {
|
||||||
|
usage();
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
SRaftServerConfig conf;
|
||||||
|
parseConf(argc, argv, &conf);
|
||||||
|
printConf(&conf);
|
||||||
|
|
||||||
|
signal(SIGPIPE, SIG_IGN);
|
||||||
|
|
||||||
|
/*
|
||||||
|
char cmd_buf[COMMAND_LEN];
|
||||||
|
snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", conf.dataDir);
|
||||||
|
system(cmd_buf);
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
struct raft_fsm fsm;
|
||||||
|
initFsm(&fsm);
|
||||||
|
|
||||||
|
SRaftServer raftServer;
|
||||||
|
ret = raftServerInit(&raftServer, &conf, &fsm);
|
||||||
|
assert(ret == 0);
|
||||||
|
|
||||||
|
pthread_t tidRaftServer;
|
||||||
|
pthread_create(&tidRaftServer, NULL, startServerFunc, &raftServer);
|
||||||
|
|
||||||
|
pthread_t tidConsole;
|
||||||
|
pthread_create(&tidConsole, NULL, startConsoleFunc, &raftServer);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
sleep(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,222 @@
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include "common.h"
|
||||||
|
#include "raftServer.h"
|
||||||
|
|
||||||
|
char *keys;
|
||||||
|
char *values;
|
||||||
|
|
||||||
|
void initStore() {
|
||||||
|
keys = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
|
||||||
|
values = malloc(MAX_RECORD_COUNT * MAX_KV_LEN);
|
||||||
|
writeIndex = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void destroyStore() {
|
||||||
|
free(keys);
|
||||||
|
free(values);
|
||||||
|
}
|
||||||
|
|
||||||
|
void putKV(const char *key, const char *value) {
|
||||||
|
if (writeIndex < MAX_RECORD_COUNT) {
|
||||||
|
strncpy(&keys[writeIndex], key, MAX_KV_LEN);
|
||||||
|
strncpy(&values[writeIndex], value, MAX_KV_LEN);
|
||||||
|
writeIndex++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char *getKV(const char *key) {
|
||||||
|
for (int i = 0; i < MAX_RECORD_COUNT; ++i) {
|
||||||
|
if (strcmp(&keys[i], key) == 0) {
|
||||||
|
return &values[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr)
|
||||||
|
{
|
||||||
|
if (n_arr <= 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* tmp = (char*)malloc(strlen(str) + 1);
|
||||||
|
strcpy(tmp, str);
|
||||||
|
char* context;
|
||||||
|
int n = 0;
|
||||||
|
|
||||||
|
char* token = strtok_r(tmp, separator, &context);
|
||||||
|
if (!token) {
|
||||||
|
goto ret;
|
||||||
|
}
|
||||||
|
strncpy(arr[n], token, MAX_TOKEN_LEN);
|
||||||
|
n++;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
token = strtok_r(NULL, separator, &context);
|
||||||
|
if (!token || n >= n_arr) {
|
||||||
|
goto ret;
|
||||||
|
}
|
||||||
|
strncpy(arr[n], token, MAX_TOKEN_LEN);
|
||||||
|
n++;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret:
|
||||||
|
free(tmp);
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
uint64_t raftId(const char *host, uint32_t port) {
|
||||||
|
uint32_t host_uint32 = (uint32_t)inet_addr(host);
|
||||||
|
assert(host_uint32 != (uint32_t)-1);
|
||||||
|
uint64_t code = ((uint64_t)host_uint32) << 32 | port;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
uint64_t encodeRaftId(const char *host, uint16_t port, uint16_t vid) {
|
||||||
|
uint64_t raftId;
|
||||||
|
uint32_t host_uint32 = (uint32_t)inet_addr(host);
|
||||||
|
assert(host_uint32 != (uint32_t)-1);
|
||||||
|
|
||||||
|
raftId = (((uint64_t)host_uint32) << 32) | (((uint32_t)port) << 16) | vid;
|
||||||
|
return raftId;
|
||||||
|
}
|
||||||
|
|
||||||
|
void decodeRaftId(uint64_t raftId, char *host, int32_t len, uint16_t *port, uint16_t *vid) {
|
||||||
|
uint32_t host32 = (uint32_t)((raftId >> 32) & 0x00000000FFFFFFFF);
|
||||||
|
|
||||||
|
struct in_addr addr;
|
||||||
|
addr.s_addr = host32;
|
||||||
|
snprintf(host, len, "%s", inet_ntoa(addr));
|
||||||
|
|
||||||
|
*port = (uint16_t)((raftId & 0x00000000FFFF0000) >> 16);
|
||||||
|
*vid = (uint16_t)(raftId & 0x000000000000FFFF);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm) {
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
snprintf(pRaftServer->host, sizeof(pRaftServer->host), "%s", pConf->me.host);
|
||||||
|
pRaftServer->port = pConf->me.port;
|
||||||
|
snprintf(pRaftServer->address, sizeof(pRaftServer->address), "%s:%u", pRaftServer->host, pRaftServer->port);
|
||||||
|
//strncpy(pRaftServer->dir, pConf->dataDir, sizeof(pRaftServer->dir));
|
||||||
|
|
||||||
|
ret = uv_loop_init(&pRaftServer->loop);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "uv_loop_init error: %s \n", uv_strerror(ret));
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = raft_uv_tcp_init(&pRaftServer->transport, &pRaftServer->loop);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "raft_uv_tcp_init: error %d \n", ret);
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
uint16_t vid;
|
||||||
|
pRaftServer->instanceCount = 20;
|
||||||
|
|
||||||
|
|
||||||
|
for (int i = 0; i < pRaftServer->instanceCount; ++i)
|
||||||
|
{
|
||||||
|
//vid = 0;
|
||||||
|
vid = i;
|
||||||
|
|
||||||
|
|
||||||
|
pRaftServer->instance[vid].raftId = encodeRaftId(pRaftServer->host, pRaftServer->port, vid);
|
||||||
|
snprintf(pRaftServer->instance[vid].dir, sizeof(pRaftServer->instance[vid].dir), "%s_%llu", pConf->dataDir, pRaftServer->instance[vid].raftId);
|
||||||
|
|
||||||
|
char cmd_buf[COMMAND_LEN];
|
||||||
|
snprintf(cmd_buf, sizeof(cmd_buf), "mkdir -p %s", pRaftServer->instance[vid].dir);
|
||||||
|
system(cmd_buf);
|
||||||
|
sleep(1);
|
||||||
|
|
||||||
|
pRaftServer->instance[vid].fsm = pFsm;
|
||||||
|
|
||||||
|
ret = raft_uv_init(&pRaftServer->instance[vid].io, &pRaftServer->loop, pRaftServer->instance[vid].dir, &pRaftServer->transport);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft));
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = raft_init(&pRaftServer->instance[vid].raft, &pRaftServer->instance[vid].io, pRaftServer->instance[vid].fsm, pRaftServer->instance[vid].raftId, pRaftServer->address);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[vid].raft));
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
struct raft_configuration conf;
|
||||||
|
raft_configuration_init(&conf);
|
||||||
|
raft_configuration_add(&conf, pRaftServer->instance[vid].raftId, pRaftServer->address, RAFT_VOTER);
|
||||||
|
printf("add myself: %llu - %s \n", pRaftServer->instance[vid].raftId, pRaftServer->address);
|
||||||
|
for (int i = 0; i < pConf->peersCount; ++i) {
|
||||||
|
const Addr *pAddr = &pConf->peers[i];
|
||||||
|
|
||||||
|
raft_id rid = encodeRaftId(pAddr->host, pAddr->port, vid);
|
||||||
|
|
||||||
|
char addrBuf[ADDRESS_LEN];
|
||||||
|
snprintf(addrBuf, sizeof(addrBuf), "%s:%u", pAddr->host, pAddr->port);
|
||||||
|
raft_configuration_add(&conf, rid, addrBuf, RAFT_VOTER);
|
||||||
|
printf("add peers: %llu - %s \n", rid, addrBuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
raft_bootstrap(&pRaftServer->instance[vid].raft, &conf);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t raftServerStart(SRaftServer *pRaftServer) {
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
for (int i = 0; i < pRaftServer->instanceCount; ++i) {
|
||||||
|
ret = raft_start(&pRaftServer->instance[i].raft);
|
||||||
|
if (ret != 0) {
|
||||||
|
fprintf(stderr, "%s \n", raft_errmsg(&pRaftServer->instance[i].raft));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
uv_run(&pRaftServer->loop, UV_RUN_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void raftServerClose(SRaftServer *pRaftServer) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int fsmApplyCb(struct raft_fsm *pFsm, const struct raft_buffer *buf, void **result) {
|
||||||
|
char *msg = (char*)buf->base;
|
||||||
|
printf("fsm apply: %s \n", msg);
|
||||||
|
|
||||||
|
char arr[2][MAX_TOKEN_LEN];
|
||||||
|
splitString(msg, "--", arr, 2);
|
||||||
|
putKV(arr[0], arr[1]);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t initFsm(struct raft_fsm *fsm) {
|
||||||
|
initStore();
|
||||||
|
fsm->apply = fsmApplyCb;
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,68 @@
|
||||||
|
#ifndef TDENGINE_RAFT_SERVER_H
|
||||||
|
#define TDENGINE_RAFT_SERVER_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <assert.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include "raft.h"
|
||||||
|
#include "raft/uv.h"
|
||||||
|
#include "common.h"
|
||||||
|
|
||||||
|
|
||||||
|
// simulate a db store, just for test
|
||||||
|
#define MAX_KV_LEN 100
|
||||||
|
#define MAX_RECORD_COUNT 500
|
||||||
|
char *keys;
|
||||||
|
char *values;
|
||||||
|
int writeIndex;
|
||||||
|
|
||||||
|
void initStore();
|
||||||
|
void destroyStore();
|
||||||
|
void putKV(const char *key, const char *value);
|
||||||
|
char *getKV(const char *key);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char dir[DIR_LEN + HOST_LEN * 4]; /* Data dir of UV I/O backend */
|
||||||
|
raft_id raftId; /* For vote */
|
||||||
|
struct raft_fsm *fsm; /* Sample application FSM */
|
||||||
|
struct raft raft; /* Raft instance */
|
||||||
|
struct raft_io io; /* UV I/O backend */
|
||||||
|
|
||||||
|
} SInstance;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char host[HOST_LEN];
|
||||||
|
uint32_t port;
|
||||||
|
char address[ADDRESS_LEN]; /* Raft instance address */
|
||||||
|
|
||||||
|
struct uv_loop_s loop; /* UV loop */
|
||||||
|
struct raft_uv_transport transport; /* UV I/O backend transport */
|
||||||
|
|
||||||
|
SInstance instance[MAX_INSTANCE_NUM];
|
||||||
|
int32_t instanceCount;
|
||||||
|
|
||||||
|
} SRaftServer;
|
||||||
|
|
||||||
|
#define MAX_TOKEN_LEN 32
|
||||||
|
int splitString(const char* str, char* separator, char (*arr)[MAX_TOKEN_LEN], int n_arr);
|
||||||
|
|
||||||
|
int32_t raftServerInit(SRaftServer *pRaftServer, const SRaftServerConfig *pConf, struct raft_fsm *pFsm);
|
||||||
|
int32_t raftServerStart(SRaftServer *pRaftServer);
|
||||||
|
void raftServerClose(SRaftServer *pRaftServer);
|
||||||
|
|
||||||
|
|
||||||
|
int initFsm(struct raft_fsm *fsm);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // TDENGINE_RAFT_SERVER_H
|
|
@ -38,6 +38,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname);
|
||||||
int64_t tfClose(int64_t tfd);
|
int64_t tfClose(int64_t tfd);
|
||||||
int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
|
int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
|
||||||
int64_t tfRead(int64_t tfd, void *buf, int64_t count);
|
int64_t tfRead(int64_t tfd, void *buf, int64_t count);
|
||||||
|
int64_t tfPread(int64_t tfd, void *buf, int64_t count, int64_t offset);
|
||||||
int32_t tfFsync(int64_t tfd);
|
int32_t tfFsync(int64_t tfd);
|
||||||
bool tfValid(int64_t tfd);
|
bool tfValid(int64_t tfd);
|
||||||
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
|
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
|
||||||
|
|
|
@ -81,7 +81,7 @@ int metaOpenDB(SMeta *pMeta) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "meta.db", false) < 0) {
|
if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "schema.db", false) < 0) {
|
||||||
metaCloseDB(pMeta);
|
metaCloseDB(pMeta);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -558,6 +558,12 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
|
||||||
|
|
||||||
pDB->pTbDB->cursor(pDB->pTbDB, NULL, &(pTbCur->pCur), 0);
|
pDB->pTbDB->cursor(pDB->pTbDB, NULL, &(pTbCur->pCur), 0);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
DB_BTREE_STAT *sp;
|
||||||
|
pDB->pTbDB->stat(pDB->pTbDB, NULL, &sp, 0);
|
||||||
|
printf("**************** %ld\n", sp->bt_nkeys);
|
||||||
|
#endif
|
||||||
|
|
||||||
return pTbCur;
|
return pTbCur;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -142,7 +142,8 @@ uint64_t fstStateInputLen(FstState* state);
|
||||||
// end_addr
|
// end_addr
|
||||||
uint64_t fstStateEndAddrForOneTransNext(FstState* state, FstSlice* data);
|
uint64_t fstStateEndAddrForOneTransNext(FstState* state, FstSlice* data);
|
||||||
uint64_t fstStateEndAddrForOneTrans(FstState* state, FstSlice* data, PackSizes sizes);
|
uint64_t fstStateEndAddrForOneTrans(FstState* state, FstSlice* data, PackSizes sizes);
|
||||||
uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans);
|
uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes,
|
||||||
|
uint64_t nTrans);
|
||||||
// input
|
// input
|
||||||
uint8_t fstStateInput(FstState* state, FstNode* node);
|
uint8_t fstStateInput(FstState* state, FstNode* node);
|
||||||
uint8_t fstStateInputForAnyTrans(FstState* state, FstNode* node, uint64_t i);
|
uint8_t fstStateInputForAnyTrans(FstState* state, FstNode* node, uint64_t i);
|
||||||
|
@ -258,6 +259,7 @@ typedef struct Fst {
|
||||||
FstMeta* meta;
|
FstMeta* meta;
|
||||||
FstSlice* data; //
|
FstSlice* data; //
|
||||||
FstNode* root; //
|
FstNode* root; //
|
||||||
|
pthread_mutex_t mtx;
|
||||||
} Fst;
|
} Fst;
|
||||||
|
|
||||||
// refactor simple function
|
// refactor simple function
|
||||||
|
@ -310,7 +312,8 @@ StreamWithStateResult* swsResultCreate(FstSlice* data, FstOutput fOut, void* sta
|
||||||
void swsResultDestroy(StreamWithStateResult* result);
|
void swsResultDestroy(StreamWithStateResult* result);
|
||||||
|
|
||||||
typedef void* (*StreamCallback)(void*);
|
typedef void* (*StreamCallback)(void*);
|
||||||
StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max);
|
StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min,
|
||||||
|
FstBoundWithData* max);
|
||||||
|
|
||||||
void streamWithStateDestroy(StreamWithState* sws);
|
void streamWithStateDestroy(StreamWithState* sws);
|
||||||
|
|
||||||
|
|
|
@ -77,6 +77,7 @@ typedef struct TFileReader {
|
||||||
Fst* fst;
|
Fst* fst;
|
||||||
WriterCtx* ctx;
|
WriterCtx* ctx;
|
||||||
TFileHeader header;
|
TFileHeader header;
|
||||||
|
bool remove;
|
||||||
} TFileReader;
|
} TFileReader;
|
||||||
|
|
||||||
typedef struct IndexTFile {
|
typedef struct IndexTFile {
|
||||||
|
|
|
@ -94,7 +94,6 @@ void indexClose(SIndex* sIdx) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
#ifdef USE_INVERTED_INDEX
|
||||||
indexCacheDestroy(sIdx->cache);
|
|
||||||
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
||||||
while (iter) {
|
while (iter) {
|
||||||
IndexCache** pCache = iter;
|
IndexCache** pCache = iter;
|
||||||
|
@ -104,6 +103,7 @@ void indexClose(SIndex* sIdx) {
|
||||||
taosHashCleanup(sIdx->colObj);
|
taosHashCleanup(sIdx->colObj);
|
||||||
pthread_mutex_destroy(&sIdx->mtx);
|
pthread_mutex_destroy(&sIdx->mtx);
|
||||||
#endif
|
#endif
|
||||||
|
free(sIdx->path);
|
||||||
free(sIdx);
|
free(sIdx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
||||||
|
|
||||||
#define MEM_TERM_LIMIT 200
|
#define MEM_TERM_LIMIT 10000 * 10
|
||||||
// ref index_cache.h:22
|
// ref index_cache.h:22
|
||||||
//#define CACHE_KEY_LEN(p) \
|
//#define CACHE_KEY_LEN(p) \
|
||||||
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
|
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
|
||||||
|
@ -110,7 +110,10 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
|
||||||
while (tSkipListIterNext(iter)) {
|
while (tSkipListIterNext(iter)) {
|
||||||
SSkipListNode* node = tSkipListIterGet(iter);
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
if (ct != NULL) {}
|
if (ct != NULL) {
|
||||||
|
free(ct->colVal);
|
||||||
|
free(ct);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tSkipListDestroyIter(iter);
|
tSkipListDestroyIter(iter);
|
||||||
tSkipListDestroy(slt);
|
tSkipListDestroy(slt);
|
||||||
|
@ -271,7 +274,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
EIndexQueryType qtype = query->qType;
|
EIndexQueryType qtype = query->qType;
|
||||||
CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};
|
CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};
|
||||||
indexCacheDebug(pCache);
|
// indexCacheDebug(pCache);
|
||||||
|
|
||||||
int ret = indexQueryMem(mem, &ct, qtype, result, s);
|
int ret = indexQueryMem(mem, &ct, qtype, result, s);
|
||||||
if (ret == 0 && *s != kTypeDeletion) {
|
if (ret == 0 && *s != kTypeDeletion) {
|
||||||
|
|
|
@ -354,7 +354,8 @@ uint64_t fstStateEndAddrForOneTrans(FstState* s, FstSlice* data, PackSizes sizes
|
||||||
return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s) - 1 // pack size
|
return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s) - 1 // pack size
|
||||||
- FST_GET_TRANSITION_PACK_SIZE(sizes) - FST_GET_OUTPUT_PACK_SIZE(sizes);
|
- FST_GET_TRANSITION_PACK_SIZE(sizes) - FST_GET_OUTPUT_PACK_SIZE(sizes);
|
||||||
}
|
}
|
||||||
uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans) {
|
uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes,
|
||||||
|
uint64_t nTrans) {
|
||||||
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes);
|
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes);
|
||||||
uint8_t finalOsize = !fstStateIsFinalState(state) ? 0 : oSizes;
|
uint8_t finalOsize = !fstStateIsFinalState(state) ? 0 : oSizes;
|
||||||
return FST_SLICE_LEN(date) - 1 - fstStateNtransLen(state) - 1 // pack size
|
return FST_SLICE_LEN(date) - 1 - fstStateNtransLen(state) - 1 // pack size
|
||||||
|
@ -403,8 +404,8 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState* s, FstNode* node, uint64_t i
|
||||||
|
|
||||||
FstSlice* slice = &node->data;
|
FstSlice* slice = &node->data;
|
||||||
uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes);
|
uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes);
|
||||||
uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) - node->nTrans -
|
uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) -
|
||||||
(i * tSizes) - tSizes;
|
node->nTrans - (i * tSizes) - tSizes;
|
||||||
uint8_t* data = fstSliceData(slice, NULL);
|
uint8_t* data = fstSliceData(slice, NULL);
|
||||||
return unpackDelta(data + at, tSizes, node->end);
|
return unpackDelta(data + at, tSizes, node->end);
|
||||||
}
|
}
|
||||||
|
@ -595,7 +596,8 @@ FstNode* fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice* slice) {
|
||||||
n->isFinal = fstStateIsFinalState(&st); // s.is_final_state();
|
n->isFinal = fstStateIsFinalState(&st); // s.is_final_state();
|
||||||
n->nTrans = nTrans;
|
n->nTrans = nTrans;
|
||||||
n->sizes = sz;
|
n->sizes = sz;
|
||||||
n->finalOutput = fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans);
|
n->finalOutput =
|
||||||
|
fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans);
|
||||||
}
|
}
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
@ -875,9 +877,7 @@ void* fstBuilderInsertInner(FstBuilder* b) {
|
||||||
// b->wrt = NULL;
|
// b->wrt = NULL;
|
||||||
return b->wrt;
|
return b->wrt;
|
||||||
}
|
}
|
||||||
void fstBuilderFinish(FstBuilder* b) {
|
void fstBuilderFinish(FstBuilder* b) { fstBuilderInsertInner(b); }
|
||||||
fstBuilderInsertInner(b);
|
|
||||||
}
|
|
||||||
|
|
||||||
FstSlice fstNodeAsSlice(FstNode* node) {
|
FstSlice fstNodeAsSlice(FstNode* node) {
|
||||||
FstSlice* slice = &node->data;
|
FstSlice* slice = &node->data;
|
||||||
|
@ -894,9 +894,7 @@ FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) {
|
||||||
return trn;
|
return trn;
|
||||||
}
|
}
|
||||||
|
|
||||||
void fstLastTransitionDestroy(FstLastTransition* trn) {
|
void fstLastTransitionDestroy(FstLastTransition* trn) { free(trn); }
|
||||||
free(trn);
|
|
||||||
}
|
|
||||||
void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) {
|
void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) {
|
||||||
FstLastTransition* trn = unNode->last;
|
FstLastTransition* trn = unNode->last;
|
||||||
if (trn == NULL) { return; }
|
if (trn == NULL) { return; }
|
||||||
|
@ -959,9 +957,10 @@ Fst* fstCreate(FstSlice* slice) {
|
||||||
fst->meta->checkSum = checkSum;
|
fst->meta->checkSum = checkSum;
|
||||||
|
|
||||||
FstSlice* s = calloc(1, sizeof(FstSlice));
|
FstSlice* s = calloc(1, sizeof(FstSlice));
|
||||||
*s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice));
|
*s = fstSliceCopy(slice, 0, FST_SLICE_LEN(slice) - 1);
|
||||||
fst->data = s;
|
fst->data = s;
|
||||||
|
|
||||||
|
pthread_mutex_init(&fst->mtx, NULL);
|
||||||
return fst;
|
return fst;
|
||||||
|
|
||||||
FST_CREAT_FAILED:
|
FST_CREAT_FAILED:
|
||||||
|
@ -973,14 +972,18 @@ void fstDestroy(Fst* fst) {
|
||||||
free(fst->meta);
|
free(fst->meta);
|
||||||
fstSliceDestroy(fst->data);
|
fstSliceDestroy(fst->data);
|
||||||
free(fst->data);
|
free(fst->data);
|
||||||
|
pthread_mutex_destroy(&fst->mtx);
|
||||||
}
|
}
|
||||||
free(fst);
|
free(fst);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
||||||
|
// dec lock range
|
||||||
|
pthread_mutex_lock(&fst->mtx);
|
||||||
FstNode* root = fstGetRoot(fst);
|
FstNode* root = fstGetRoot(fst);
|
||||||
Output tOut = 0;
|
Output tOut = 0;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
|
|
||||||
uint8_t* data = fstSliceData(b, &len);
|
uint8_t* data = fstSliceData(b, &len);
|
||||||
|
|
||||||
SArray* nodes = (SArray*)taosArrayInit(len, sizeof(FstNode*));
|
SArray* nodes = (SArray*)taosArrayInit(len, sizeof(FstNode*));
|
||||||
|
@ -988,7 +991,10 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
||||||
for (uint32_t i = 0; i < len; i++) {
|
for (uint32_t i = 0; i < len; i++) {
|
||||||
uint8_t inp = data[i];
|
uint8_t inp = data[i];
|
||||||
Output res = 0;
|
Output res = 0;
|
||||||
if (false == fstNodeFindInput(root, inp, &res)) { return false; }
|
if (false == fstNodeFindInput(root, inp, &res)) {
|
||||||
|
pthread_mutex_unlock(&fst->mtx);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
FstTransition trn;
|
FstTransition trn;
|
||||||
fstNodeGetTransitionAt(root, res, &trn);
|
fstNodeGetTransitionAt(root, res, &trn);
|
||||||
|
@ -997,6 +1003,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
||||||
taosArrayPush(nodes, &root);
|
taosArrayPush(nodes, &root);
|
||||||
}
|
}
|
||||||
if (!FST_NODE_IS_FINAL(root)) {
|
if (!FST_NODE_IS_FINAL(root)) {
|
||||||
|
pthread_mutex_unlock(&fst->mtx);
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
tOut = tOut + FST_NODE_FINAL_OUTPUT(root);
|
tOut = tOut + FST_NODE_FINAL_OUTPUT(root);
|
||||||
|
@ -1007,13 +1014,13 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
||||||
fstNodeDestroy(*node);
|
fstNodeDestroy(*node);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(nodes);
|
taosArrayDestroy(nodes);
|
||||||
|
|
||||||
fst->root = NULL;
|
fst->root = NULL;
|
||||||
|
pthread_mutex_unlock(&fst->mtx);
|
||||||
*out = tOut;
|
*out = tOut;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
FstStreamBuilder* fstSearch(Fst* fst, AutomationCtx* ctx) {
|
FstStreamBuilder* fstSearch(Fst* fst, AutomationCtx* ctx) {
|
||||||
|
// refactor later
|
||||||
return fstStreamBuilderCreate(fst, ctx);
|
return fstStreamBuilderCreate(fst, ctx);
|
||||||
}
|
}
|
||||||
StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) {
|
StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) {
|
||||||
|
@ -1021,24 +1028,30 @@ StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) {
|
||||||
return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max);
|
return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max);
|
||||||
}
|
}
|
||||||
FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) {
|
FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) {
|
||||||
|
// refactor later
|
||||||
return fstStreamBuilderCreate(fst, ctx);
|
return fstStreamBuilderCreate(fst, ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
FstNode* fstGetRoot(Fst* fst) {
|
FstNode* fstGetRoot(Fst* fst) {
|
||||||
if (fst->root != NULL) { return fst->root; }
|
|
||||||
CompiledAddr rAddr = fstGetRootAddr(fst);
|
CompiledAddr rAddr = fstGetRootAddr(fst);
|
||||||
fst->root = fstGetNode(fst, rAddr);
|
return fstGetNode(fst, rAddr);
|
||||||
return fst->root;
|
// pthread_mutex_lock(&fst->mtx);
|
||||||
|
// if (fst->root != NULL) {
|
||||||
|
// // pthread_mutex_unlock(&fst->mtx);
|
||||||
|
// return fst->root;
|
||||||
|
//}
|
||||||
|
// CompiledAddr rAddr = fstGetRootAddr(fst);
|
||||||
|
// fst->root = fstGetNode(fst, rAddr);
|
||||||
|
//// pthread_mutex_unlock(&fst->mtx);
|
||||||
|
// return fst->root;
|
||||||
}
|
}
|
||||||
|
|
||||||
FstNode* fstGetNode(Fst* fst, CompiledAddr addr) {
|
FstNode* fstGetNode(Fst* fst, CompiledAddr addr) {
|
||||||
|
// refactor later
|
||||||
return fstNodeCreate(fst->meta->version, addr, fst->data);
|
return fstNodeCreate(fst->meta->version, addr, fst->data);
|
||||||
}
|
}
|
||||||
FstType fstGetType(Fst* fst) {
|
FstType fstGetType(Fst* fst) { return fst->meta->ty; }
|
||||||
return fst->meta->ty;
|
CompiledAddr fstGetRootAddr(Fst* fst) { return fst->meta->rootAddr; }
|
||||||
}
|
|
||||||
CompiledAddr fstGetRootAddr(Fst* fst) {
|
|
||||||
return fst->meta->rootAddr;
|
|
||||||
}
|
|
||||||
|
|
||||||
Output fstEmptyFinalOutput(Fst* fst, bool* null) {
|
Output fstEmptyFinalOutput(Fst* fst, bool* null) {
|
||||||
Output res = 0;
|
Output res = 0;
|
||||||
|
@ -1053,8 +1066,7 @@ Output fstEmptyFinalOutput(Fst* fst, bool* null) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool fstVerify(Fst* fst) {
|
bool fstVerify(Fst* fst) {
|
||||||
uint32_t checkSum = fst->meta->checkSum;
|
uint32_t len, checkSum = fst->meta->checkSum;
|
||||||
int32_t len;
|
|
||||||
uint8_t* data = fstSliceData(fst->data, &len);
|
uint8_t* data = fstSliceData(fst->data, &len);
|
||||||
TSCKSUM initSum = 0;
|
TSCKSUM initSum = 0;
|
||||||
if (!taosCheckChecksumWhole(data, len)) { return false; }
|
if (!taosCheckChecksumWhole(data, len)) { return false; }
|
||||||
|
@ -1094,15 +1106,12 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData* bound) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool fstBoundWithDataIsIncluded(FstBoundWithData* bound) {
|
bool fstBoundWithDataIsIncluded(FstBoundWithData* bound) { return bound->type == Excluded ? false : true; }
|
||||||
return bound->type == Excluded ? false : true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void fstBoundDestroy(FstBoundWithData* bound) {
|
void fstBoundDestroy(FstBoundWithData* bound) { free(bound); }
|
||||||
free(bound);
|
|
||||||
}
|
|
||||||
|
|
||||||
StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max) {
|
StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min,
|
||||||
|
FstBoundWithData* max) {
|
||||||
StreamWithState* sws = calloc(1, sizeof(StreamWithState));
|
StreamWithState* sws = calloc(1, sizeof(StreamWithState));
|
||||||
if (sws == NULL) { return NULL; }
|
if (sws == NULL) { return NULL; }
|
||||||
|
|
||||||
|
@ -1131,7 +1140,9 @@ void streamWithStateDestroy(StreamWithState* sws) {
|
||||||
bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) {
|
bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) {
|
||||||
AutomationCtx* aut = sws->aut;
|
AutomationCtx* aut = sws->aut;
|
||||||
if (fstBoundWithDataIsEmpty(min)) {
|
if (fstBoundWithDataIsEmpty(min)) {
|
||||||
if (fstBoundWithDataIsIncluded(min)) { sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); }
|
if (fstBoundWithDataIsIncluded(min)) {
|
||||||
|
sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null));
|
||||||
|
}
|
||||||
StreamState s = {.node = fstGetRoot(sws->fst),
|
StreamState s = {.node = fstGetRoot(sws->fst),
|
||||||
.trans = 0,
|
.trans = 0,
|
||||||
.out = {.null = false, .out = 0},
|
.out = {.null = false, .out = 0},
|
||||||
|
@ -1203,7 +1214,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) {
|
||||||
uint64_t trans = s->trans;
|
uint64_t trans = s->trans;
|
||||||
FstTransition trn;
|
FstTransition trn;
|
||||||
fstNodeGetTransitionAt(n, trans - 1, &trn);
|
fstNodeGetTransitionAt(n, trans - 1, &trn);
|
||||||
StreamState s = {.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState};
|
StreamState s = {
|
||||||
|
.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState};
|
||||||
taosArrayPush(sws->stack, &s);
|
taosArrayPush(sws->stack, &s);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1260,9 +1272,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
|
||||||
|
|
||||||
size_t isz = taosArrayGetSize(sws->inp);
|
size_t isz = taosArrayGetSize(sws->inp);
|
||||||
uint8_t* buf = (uint8_t*)malloc(isz * sizeof(uint8_t));
|
uint8_t* buf = (uint8_t*)malloc(isz * sizeof(uint8_t));
|
||||||
for (uint32_t i = 0; i < isz; i++) {
|
for (uint32_t i = 0; i < isz; i++) { buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i); }
|
||||||
buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i);
|
|
||||||
}
|
|
||||||
FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp));
|
FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp));
|
||||||
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
|
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
|
||||||
taosArrayDestroyEx(sws->stack, streamStateDestroy);
|
taosArrayDestroyEx(sws->stack, streamStateDestroy);
|
||||||
|
@ -1327,8 +1337,8 @@ FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut) {
|
||||||
}
|
}
|
||||||
void fstStreamBuilderDestroy(FstStreamBuilder* b) {
|
void fstStreamBuilderDestroy(FstStreamBuilder* b) {
|
||||||
fstSliceDestroy(&b->min->data);
|
fstSliceDestroy(&b->min->data);
|
||||||
tfree(b->min);
|
|
||||||
fstSliceDestroy(&b->max->data);
|
fstSliceDestroy(&b->max->data);
|
||||||
|
tfree(b->min);
|
||||||
tfree(b->max);
|
tfree(b->max);
|
||||||
free(b);
|
free(b);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,7 @@
|
||||||
|
|
||||||
StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) {
|
StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) {
|
||||||
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
|
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
|
||||||
if (nsv == NULL) {
|
if (nsv == NULL) { return NULL; }
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
nsv->kind = kind;
|
nsv->kind = kind;
|
||||||
nsv->type = ty;
|
nsv->type = ty;
|
||||||
|
@ -37,9 +35,7 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueTyp
|
||||||
}
|
}
|
||||||
void startWithStateValueDestroy(void* val) {
|
void startWithStateValueDestroy(void* val) {
|
||||||
StartWithStateValue* sv = (StartWithStateValue*)val;
|
StartWithStateValue* sv = (StartWithStateValue*)val;
|
||||||
if (sv == NULL) {
|
if (sv == NULL) { return; }
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sv->type == FST_INT) {
|
if (sv->type == FST_INT) {
|
||||||
//
|
//
|
||||||
|
@ -52,9 +48,7 @@ void startWithStateValueDestroy(void* val) {
|
||||||
}
|
}
|
||||||
StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) {
|
StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) {
|
||||||
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
|
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
|
||||||
if (nsv == NULL) {
|
if (nsv == NULL) { return NULL; }
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
nsv->kind = sv->kind;
|
nsv->kind = sv->kind;
|
||||||
nsv->type = sv->type;
|
nsv->type = sv->type;
|
||||||
|
@ -94,14 +88,10 @@ static bool prefixCanMatch(AutomationCtx* ctx, void* sv) {
|
||||||
static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; }
|
static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; }
|
||||||
static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) {
|
static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) {
|
||||||
StartWithStateValue* ssv = (StartWithStateValue*)state;
|
StartWithStateValue* ssv = (StartWithStateValue*)state;
|
||||||
if (ssv == NULL || ctx == NULL) {
|
if (ssv == NULL || ctx == NULL) { return NULL; }
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
char* data = ctx->data;
|
char* data = ctx->data;
|
||||||
if (ssv->kind == Done) {
|
if (ssv->kind == Done) { return startWithStateValueCreate(Done, FST_INT, &ssv->val); }
|
||||||
return startWithStateValueCreate(Done, FST_INT, &ssv->val);
|
|
||||||
}
|
|
||||||
if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
|
if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
|
||||||
int val = ssv->val + 1;
|
int val = ssv->val + 1;
|
||||||
|
|
||||||
|
@ -138,9 +128,7 @@ AutomationFunc automFuncs[] = {
|
||||||
|
|
||||||
AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
|
AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
|
||||||
AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx));
|
AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx));
|
||||||
if (ctx == NULL) {
|
if (ctx == NULL) { return NULL; }
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
StartWithStateValue* sv = NULL;
|
StartWithStateValue* sv = NULL;
|
||||||
if (atype == AUTOMATION_ALWAYS) {
|
if (atype == AUTOMATION_ALWAYS) {
|
||||||
|
|
|
@ -42,8 +42,8 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
|
||||||
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
|
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
|
||||||
int nRead = 0;
|
int nRead = 0;
|
||||||
if (ctx->type == TFile) {
|
if (ctx->type == TFile) {
|
||||||
tfLseek(ctx->file.fd, offset, 0);
|
// tfLseek(ctx->file.fd, offset, 0);
|
||||||
nRead = tfRead(ctx->file.fd, buf, len);
|
nRead = tfPread(ctx->file.fd, buf, len, offset);
|
||||||
} else {
|
} else {
|
||||||
// refactor later
|
// refactor later
|
||||||
assert(0);
|
assert(0);
|
||||||
|
@ -52,6 +52,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
|
||||||
}
|
}
|
||||||
static int writeCtxDoFlush(WriterCtx* ctx) {
|
static int writeCtxDoFlush(WriterCtx* ctx) {
|
||||||
if (ctx->type == TFile) {
|
if (ctx->type == TFile) {
|
||||||
|
// taosFsyncFile(ctx->file.fd);
|
||||||
tfFsync(ctx->file.fd);
|
tfFsync(ctx->file.fd);
|
||||||
// tfFlush(ctx->file.fd);
|
// tfFlush(ctx->file.fd);
|
||||||
} else {
|
} else {
|
||||||
|
@ -69,13 +70,15 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
ctx->file.readOnly = readOnly;
|
ctx->file.readOnly = readOnly;
|
||||||
if (readOnly == false) {
|
if (readOnly == false) {
|
||||||
|
// ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||||
ctx->file.fd = tfOpenCreateWriteAppend(path);
|
ctx->file.fd = tfOpenCreateWriteAppend(path);
|
||||||
} else {
|
} else {
|
||||||
ctx->file.fd = tfOpenReadWrite(path);
|
// ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||||
|
ctx->file.fd = tfOpenRead(path);
|
||||||
}
|
}
|
||||||
memcpy(ctx->file.buf, path, strlen(path));
|
memcpy(ctx->file.buf, path, strlen(path));
|
||||||
if (ctx->file.fd < 0) {
|
if (ctx->file.fd < 0) {
|
||||||
indexError("open file error %d", errno);
|
indexError("failed to open file, error %d", errno);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
} else if (ctx->type == TMemory) {
|
} else if (ctx->type == TMemory) {
|
||||||
|
@ -101,10 +104,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
|
||||||
free(ctx->mem.buf);
|
free(ctx->mem.buf);
|
||||||
} else {
|
} else {
|
||||||
tfClose(ctx->file.fd);
|
tfClose(ctx->file.fd);
|
||||||
if (remove) {
|
if (remove) { unlink(ctx->file.buf); }
|
||||||
indexError("rm file %s", ctx->file.buf);
|
|
||||||
unlink(ctx->file.buf);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
free(ctx);
|
free(ctx);
|
||||||
}
|
}
|
||||||
|
@ -144,6 +144,7 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len)
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; }
|
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; }
|
||||||
|
|
||||||
int fstCountingWriterFlush(FstCountingWriter* write) {
|
int fstCountingWriterFlush(FstCountingWriter* write) {
|
||||||
WriterCtx* ctx = write->wrt;
|
WriterCtx* ctx = write->wrt;
|
||||||
ctx->flush(ctx);
|
ctx->flush(ctx);
|
||||||
|
|
|
@ -53,13 +53,6 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId,
|
||||||
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
|
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
|
||||||
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
|
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
|
||||||
|
|
||||||
static TFileReader* tfileReaderCreateImpl(WriterCtx* ctx) {
|
|
||||||
TFileReader* reader = tfileReaderCreate(ctx);
|
|
||||||
tfileReaderRef(reader);
|
|
||||||
// tfileSerialCacheKey(&key, buf);
|
|
||||||
return reader;
|
|
||||||
}
|
|
||||||
|
|
||||||
TFileCache* tfileCacheCreate(const char* path) {
|
TFileCache* tfileCacheCreate(const char* path) {
|
||||||
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
||||||
if (tcache == NULL) { return NULL; }
|
if (tcache == NULL) { return NULL; }
|
||||||
|
@ -88,13 +81,16 @@ TFileCache* tfileCacheCreate(const char* path) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
TFileReader* reader = tfileReaderCreateImpl(wc);
|
TFileReader* reader = tfileReaderCreate(wc);
|
||||||
TFileHeader* header = &reader->header;
|
TFileHeader* header = &reader->header;
|
||||||
TFileCacheKey key = {.suid = header->suid,
|
TFileCacheKey key = {.suid = header->suid,
|
||||||
.colName = header->colName,
|
.colName = header->colName,
|
||||||
.nColName = strlen(header->colName),
|
.nColName = strlen(header->colName),
|
||||||
.colType = header->colType};
|
.colType = header->colType};
|
||||||
tfileSerialCacheKey(&key, buf);
|
tfileSerialCacheKey(&key, buf);
|
||||||
|
|
||||||
|
tfileReaderRef(reader);
|
||||||
|
// indexTable
|
||||||
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
|
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
|
||||||
}
|
}
|
||||||
taosArrayDestroyEx(files, tfileDestroyFileName);
|
taosArrayDestroyEx(files, tfileDestroyFileName);
|
||||||
|
@ -139,6 +135,7 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
TFileReader* oldReader = *p;
|
TFileReader* oldReader = *p;
|
||||||
taosHashRemove(tcache->tableCache, buf, strlen(buf));
|
taosHashRemove(tcache->tableCache, buf, strlen(buf));
|
||||||
|
oldReader->remove = true;
|
||||||
tfileReaderUnRef(oldReader);
|
tfileReaderUnRef(oldReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,7 +149,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
||||||
|
|
||||||
// T_REF_INC(reader);
|
// T_REF_INC(reader);
|
||||||
reader->ctx = ctx;
|
reader->ctx = ctx;
|
||||||
|
|
||||||
if (0 != tfileReaderLoadHeader(reader)) {
|
if (0 != tfileReaderLoadHeader(reader)) {
|
||||||
tfileReaderDestroy(reader);
|
tfileReaderDestroy(reader);
|
||||||
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
||||||
|
@ -172,7 +168,7 @@ void tfileReaderDestroy(TFileReader* reader) {
|
||||||
if (reader == NULL) { return; }
|
if (reader == NULL) { return; }
|
||||||
// T_REF_INC(reader);
|
// T_REF_INC(reader);
|
||||||
fstDestroy(reader->fst);
|
fstDestroy(reader->fst);
|
||||||
writerCtxDestroy(reader->ctx, true);
|
writerCtxDestroy(reader->ctx, reader->remove);
|
||||||
free(reader);
|
free(reader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,7 +228,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
|
||||||
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
|
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
|
||||||
if (wc == NULL) { return NULL; }
|
if (wc == NULL) { return NULL; }
|
||||||
|
|
||||||
TFileReader* reader = tfileReaderCreateImpl(wc);
|
TFileReader* reader = tfileReaderCreate(wc);
|
||||||
return reader;
|
return reader;
|
||||||
|
|
||||||
// tfileSerialCacheKey(&key, buf);
|
// tfileSerialCacheKey(&key, buf);
|
||||||
|
@ -330,13 +326,16 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// write fst
|
// write fst
|
||||||
|
indexError("--------Begin----------------");
|
||||||
for (size_t i = 0; i < sz; i++) {
|
for (size_t i = 0; i < sz; i++) {
|
||||||
// TODO, fst batch write later
|
// TODO, fst batch write later
|
||||||
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
||||||
if (tfileWriteData(tw, v) == 0) {
|
if (tfileWriteData(tw, v) == 0) {
|
||||||
//
|
//
|
||||||
}
|
}
|
||||||
|
indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId));
|
||||||
}
|
}
|
||||||
|
indexError("--------End----------------");
|
||||||
fstBuilderFinish(tw->fb);
|
fstBuilderFinish(tw->fb);
|
||||||
fstBuilderDestroy(tw->fb);
|
fstBuilderDestroy(tw->fb);
|
||||||
tw->fb = NULL;
|
tw->fb = NULL;
|
||||||
|
@ -360,7 +359,10 @@ IndexTFile* indexTFileCreate(const char* path) {
|
||||||
tfile->cache = tfileCacheCreate(path);
|
tfile->cache = tfileCacheCreate(path);
|
||||||
return tfile;
|
return tfile;
|
||||||
}
|
}
|
||||||
void IndexTFileDestroy(IndexTFile* tfile) { free(tfile); }
|
void IndexTFileDestroy(IndexTFile* tfile) {
|
||||||
|
tfileCacheDestroy(tfile->cache);
|
||||||
|
free(tfile);
|
||||||
|
}
|
||||||
|
|
||||||
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
|
@ -539,8 +541,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
|
||||||
char buf[TFILE_HEADER_SIZE] = {0};
|
char buf[TFILE_HEADER_SIZE] = {0};
|
||||||
|
|
||||||
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
|
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
|
||||||
assert(nread == sizeof(buf));
|
if (nread == -1) {
|
||||||
|
//
|
||||||
|
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
|
||||||
|
errno, reader->ctx->file.fd, reader->ctx->file.buf);
|
||||||
|
}
|
||||||
|
// assert(nread == sizeof(buf));
|
||||||
memcpy(&reader->header, buf, sizeof(buf));
|
memcpy(&reader->header, buf, sizeof(buf));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
static int tfileReaderLoadFst(TFileReader* reader) {
|
static int tfileReaderLoadFst(TFileReader* reader) {
|
||||||
|
@ -573,7 +581,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
|
||||||
char* buf = calloc(1, total);
|
char* buf = calloc(1, total);
|
||||||
if (buf == NULL) { return -1; }
|
if (buf == NULL) { return -1; }
|
||||||
|
|
||||||
nread = ctx->read(ctx, buf, total);
|
nread = ctx->readFrom(ctx, buf, total, offset + sizeof(nid));
|
||||||
assert(total == nread);
|
assert(total == nread);
|
||||||
|
|
||||||
for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); }
|
for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); }
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
|
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include "index.h"
|
#include "index.h"
|
||||||
#include "indexInt.h"
|
#include "indexInt.h"
|
||||||
|
@ -42,7 +43,8 @@ class FstWriter {
|
||||||
|
|
||||||
class FstReadMemory {
|
class FstReadMemory {
|
||||||
public:
|
public:
|
||||||
FstReadMemory(size_t size) {
|
FstReadMemory(size_t size, const std::string& fileName = fileName) {
|
||||||
|
tfInit();
|
||||||
_wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
|
_wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
|
||||||
_w = fstCountingWriterCreate(_wc);
|
_w = fstCountingWriterCreate(_wc);
|
||||||
_size = size;
|
_size = size;
|
||||||
|
@ -101,6 +103,7 @@ class FstReadMemory {
|
||||||
fstDestroy(_fst);
|
fstDestroy(_fst);
|
||||||
fstSliceDestroy(&_s);
|
fstSliceDestroy(&_s);
|
||||||
writerCtxDestroy(_wc, false);
|
writerCtxDestroy(_wc, false);
|
||||||
|
tfCleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -165,8 +168,44 @@ void checkFstCheckIterator() {
|
||||||
delete m;
|
delete m;
|
||||||
tfCleanup();
|
tfCleanup();
|
||||||
}
|
}
|
||||||
int main() {
|
|
||||||
checkFstCheckIterator();
|
void fst_get(Fst* fst) {
|
||||||
|
for (int i = 0; i < 10000; i++) {
|
||||||
|
std::string term = "Hello";
|
||||||
|
FstSlice key = fstSliceCreate((uint8_t*)term.c_str(), term.size());
|
||||||
|
uint64_t offset = 0;
|
||||||
|
bool ret = fstGet(fst, &key, &offset);
|
||||||
|
if (ret == false) {
|
||||||
|
std::cout << "not found" << std::endl;
|
||||||
|
} else {
|
||||||
|
std::cout << "found value:" << offset << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#define NUM_OF_THREAD 10
|
||||||
|
void validateTFile(char* arg) {
|
||||||
|
tfInit();
|
||||||
|
|
||||||
|
std::thread threads[NUM_OF_THREAD];
|
||||||
|
// std::vector<std::thread> threads;
|
||||||
|
TFileReader* reader = tfileReaderOpen(arg, 0, 295868, "tag1");
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||||
|
threads[i] = std::thread(fst_get, reader->fst);
|
||||||
|
// threads.push_back(fst_get, reader->fst);
|
||||||
|
// std::thread t(fst_get, reader->fst);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||||
|
// wait join
|
||||||
|
threads[i].join();
|
||||||
|
}
|
||||||
|
tfCleanup();
|
||||||
|
}
|
||||||
|
int main(int argc, char* argv[]) {
|
||||||
|
if (argc > 1) { validateTFile(argv[1]); }
|
||||||
|
// checkFstCheckIterator();
|
||||||
// checkFstPrefixSearch();
|
// checkFstPrefixSearch();
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
#include "index.h"
|
#include "index.h"
|
||||||
#include "indexInt.h"
|
#include "indexInt.h"
|
||||||
#include "index_cache.h"
|
#include "index_cache.h"
|
||||||
|
@ -25,6 +26,9 @@
|
||||||
#include "tskiplist.h"
|
#include "tskiplist.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
#define NUM_OF_THREAD 10
|
||||||
|
|
||||||
class DebugInfo {
|
class DebugInfo {
|
||||||
public:
|
public:
|
||||||
DebugInfo(const char* str) : info(str) {
|
DebugInfo(const char* str) : info(str) {
|
||||||
|
@ -41,6 +45,7 @@ class DebugInfo {
|
||||||
private:
|
private:
|
||||||
std::string info;
|
std::string info;
|
||||||
};
|
};
|
||||||
|
|
||||||
class FstWriter {
|
class FstWriter {
|
||||||
public:
|
public:
|
||||||
FstWriter() {
|
FstWriter() {
|
||||||
|
@ -332,6 +337,8 @@ class TFileObj {
|
||||||
TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage")
|
TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage")
|
||||||
: path_(path), colName_(colName) {
|
: path_(path), colName_(colName) {
|
||||||
colId_ = 10;
|
colId_ = 10;
|
||||||
|
reader_ = NULL;
|
||||||
|
writer_ = NULL;
|
||||||
// Do Nothing
|
// Do Nothing
|
||||||
//
|
//
|
||||||
}
|
}
|
||||||
|
@ -527,6 +534,7 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
|
// indexTermDestry(term);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
|
@ -634,6 +642,23 @@ class IndexObj {
|
||||||
indexMultiTermDestroy(terms);
|
indexMultiTermDestroy(terms);
|
||||||
return numOfTable;
|
return numOfTable;
|
||||||
}
|
}
|
||||||
|
int WriteMultiMillonData(const std::string& colName, const std::string& colVal = "Hello world",
|
||||||
|
size_t numOfTable = 100 * 10000) {
|
||||||
|
std::string tColVal = colVal;
|
||||||
|
for (int i = 0; i < numOfTable; i++) {
|
||||||
|
tColVal[tColVal.size() - 1] = 'a' + i % 26;
|
||||||
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
indexMultiTermAdd(terms, term);
|
||||||
|
for (size_t i = 0; i < 10; i++) {
|
||||||
|
int ret = Put(terms, i);
|
||||||
|
assert(ret == 0);
|
||||||
|
}
|
||||||
|
indexMultiTermDestroy(terms);
|
||||||
|
}
|
||||||
|
return numOfTable;
|
||||||
|
}
|
||||||
|
|
||||||
int Put(SIndexMultiTerm* fvs, uint64_t uid) {
|
int Put(SIndexMultiTerm* fvs, uint64_t uid) {
|
||||||
numOfWrite += taosArrayGetSize(fvs);
|
numOfWrite += taosArrayGetSize(fvs);
|
||||||
|
@ -656,6 +681,14 @@ class IndexObj {
|
||||||
return taosArrayGetSize(result);
|
return taosArrayGetSize(result);
|
||||||
// assert(taosArrayGetSize(result) == targetSize);
|
// assert(taosArrayGetSize(result) == targetSize);
|
||||||
}
|
}
|
||||||
|
void PutOne(const std::string& colName, const std::string& colVal) {
|
||||||
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
indexMultiTermAdd(terms, term);
|
||||||
|
Put(terms, 10);
|
||||||
|
indexMultiTermDestroy(terms);
|
||||||
|
}
|
||||||
void Debug() {
|
void Debug() {
|
||||||
std::cout << "numOfWrite:" << numOfWrite << std::endl;
|
std::cout << "numOfWrite:" << numOfWrite << std::endl;
|
||||||
std::cout << "numOfRead:" << numOfRead << std::endl;
|
std::cout << "numOfRead:" << numOfRead << std::endl;
|
||||||
|
@ -687,7 +720,7 @@ class IndexEnv2 : public ::testing::Test {
|
||||||
IndexObj* index;
|
IndexObj* index;
|
||||||
};
|
};
|
||||||
TEST_F(IndexEnv2, testIndexOpen) {
|
TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
std::string path = "/tmp";
|
std::string path = "/tmp/test";
|
||||||
if (index->Init(path) != 0) {
|
if (index->Init(path) != 0) {
|
||||||
std::cout << "failed to init index" << std::endl;
|
std::cout << "failed to init index" << std::endl;
|
||||||
exit(1);
|
exit(1);
|
||||||
|
@ -723,10 +756,24 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
}
|
}
|
||||||
indexMultiTermDestroy(terms);
|
indexMultiTermDestroy(terms);
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
size_t size = 200;
|
||||||
|
std::string colName("tag1"), colVal("Hello");
|
||||||
|
|
||||||
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
indexMultiTermAdd(terms, term);
|
||||||
|
for (size_t i = size * 3; i < size * 4; i++) {
|
||||||
|
int tableId = i;
|
||||||
|
int ret = index->Put(terms, tableId);
|
||||||
|
assert(ret == 0);
|
||||||
|
}
|
||||||
|
indexMultiTermDestroy(terms);
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
std::string colName("tag1"), colVal("Hello");
|
std::string colName("tag1"), colVal("Hello");
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
@ -735,21 +782,44 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||||
index->Search(mq, result);
|
index->Search(mq, result);
|
||||||
std::cout << "target size: " << taosArrayGetSize(result) << std::endl;
|
std::cout << "target size: " << taosArrayGetSize(result) << std::endl;
|
||||||
// assert(taosArrayGetSize(result) == targetSize);
|
assert(taosArrayGetSize(result) == 400);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(IndexEnv2, testIndex_TrigeFlush) {
|
TEST_F(IndexEnv2, testIndex_TrigeFlush) {
|
||||||
std::string path = "/tmp";
|
std::string path = "/tmp/test";
|
||||||
if (index->Init(path) != 0) {}
|
if (index->Init(path) != 0) {
|
||||||
|
// r
|
||||||
|
std::cout << "failed to init" << std::endl;
|
||||||
|
}
|
||||||
int numOfTable = 100 * 10000;
|
int numOfTable = 100 * 10000;
|
||||||
index->WriteMillonData("tag1", "Hello world", numOfTable);
|
index->WriteMillonData("tag1", "Hello", numOfTable);
|
||||||
int target = index->SearchOne("tag1", "Hello world");
|
int target = index->SearchOne("tag1", "Hello");
|
||||||
assert(numOfTable == target);
|
assert(numOfTable == target);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void write_and_search(IndexObj* idx) {
|
||||||
|
std::string colName("tag1"), colVal("Hello");
|
||||||
|
|
||||||
|
int target = idx->SearchOne("tag1", "Hello");
|
||||||
|
idx->PutOne(colName, colVal);
|
||||||
|
}
|
||||||
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
|
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
|
||||||
std::string path = "/tmp";
|
std::string path = "/tmp/cache_and_tfile";
|
||||||
if (index->Init(path) != 0) {}
|
if (index->Init(path) != 0) {
|
||||||
|
// opt
|
||||||
|
}
|
||||||
|
index->WriteMultiMillonData("tag1", "Hello", 200000);
|
||||||
|
std::thread threads[NUM_OF_THREAD];
|
||||||
|
|
||||||
|
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||||
|
//
|
||||||
|
threads[i] = std::thread(write_and_search, index);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||||
|
// TOD
|
||||||
|
threads[i].join();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
TEST_F(IndexEnv2, testIndex_multi_thread_write) {
|
TEST_F(IndexEnv2, testIndex_multi_thread_write) {
|
||||||
std::string path = "/tmp";
|
std::string path = "/tmp";
|
||||||
|
@ -769,4 +839,7 @@ TEST_F(IndexEnv2, testIndex_performance) {
|
||||||
std::string path = "/tmp";
|
std::string path = "/tmp";
|
||||||
if (index->Init(path) != 0) {}
|
if (index->Init(path) != 0) {}
|
||||||
}
|
}
|
||||||
TEST_F(IndexEnv2, testIndexMultiTag) {}
|
TEST_F(IndexEnv2, testIndexMultiTag) {
|
||||||
|
std::string path = "/tmp";
|
||||||
|
if (index->Init(path) != 0) {}
|
||||||
|
}
|
||||||
|
|
|
@ -16,21 +16,19 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "ulog.h"
|
|
||||||
#include "tutil.h"
|
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
#include "ulog.h"
|
||||||
|
|
||||||
static int32_t tsFileRsetId = -1;
|
static int32_t tsFileRsetId = -1;
|
||||||
|
|
||||||
static int8_t tfInited = 0;
|
static int8_t tfInited = 0;
|
||||||
|
|
||||||
static void tfCloseFile(void *p) {
|
static void tfCloseFile(void *p) { taosCloseFile((int32_t)(uintptr_t)p); }
|
||||||
taosCloseFile((int32_t)(uintptr_t)p);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tfInit() {
|
int32_t tfInit() {
|
||||||
int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1);
|
int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1);
|
||||||
if(old == 1) return 0;
|
if (old == 1) return 0;
|
||||||
tsFileRsetId = taosOpenRef(2000, tfCloseFile);
|
tsFileRsetId = taosOpenRef(2000, tfCloseFile);
|
||||||
if (tsFileRsetId > 0) {
|
if (tsFileRsetId > 0) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -79,9 +77,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname, int32_t flags, mode_t mode
|
||||||
return tfOpenImp(fd);
|
return tfOpenImp(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t tfClose(int64_t tfd) {
|
int64_t tfClose(int64_t tfd) { return taosRemoveRef(tsFileRsetId, tfd); }
|
||||||
return taosRemoveRef(tsFileRsetId, tfd);
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t tfWrite(int64_t tfd, void *buf, int64_t count) {
|
int64_t tfWrite(int64_t tfd, void *buf, int64_t count) {
|
||||||
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
||||||
|
@ -109,6 +105,19 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t tfPread(int64_t tfd, void *buf, int64_t count, int32_t offset) {
|
||||||
|
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
||||||
|
if (p == NULL) return -1;
|
||||||
|
|
||||||
|
int32_t fd = (int32_t)(uintptr_t)p;
|
||||||
|
|
||||||
|
int64_t ret = pread(fd, buf, count, offset);
|
||||||
|
if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
|
||||||
|
taosReleaseRef(tsFileRsetId, tfd);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tfFsync(int64_t tfd) {
|
int32_t tfFsync(int64_t tfd) {
|
||||||
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
||||||
if (p == NULL) return -1;
|
if (p == NULL) return -1;
|
||||||
|
|
Loading…
Reference in New Issue