diff --git a/source/libs/index/CMakeLists.txt b/source/libs/index/CMakeLists.txt index 4805bd3b77..50e76abd3f 100644 --- a/source/libs/index/CMakeLists.txt +++ b/source/libs/index/CMakeLists.txt @@ -3,7 +3,9 @@ add_library(index ${INDEX_SRC}) target_include_directories( index PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index" + PUBLIC "${CMAKE_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" + ) target_link_libraries( index diff --git a/source/libs/index/inc/index_fst_automation.h b/source/libs/index/inc/index_fst_automation.h index c7269eda0f..be056f38fa 100644 --- a/source/libs/index/inc/index_fst_automation.h +++ b/source/libs/index/inc/index_fst_automation.h @@ -23,7 +23,7 @@ extern "C" { typedef struct AutomationCtx AutomationCtx; -typedef enum AutomationType { AUTOMATION_PREFIX, AUTMMATION_MATCH } AutomationType; +typedef enum AutomationType { AUTOMATION_ALWAYS, AUTOMATION_PREFIX, AUTMMATION_MATCH } AutomationType; typedef struct StartWith { AutomationCtx* autoSelf; diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 1c65dd03d5..1fec9e8d0b 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -355,7 +355,6 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType } static int indexFlushCacheTFile(SIndex* sIdx) { if (sIdx == NULL) { return -1; } - indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); return 0; diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 18514cd0d5..18024fa391 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1083,7 +1083,7 @@ bool fstBoundWithDataExceededBy(FstBoundWithData* bound, FstSlice* slice) { } else if (bound->type == Excluded) { return comp >= 0 ? true : false; } else { - return true; + return false; } } bool fstBoundWithDataIsEmpty(FstBoundWithData* bound) { @@ -1224,7 +1224,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb void* start = automFuncs[aut->type].start(aut); if (automFuncs[aut->type].isMatch(aut, start)) { FstSlice s = fstSliceCreate(NULL, 0); - return swsResultCreate(&s, output, callback(start)); + return swsResultCreate(&s, output, callback == NULL ? NULL : callback(start)); } } SArray* nodes = taosArrayInit(8, sizeof(FstNode*)); @@ -1237,10 +1237,12 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb } FstTransition trn; fstNodeGetTransitionAt(p->node, p->trans, &trn); - Output out = p->out.out + trn.out; - void* nextState = automFuncs[aut->type].accept(aut, p->autState, trn.inp); - void* tState = callback(nextState); - bool isMatch = automFuncs[aut->type].isMatch(aut, nextState); + + Output out = p->out.out + trn.out; + void* nextState = automFuncs[aut->type].accept(aut, p->autState, trn.inp); + void* tState = (callback == NULL) ? NULL : callback(nextState); + bool isMatch = automFuncs[aut->type].isMatch(aut, nextState); + FstNode* nextNode = fstGetNode(sws->fst, trn.addr); taosArrayPush(nodes, &nextNode); taosArrayPush(sws->inp, &(trn.inp)); diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index eb4c479c94..72983809d1 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -64,6 +64,25 @@ StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) { return nsv; } +// iterate fst +static void* alwaysMatchStart(AutomationCtx* ctx) { + return NULL; +} +static bool alwaysMatchIsMatch(AutomationCtx* ctx, void* state) { + return true; +} +static bool alwaysMatchCanMatch(AutomationCtx* ctx, void* state) { + return true; +} +static bool alwaysMatchWillAlwaysMatch(AutomationCtx* ctx, void* state) { + return true; +} +static void* alwaysMatchAccpet(AutomationCtx* ctx, void* state, uint8_t byte) { + return NULL; +} +static void* alwaysMatchAccpetEof(AutomationCtx* ctx, void* state) { + return NULL; +} // prefix query, impl later static void* prefixStart(AutomationCtx* ctx) { @@ -127,6 +146,7 @@ static void* patternAcceptEof(AutomationCtx* ctx, void* state) { } AutomationFunc automFuncs[] = { + {alwaysMatchStart, alwaysMatchIsMatch, alwaysMatchCanMatch, alwaysMatchWillAlwaysMatch, alwaysMatchAccpet, alwaysMatchAccpetEof}, {prefixStart, prefixIsMatch, prefixCanMatch, prefixWillAlwaysMatch, prefixAccept, prefixAcceptEof}, {patternStart, patternIsMatch, patternCanMatch, patternWillAlwaysMatch, patternAccept, patternAcceptEof} // add more search type @@ -137,7 +157,11 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) { if (ctx == NULL) { return NULL; } StartWithStateValue* sv = NULL; - if (atype == AUTOMATION_PREFIX) { + if (atype == AUTOMATION_ALWAYS) { + int val = 0; + sv = startWithStateValueCreate(Running, FST_INT, &val); + ctx->stdata = (void*)sv; + } else if (atype == AUTOMATION_PREFIX) { int val = 0; sv = startWithStateValueCreate(Running, FST_INT, &val); ctx->stdata = (void*)sv; diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index 6eb532b41e..3957554748 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -1,13 +1,23 @@ add_executable(indexTest "") +add_executable(fstTest "") target_sources(indexTest PRIVATE "indexTests.cc" ) +target_sources(fstTest + PRIVATE + "fstTest.cc" +) target_include_directories ( indexTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories ( fstTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/index" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries (indexTest os util @@ -15,8 +25,16 @@ target_link_libraries (indexTest gtest_main index ) - -add_test( - NAME index_test - COMMAND indexTest +target_link_libraries (fstTest + os + util + common + gtest_main + index ) + + +#add_test( +# NAME index_test +# COMMAND indexTest +#) diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc new file mode 100644 index 0000000000..85bb5e2b15 --- /dev/null +++ b/source/libs/index/test/fstTest.cc @@ -0,0 +1,174 @@ + +#include +#include +#include +#include "index.h" +#include "indexInt.h" +#include "index_cache.h" +#include "index_fst.h" +#include "index_fst_counting_writer.h" +#include "index_fst_util.h" +#include "index_tfile.h" +#include "tskiplist.h" +#include "tutil.h" + +void* callback(void* s) { + return s; +} + +static std::string fileName = "/tmp/tindex.tindex"; +class FstWriter { + public: + FstWriter() { + remove(fileName.c_str()); + _wc = writerCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024); + _b = fstBuilderCreate(_wc, 0); + } + bool Put(const std::string& key, uint64_t val) { + FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size()); + bool ok = fstBuilderInsert(_b, skey, val); + fstSliceDestroy(&skey); + return ok; + } + ~FstWriter() { + fstBuilderFinish(_b); + fstBuilderDestroy(_b); + + writerCtxDestroy(_wc); + } + + private: + FstBuilder* _b; + WriterCtx* _wc; +}; + +class FstReadMemory { + public: + FstReadMemory(size_t size) { + _wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024); + _w = fstCountingWriterCreate(_wc); + _size = size; + memset((void*)&_s, 0, sizeof(_s)); + } + bool init() { + char* buf = (char*)calloc(1, sizeof(char) * _size); + int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size); + if (nRead <= 0) { return false; } + _size = nRead; + _s = fstSliceCreate((uint8_t*)buf, _size); + _fst = fstCreate(&_s); + free(buf); + return _fst != NULL; + } + bool Get(const std::string& key, uint64_t* val) { + FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size()); + bool ok = fstGet(_fst, &skey, val); + fstSliceDestroy(&skey); + return ok; + } + bool GetWithTimeCostUs(const std::string& key, uint64_t* val, uint64_t* elapse) { + int64_t s = taosGetTimestampUs(); + bool ok = this->Get(key, val); + int64_t e = taosGetTimestampUs(); + *elapse = e - s; + return ok; + } + // add later + bool Search(AutomationCtx* ctx, std::vector& result) { + FstStreamBuilder* sb = fstSearch(_fst, ctx); + StreamWithState* st = streamBuilderIntoStream(sb); + StreamWithStateResult* rt = NULL; + while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { + // result.push_back((uint64_t)(rt->out.out)); + FstSlice* s = &rt->data; + int32_t sz = 0; + char* ch = (char*)fstSliceData(s, &sz); + std::string key(ch, sz); + printf("key: %s, val: %" PRIu64 "\n", key.c_str(), (uint64_t)(rt->out.out)); + swsResultDestroy(rt); + } + for (size_t i = 0; i < result.size(); i++) {} + std::cout << std::endl; + return true; + } + bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector& result) { + int64_t s = taosGetTimestampUs(); + bool ok = this->Search(ctx, result); + int64_t e = taosGetTimestampUs(); + return ok; + } + + ~FstReadMemory() { + fstCountingWriterDestroy(_w); + fstDestroy(_fst); + fstSliceDestroy(&_s); + writerCtxDestroy(_wc); + } + + private: + FstCountingWriter* _w; + Fst* _fst; + FstSlice _s; + WriterCtx* _wc; + size_t _size; +}; + +#define L 100 +#define M 100 +#define N 100 + +int Performance_fstWriteRecords(FstWriter* b) { + std::string str("aa"); + for (int i = 0; i < L; i++) { + str[0] = 'a' + i; + str.resize(2); + for (int j = 0; j < M; j++) { + str[1] = 'a' + j; + str.resize(2); + for (int k = 0; k < N; k++) { + str.push_back('a'); + b->Put(str, k); + printf("(%d, %d, %d, %s)\n", i, j, k, str.c_str()); + } + } + } + return L * M * N; +} +void checkFstCheckIterator() { + tfInit(); + FstWriter* fw = new FstWriter; + int64_t s = taosGetTimestampUs(); + int count = 2; + Performance_fstWriteRecords(fw); + int64_t e = taosGetTimestampUs(); + + std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl; + delete fw; + + FstReadMemory* m = new FstReadMemory(1024 * 64); + if (m->init() == false) { + std::cout << "init readMemory failed" << std::endl; + delete m; + return; + } + + // prefix search + std::vector result; + + AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS); + m->Search(ctx, result); + std::cout << "size: " << result.size() << std::endl; + // assert(result.size() == count); + for (int i = 0; i < result.size(); i++) { + // assert(result[i] == i); // check result + } + + free(ctx); + delete m; + tfCleanup(); +} +int main() { + checkFstCheckIterator(); + // checkFstPrefixSearch(); + return 1; +} diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index bed2b82daa..b046f32c21 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -481,6 +481,10 @@ class CacheObj { } return ret; } + void Debug() { + // + indexCacheDebug(cache); + } int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { int ret = indexCacheSearch(cache, query, colId, version, result, s); if (ret != 0) { @@ -515,6 +519,7 @@ class IndexCacheEnv : public ::testing::Test { TEST_F(IndexCacheEnv, cache_test) { int version = 0; int16_t colId = 0; + int16_t othColId = 10; uint64_t suid = 0; std::string colName("voltage"); @@ -544,6 +549,16 @@ TEST_F(IndexCacheEnv, cache_test) { coj->Put(term, colId, version++, suid++); } + { + std::string colVal("v3"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + coj->Put(term, othColId, version++, suid++); + } + { + std::string colVal("v4"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + coj->Put(term, othColId, version++, suid++); + } { std::string colVal("v4"); for (size_t i = 0; i < 100; i++) { @@ -553,6 +568,8 @@ TEST_F(IndexCacheEnv, cache_test) { coj->Put(term, colId, version++, suid++); } } + coj->Debug(); + // begin query { std::string colVal("v3"); SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());