From 2375f6c944b6e0ea0aa398edaa96df8e438756d5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 19 Dec 2021 15:37:53 +0800 Subject: [PATCH 1/6] update cache search --- include/libs/index/index.h | 1 - source/libs/index/inc/indexInt.h | 10 +++-- source/libs/index/src/index.c | 64 +++++++++++++++++++++++++++----- 3 files changed, 62 insertions(+), 13 deletions(-) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 3ca8d10603..3d4dbade58 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -71,7 +71,6 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms); SIndexOpts *indexOptsCreate(); void indexOptsDestroy(SIndexOpts *opts); - /* * @param: * @param: diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 7e017049e8..6205d68c43 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -48,9 +48,13 @@ struct SIndex { struct SIndexOpts { #ifdef USE_LUCENE void *opts; -#endif - int32_t numOfItermLimit; - int8_t mergeInterval; +#endif + +#ifdef USE_INVERTED_INDEX + int32_t cacheSize; // MB + // add cache module later +#endif + }; struct SIndexMultiTermQuery { diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 08c59d8d43..1bc12d2b79 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -16,6 +16,7 @@ #include "index.h" #include "indexInt.h" #include "index_cache.h" +#include "tdef.h" #ifdef USE_LUCENE #include "lucene++/Lucene_c.h" @@ -30,13 +31,13 @@ typedef struct SIdxColInfo { static pthread_once_t isInit = PTHREAD_ONCE_INIT; static void indexInit(); -static int indexMergeCacheIntoTindex(struct SIndex *sIdx) { - if (sIdx == NULL) { - return -1; - } - indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); - return 0; -} + +static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *term, SArray **result); +static int indexMergeCacheIntoTindex(SIndex *sIdx); + +static void indexInterResultsDestroy(SArray *results); +static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *finalResult); + int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) { pthread_once(&isInit, indexInit); SIndex *sIdx = calloc(1, sizeof(SIndex)); @@ -49,8 +50,8 @@ int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) { sIdx->cache = (void*)indexCacheCreate(); sIdx->tindex = NULL; - sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - sIdx->colId = 1; + sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + sIdx->colId = 1; sIdx->cVersion = 1; pthread_mutex_init(&sIdx->mtx, NULL); @@ -162,12 +163,25 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result #endif #ifdef USE_INVERTED_INDEX + EIndexOperatorType opera = multiQuerys->opera; // relation of querys + SArray *interResults = taosArrayInit(4, POINTER_BYTES); + int nQuery = taosArrayGetSize(multiQuerys->query); + for (size_t i = 0; i < nQuery; i++) { + SIndexTermQuery *qTerm = taosArrayGet(multiQuerys->query, i); + SArray *tResult = NULL; + indexTermSearch(index, qTerm, &tResult); + taosArrayPush(interResults, (void *)&tResult); + } + indexMergeFinalResults(interResults, opera, result); + indexInterResultsDestroy(interResults); + #endif return 1; } + int indexDelete(SIndex *index, SIndexMultiTermQuery *query) { #ifdef USE_INVERTED_INDEX #endif @@ -259,3 +273,35 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms) { void indexInit() { //do nothing } +static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *term, SArray **result) { + +} +static void indexInterResultsDestroy(SArray *results) { + if (results == NULL) { return; } + + size_t sz = taosArrayGetSize(results); + for (size_t i = 0; i < sz; i++) { + SArray *p = taosArrayGetP(results, i); + taosArrayDestroy(p); + } + taosArrayDestroy(results); + +} +static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *fResults) { + if (oType == MUST) { + + // tag1 condition && tag2 condition + } else if (oType == SHOULD) { + // tag1 condistion || tag2 condition + } else if (oType == NOT) { + // not use currently + } + return 0; +} +static int indexMergeCacheIntoTindex(SIndex *sIdx) { + if (sIdx == NULL) { + return -1; + } + indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); + return 0; +} From d43ad3bed88fd0d012fb5def87b334f5905c19bb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 19 Dec 2021 16:43:56 +0800 Subject: [PATCH 2/6] update cache search --- include/libs/index/index.h | 2 +- source/libs/index/inc/index_cache.h | 10 +++++----- source/libs/index/src/index.c | 14 +++++++++---- source/libs/index/src/index_cache.c | 31 ++++++++++++++--------------- 4 files changed, 31 insertions(+), 26 deletions(-) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index 3d4dbade58..2535ec8a5b 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -53,7 +53,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm */ int indexOpen(SIndexOpts *opt, const char *path, SIndex **index); void indexClose(SIndex *index); -int indexPut(SIndex *index, SIndexMultiTerm *terms, int uid); +int indexPut(SIndex *index, SIndexMultiTerm *terms, uint64_t uid); int indexDelete(SIndex *index, SIndexMultiTermQuery *query); int indexSearch(SIndex *index, SIndexMultiTermQuery *query, SArray *result); int indexRebuild(SIndex *index, SIndexOpts *opt); diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index b952e16a8e..f2e6df0bfb 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -16,13 +16,14 @@ #define __INDEX_CACHE_H__ #include "index.h" +#include "indexInt.h" #include "tlockfree.h" #include "tskiplist.h" // ----------------- row structure in skiplist --------------------- /* A data row, the format is like below: * content: |<--totalLen-->|<-- fieldid-->|<--field type -->|<-- value len--->|<-- value -->|<-- uid -->|<--version--->|<-- itermType -->| - * len : |<--int32_t -->|<-- int16_t-->|<-- int16_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->| + * len : |<--int32_t -->|<-- int16_t-->|<-- int8_t --->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->| */ #ifdef __cplusplus @@ -40,11 +41,10 @@ IndexCache *indexCacheCreate(); void indexCacheDestroy(void *cache); -int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, - uint32_t version, uint64_t uid, int8_t operType); +int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid); -int indexCacheGet(void *cache, uint64_t *rst); -int indexCacheSearch(void *cache, SIndexMultiTermQuery *query, SArray *result); +//int indexCacheGet(void *cache, uint64_t *rst); +int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result); #ifdef __cplusplus } diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 1bc12d2b79..e6c655d097 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -74,7 +74,7 @@ void indexClose(SIndex *sIdx) { return; } -int indexPut(SIndex *index, SIndexMultiTerm * fVals, int uid) { +int indexPut(SIndex *index, SIndexMultiTerm * fVals, uint64_t uid) { #ifdef USE_LUCENE index_document_t *doc = index_document_create(); @@ -116,7 +116,7 @@ int indexPut(SIndex *index, SIndexMultiTerm * fVals, int uid) { assert(fi != NULL); int32_t colId = fi->colId; int32_t version = index->cVersion; - int ret = indexCachePut(index->cache, colId, p->colType, p->colVal, p->nColVal, version, uid, p->operType); + int ret = indexCachePut(index->cache, p, colId, version, uid); if (ret != 0) { return ret; } @@ -275,6 +275,7 @@ void indexInit() { } static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *term, SArray **result) { + return 0; } static void indexInterResultsDestroy(SArray *results) { if (results == NULL) { return; } @@ -288,12 +289,17 @@ static void indexInterResultsDestroy(SArray *results) { } static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *fResults) { + //refactor, merge interResults into fResults by oType + SArray *first = taosArrayGetP(interResults, 0); + + //taosArraySort(first, getCom) if (oType == MUST) { - - // tag1 condition && tag2 condition + } else if (oType == SHOULD) { + // tag1 condistion || tag2 condition } else if (oType == NOT) { + // not use currently } return 0; diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 23f7a08823..7ce23e6b6e 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -103,14 +103,13 @@ void indexCacheDestroy(void *cache) { free(pCache); } -int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *fieldValue, int32_t fvLen, - uint32_t version, uint64_t uid, int8_t operType) { +int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid) { if (cache == NULL) { return -1;} IndexCache *pCache = cache; // encode data - int32_t total = sizeof(int32_t) + sizeof(fieldId) + sizeof(fieldType) + sizeof(fvLen) + fvLen + sizeof(version) + sizeof(uid) + sizeof(operType); + int32_t total = sizeof(int32_t) + sizeof(colId) + sizeof(term->colType) + sizeof(term->nColVal) + term->nColVal + sizeof(version) + sizeof(uid) + sizeof(term->operType); char *buf = calloc(1, total); char *p = buf; @@ -118,16 +117,16 @@ int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *f memcpy(p, &total, sizeof(total)); p += sizeof(total); - memcpy(p, &fieldId, sizeof(fieldId)); - p += sizeof(fieldId); + memcpy(p, &colId, sizeof(colId)); + p += sizeof(colId); - memcpy(p, &fieldType, sizeof(fieldType)); - p += sizeof(fieldType); + memcpy(p, &term->colType, sizeof(term->colType)); + p += sizeof(term->colType); - memcpy(p, &fvLen, sizeof(fvLen)); - p += sizeof(fvLen); - memcpy(p, fieldValue, fvLen); - p += fvLen; + memcpy(p, &term->nColVal, sizeof(term->nColVal)); + p += sizeof(term->nColVal); + memcpy(p, term->colVal, term->nColVal); + p += term->nColVal; memcpy(p, &version, sizeof(version)); p += sizeof(version); @@ -135,8 +134,8 @@ int indexCachePut(void *cache, int16_t fieldId, int16_t fieldType, const char *f memcpy(p, &uid, sizeof(uid)); p += sizeof(uid); - memcpy(p, &operType, sizeof(operType)); - p += sizeof(operType); + memcpy(p, &term->operType, sizeof(term->operType)); + p += sizeof(term->operType); tSkipListPut(pCache->skiplist, (void *)buf); // encode end @@ -146,7 +145,7 @@ int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t IndexCache *pCache = cache; return 0; } -int indexCacheSearch(void *cache, SIndexMultiTermQuery *query, SArray *result) { - - return 0; +int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result) { + return 0; } + From 9140eb597c760457ed235343c5dfde1e7587d27d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 19 Dec 2021 17:49:30 +0800 Subject: [PATCH 3/6] update index cache search --- source/libs/index/inc/indexInt.h | 10 ++++++++++ source/libs/index/inc/index_cache.h | 2 +- source/libs/index/src/index_cache.c | 24 +++++++++++++++++++++++- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 6205d68c43..a947e24a73 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -31,6 +31,14 @@ extern "C" { #endif +typedef struct SIndexStat { + int32_t totalAdded; // + int32_t totalDeled; // + int32_t totalUpdated; // + int32_t totalTerms // + int32_t distCol; // distinct column +} SIndexStat; + struct SIndex { #ifdef USE_LUCENE index_t *index; @@ -42,6 +50,8 @@ struct SIndex { int64_t suid; // current super table id, -1 is normal table int colId; // field id allocated to cache int32_t cVersion; // current version allocated to cache + + SIndexStat stat; pthread_mutex_t mtx; }; diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index f2e6df0bfb..e3b7ae273a 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -19,7 +19,7 @@ #include "indexInt.h" #include "tlockfree.h" #include "tskiplist.h" -// ----------------- row structure in skiplist --------------------- +// ----------------- key structure in skiplist --------------------- /* A data row, the format is like below: * content: |<--totalLen-->|<-- fieldid-->|<--field type -->|<-- value len--->|<-- value -->|<-- uid -->|<--version--->|<-- itermType -->| diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 7ce23e6b6e..5a23278908 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -18,6 +18,9 @@ #define MAX_INDEX_KEY_LEN 256// test only, change later +// ref index_cache.h:22 +#define CACHE_KEY_LEN(p) (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType)) + static char* getIndexKey(const void *pData) { return NULL; } @@ -109,7 +112,7 @@ int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, IndexCache *pCache = cache; // encode data - int32_t total = sizeof(int32_t) + sizeof(colId) + sizeof(term->colType) + sizeof(term->nColVal) + term->nColVal + sizeof(version) + sizeof(uid) + sizeof(term->operType); + int32_t total = CACHE_KEY_LEN(term); char *buf = calloc(1, total); char *p = buf; @@ -138,6 +141,7 @@ int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, p += sizeof(term->operType); tSkipListPut(pCache->skiplist, (void *)buf); + return 0; // encode end } @@ -146,6 +150,24 @@ int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t return 0; } int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result) { + if (cache == NULL) { return -1; } + IndexCache *pCache = cache; + SIndexTerm *term = query->term; + EIndexQueryType qtype = query->qType; + + int32_t keyLen = CACHE_KEY_LEN(term); + + char *buf = calloc(1, keyLen); + if (qtype == QUERY_TERM) { + + } else if (qtype == QUERY_PREFIX) { + + } else if (qtype == QUERY_SUFFIX) { + + } else if (qtype == QUERY_REGEX) { + + } + return 0; } From 21d6ba86c0cc168bc0f12534b7f4383e05ce1a27 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 19 Dec 2021 17:51:28 +0800 Subject: [PATCH 4/6] update index cache search --- source/libs/index/inc/indexInt.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index a947e24a73..a96c8eecf4 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -35,8 +35,8 @@ typedef struct SIndexStat { int32_t totalAdded; // int32_t totalDeled; // int32_t totalUpdated; // - int32_t totalTerms // - int32_t distCol; // distinct column + int32_t totalTerms; // + int32_t distinctCol; // distinct column } SIndexStat; struct SIndex { From 7b91d474660eddc799db0c25aa0bbcda01d51c49 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 19 Dec 2021 20:05:07 +0800 Subject: [PATCH 5/6] add craft demo --- cmake/cmake.options | 6 + cmake/craft_CMakeLists.txt.in | 13 ++ contrib/CMakeLists.txt | 17 ++ contrib/test/craft/clear.sh | 3 + contrib/test/craft/help.txt | 15 ++ contrib/test/craft/simulate_vnode.c | 252 ++++++++++++++++++++++++++++ 6 files changed, 306 insertions(+) create mode 100644 cmake/craft_CMakeLists.txt.in create mode 100644 contrib/test/craft/clear.sh create mode 100644 contrib/test/craft/help.txt create mode 100644 contrib/test/craft/simulate_vnode.c diff --git a/cmake/cmake.options b/cmake/cmake.options index edaab3bd45..1b2cdb6c47 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -43,6 +43,12 @@ option( OFF ) +option( + BUILD_WITH_CRAFT + "If build with canonical-raft" + OFF +) + option( BUILD_DEPENDENCY_TESTS "If build dependency tests" diff --git a/cmake/craft_CMakeLists.txt.in b/cmake/craft_CMakeLists.txt.in new file mode 100644 index 0000000000..a77fa679e0 --- /dev/null +++ b/cmake/craft_CMakeLists.txt.in @@ -0,0 +1,13 @@ + +# canonical-raft +ExternalProject_Add(craft + GIT_REPOSITORY https://github.com/canonical/raft.git + GIT_TAG v0.11.2 + SOURCE_DIR "${CMAKE_CONTRIB_DIR}/craft" + BINARY_DIR "${CMAKE_CONTRIB_DIR}/craft/.libs" + #BUILD_IN_SOURCE TRUE + CONFIGURE_COMMAND "autoreconf -i && ./configure" + BUILD_COMMAND "$(MAKE)" + INSTALL_COMMAND "" + TEST_COMMAND "" +) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index cffe164488..0aadaccfa1 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -34,6 +34,11 @@ if(${BUILD_WITH_ROCKSDB}) add_definitions(-DUSE_ROCKSDB) endif(${BUILD_WITH_ROCKSDB}) +# canonical-raft +if(${BUILD_WITH_CRAFT}) + cat("${CMAKE_SUPPORT_DIR}/craft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +endif(${BUILD_WITH_CRAFT}) + # bdb if(${BUILD_WITH_BDB}) cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -144,6 +149,18 @@ if(${BUILD_WITH_NURAFT}) add_subdirectory(nuraft) endif(${BUILD_WITH_NURAFT}) +# CRAFT +if(${BUILD_WITH_CRAFT}) + add_library(craft STATIC IMPORTED GLOBAL) + set_target_properties(craft PROPERTIES + IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/craft/.libs/libraft.a" + INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/craft" + ) + target_link_libraries(craft + INTERFACE pthread + ) +endif(${BUILD_WITH_CRAFT}) + # BDB if(${BUILD_WITH_BDB}) add_library(bdb STATIC IMPORTED GLOBAL) diff --git a/contrib/test/craft/clear.sh b/contrib/test/craft/clear.sh new file mode 100644 index 0000000000..6412656d77 --- /dev/null +++ b/contrib/test/craft/clear.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +rm -rf 127.0.0.1* diff --git a/contrib/test/craft/help.txt b/contrib/test/craft/help.txt new file mode 100644 index 0000000000..48ce9de403 --- /dev/null +++ b/contrib/test/craft/help.txt @@ -0,0 +1,15 @@ + + +make craft: + +sudo apt-get install libuv1-dev liblz4-dev +autoreconf -i +./configure --enable-example +make + + +start: + +./simulate_vnode 10000 10001 10002 +./simulate_vnode 10001 10000 10002 +./simulate_vnode 10002 10000 10001 diff --git a/contrib/test/craft/simulate_vnode.c b/contrib/test/craft/simulate_vnode.c new file mode 100644 index 0000000000..668fe638b7 --- /dev/null +++ b/contrib/test/craft/simulate_vnode.c @@ -0,0 +1,252 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +const char* exe_name; + +// simulate ------------------------ +typedef struct SVnode { + int vid; +} SVnode; + + +#define VNODE_COUNT 10 +SVnode vnodes[VNODE_COUNT]; + +int vnodeApplyWMsg(SVnode *pVnode, char *pMsg, void **pRsp) { + printf("put value to tsdb, vid:%d msg:%s \n", pVnode->vid, pMsg); + return 0; +} + +int applyCB(struct raft_fsm *fsm, + const struct raft_buffer *buf, + void **result) { + char *msg = (char*)buf->base; + //printf("%s \n", msg); + + // parse msg + char* context; + char* token = strtok_r(msg, ":", &context); + int vid = atoi(token); + + token = strtok_r(NULL, ":", &context); + char *value = token; + + SVnode* tmp_vnodes = (SVnode*)(fsm->data); + vnodeApplyWMsg(&tmp_vnodes[vid], value, NULL); + + return 0; +} + +// Config ------------------------ +#define HOST_LEN 32 +#define MAX_PEERS 10 +typedef struct Address { + char host[HOST_LEN]; + uint32_t port; +} Address; + +uint64_t raftId(Address *addr) { + // test in a single machine, port is unique + // if in multi machines, use host and port + return addr->port; +} + +typedef struct Config { + Address me; + Address peers[MAX_PEERS]; + int peer_count; +} Config; + +Config gConf; + +void printConf(Config *c) { + printf("me: %s:%u \n", c->me.host, c->me.port); + for (int i = 0; i < c->peer_count; ++i) { + printf("peer%d: %s:%u \n", i, c->peers[i].host, c->peers[i].port); + } +} + +// RaftServer ------------------------ +typedef struct RaftServer { + struct uv_loop_s loop; + struct raft_uv_transport transport; + struct raft_io io; + struct raft_fsm fsm; + struct raft raft; + struct raft_configuration conf; +} RaftServer; + +RaftServer gRaftServer; + +static void* startRaftServer(void *param) { + //RaftServer* rs = (RaftServer*)param; + RaftServer* rs = &gRaftServer; + raft_start(&rs->raft); + uv_run(&rs->loop, UV_RUN_DEFAULT); +} + +static const char* state2String(unsigned short state) { + if (state == RAFT_UNAVAILABLE) { + return "RAFT_UNAVAILABLE"; + + } else if (state == RAFT_FOLLOWER) { + return "RAFT_FOLLOWER"; + + } else if (state == RAFT_CANDIDATE) { + return "RAFT_CANDIDATE"; + + } else if (state == RAFT_LEADER) { + return "RAFT_LEADER"; + + } + return "UNKNOWN_RAFT_STATE"; +} + +static void printRaftState(struct raft *r) { + printf("\n"); + printf("my_id: %llu \n", r->id); + printf("address: %s \n", r->address); + printf("current_term: %llu \n", r->current_term); + printf("voted_for: %llu \n", r->voted_for); + printf("role: %s \n", state2String(r->state)); + printf("commit_index: %llu \n", r->commit_index); + printf("last_applied: %llu \n", r->last_applied); + printf("last_stored: %llu \n", r->last_stored); + printf("\n"); +} + +// console ----------------------------------------- +#define PROPOSE_VALUE_LEN 128 +static void proposeValue(struct raft *r) { + struct raft_buffer buf; + + // need free + buf.len = PROPOSE_VALUE_LEN; + buf.base = raft_malloc(buf.len); + + // mock ts value + int vid = rand() % VNODE_COUNT; + snprintf(buf.base, buf.len, "%d:value_%ld", vid, time(NULL)); + + printf("propose value: %s \n", (char*)buf.base); + + // need free + struct raft_apply *req = raft_malloc(sizeof(struct raft_apply)); + raft_apply(r, req, &buf, 1, NULL); +} + +static void* console(void *param) { + while (1) { + // notice! memory buffer overflow! + char buf[128]; + memset(buf, 0, sizeof(buf)); + fgets(buf, 128, stdin); + if (strlen(buf) == 1) { + continue; + } + buf[strlen(buf)-1] = '\0'; + + // do not use strcmp + if (strcmp(buf, "state") == 0) { + printRaftState(&gRaftServer.raft); + + } else if (strcmp(buf, "put") == 0) { + proposeValue(&gRaftServer.raft); + + } else { + printf("unknown command: [%s], support command: state, put \n", buf); + } + } +} + +// ----------------------------------------- +void usage() { + printf("\n"); + printf("%s my_port peer1_port peer2_port ... \n", exe_name); + printf("\n"); +} + +int main(int argc, char **argv) { + srand(time(NULL)); + + exe_name = argv[0]; + if (argc < 2) { + usage(); + exit(-1); + } + + // read conf from argv + strncpy(gConf.me.host, "127.0.0.1", HOST_LEN); + sscanf(argv[1], "%u", &gConf.me.port); + + gConf.peer_count = 0; + for (int i = 2; i < argc; ++i) { + strncpy(gConf.peers[gConf.peer_count].host, "127.0.0.1", HOST_LEN); + sscanf(argv[i], "%u", &gConf.peers[gConf.peer_count].port); + gConf.peer_count++; + } + printConf(&gConf); + + // mkdir + char dir[128]; + snprintf(dir, sizeof(dir), "./%s_%u", gConf.me.host, gConf.me.port); + + char cmd[128]; + snprintf(cmd, sizeof(cmd), "rm -rf ./%s", dir); + system(cmd); + snprintf(cmd, sizeof(cmd), "mkdir -p ./%s", dir); + system(cmd); + + // init io + uv_loop_init(&gRaftServer.loop); + raft_uv_tcp_init(&gRaftServer.transport, &gRaftServer.loop); + raft_uv_init(&gRaftServer.io, &gRaftServer.loop, dir, &gRaftServer.transport); + + // init fsm + gRaftServer.fsm.apply = applyCB; + gRaftServer.fsm.data = vnodes; + for (int i = 0; i < VNODE_COUNT; ++i) { + vnodes[i].vid = i; + } + + // init raft instance with io and fsm + char address_buf[128]; + snprintf(address_buf, sizeof(address_buf), "%s:%u", gConf.me.host, gConf.me.port); + + // test in a single machine, port is unique + uint64_t raft_id = raftId(&gConf.me); + raft_init(&gRaftServer.raft, &gRaftServer.io, &gRaftServer.fsm, raft_id, address_buf); + //raft_init(&gRaftServer.raft, &gRaftServer.io, &gRaftServer.fsm, 11, "127.0.0.1:9000"); + + // init cluster configuration + struct raft_configuration conf; + raft_configuration_init(&conf); + raft_configuration_add(&conf, raftId(&gConf.me), address_buf, RAFT_VOTER); + for (int i = 0; i < gConf.peer_count; ++i) { + char address_buf[128]; + snprintf(address_buf, sizeof(address_buf), "%s:%u", gConf.peers[i].host, gConf.peers[i].port); + raft_configuration_add(&conf, raftId(&gConf.peers[i]), address_buf, RAFT_VOTER); + } + raft_bootstrap(&gRaftServer.raft, &conf); + + // start raft server and loop + pthread_t tid; + pthread_create(&tid, NULL, startRaftServer, &gRaftServer); + + // simulate console + pthread_t tid2; + pthread_create(&tid2, NULL, console, NULL); + + while (1) { + sleep(10); + } + + return 0; +} From 76f70feb49a44b51460bd9a24279966fabcbee57 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 20 Dec 2021 09:34:28 +0800 Subject: [PATCH 6/6] add index TFile --- source/libs/index/inc/indexInt.h | 2 ++ source/libs/index/inc/index_cache.h | 2 +- source/libs/index/inc/index_tfile.h | 46 ++++++++++++++++++++++++++ source/libs/index/src/index.c | 50 ++++++++++++++++++++++++++--- source/libs/index/src/index_cache.c | 6 ++-- source/libs/index/src/index_tfile.c | 31 ++++++++++++++++++ 6 files changed, 128 insertions(+), 9 deletions(-) create mode 100644 source/libs/index/inc/index_tfile.h create mode 100644 source/libs/index/src/index_tfile.c diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index a96c8eecf4..a258cb834d 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -31,6 +31,8 @@ extern "C" { #endif +typedef enum {kTypeValue, kTypeDeletion} STermValueType ; + typedef struct SIndexStat { int32_t totalAdded; // int32_t totalDeled; // diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index e3b7ae273a..97a7b835f6 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -44,7 +44,7 @@ void indexCacheDestroy(void *cache); int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid); //int indexCacheGet(void *cache, uint64_t *rst); -int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result); +int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s); #ifdef __cplusplus } diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h new file mode 100644 index 0000000000..c3f4bd25e5 --- /dev/null +++ b/source/libs/index/inc/index_tfile.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef __INDEX_TFILE_H__ +#define __INDEX_TFILE_H__ + +#include "index.h" +#include "indexInt.h" +#include "tlockfree.h" +#include "tskiplist.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct IndexTFile { + T_REF_DECLARE() +} IndexTFile; + + + +IndexTFile *indexTFileCreate(); + + +int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result); + +#ifdef __cplusplus +} + + +#endif + + + +#endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index e6c655d097..ec83e84a3b 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -16,13 +16,19 @@ #include "index.h" #include "indexInt.h" #include "index_cache.h" +#include "index_tfile.h" #include "tdef.h" #ifdef USE_LUCENE #include "lucene++/Lucene_c.h" #endif - +static int uidCompare(const void *a, const void *b) { + uint64_t u1 = *(uint64_t *)a; + uint64_t u2 = *(uint64_t *)b; + if (u1 == u2) { return 0; } + else { return u1 < u2 ? -1 : 1; } +} typedef struct SIdxColInfo { int colId; // generated by index internal int cVersion; @@ -273,9 +279,44 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms) { void indexInit() { //do nothing } -static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *term, SArray **result) { +static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *query, SArray **result) { + int32_t version = -1; + int16_t colId = -1; + SIdxColInfo *colInfo = NULL; + + SIndexTerm *term = query->term; + const char *colName = term->colName; + int32_t nColName = term->nColName; + + pthread_mutex_lock(&sIdx->mtx); + colInfo = taosHashGet(sIdx->colObj, colName, nColName); + if (colInfo == NULL) { + pthread_mutex_unlock(&sIdx->mtx); + return -1; + } + colId = colInfo->colId; + version = colInfo->cVersion; + pthread_mutex_unlock(&sIdx->mtx); - return 0; + *result = taosArrayInit(4, sizeof(uint64_t)); + //TODO: iterator mem and tidex + STermValueType s; + if (0 == indexCacheSearch(sIdx->cache, query, colId, version, *result, &s)) { + if (s == kTypeDeletion) { + indexInfo("col: %s already drop by other opera", term->colName); + // coloum already drop by other oper, no need to query tindex + return 0; + } else { + if (0 != indexTFileSearch(sIdx->tindex, query, *result)) { + indexError("corrupt at index(TFile) col:%s val: %s", term->colName, term->colVal); + return -1; + } + } + } else { + indexError("corrupt at index(cache) col:%s val: %s", term->colName, term->colVal); + return -1; + } + return 0; } static void indexInterResultsDestroy(SArray *results) { if (results == NULL) { return; } @@ -291,8 +332,7 @@ static void indexInterResultsDestroy(SArray *results) { static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType, SArray *fResults) { //refactor, merge interResults into fResults by oType SArray *first = taosArrayGetP(interResults, 0); - - //taosArraySort(first, getCom) + taosArraySort(first, uidCompare); if (oType == MUST) { } else if (oType == SHOULD) { diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 5a23278908..3c52275a4c 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -46,7 +46,7 @@ static int32_t compareKey(const void *l, const void *r) { rp += sizeof(rf); // compare field type - int16_t lft, rft; + int8_t lft, rft; memcpy(&lft, lp, sizeof(lft)); memcpy(&rft, rp, sizeof(rft)); lp += sizeof(lft); @@ -149,7 +149,7 @@ int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t IndexCache *pCache = cache; return 0; } -int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result) { +int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s) { if (cache == NULL) { return -1; } IndexCache *pCache = cache; SIndexTerm *term = query->term; @@ -159,7 +159,7 @@ int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t char *buf = calloc(1, keyLen); if (qtype == QUERY_TERM) { - + } else if (qtype == QUERY_PREFIX) { } else if (qtype == QUERY_SUFFIX) { diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c new file mode 100644 index 0000000000..a1bba56391 --- /dev/null +++ b/source/libs/index/src/index_tfile.c @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "index_tfile.h" + +IndexTFile *indexTFileCreate() { + IndexTFile *tfile = calloc(1, sizeof(IndexTFile)); + return tfile; +} +void IndexTFileDestroy(IndexTFile *tfile) { + free(tfile); +} +int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) { + IndexTFile *ptfile = (IndexTFile *)tfile; + return 0; +} + + +