add iterator interface for fst
This commit is contained in:
parent
0365c67b81
commit
f45119ea5e
|
@ -3,7 +3,9 @@ add_library(index ${INDEX_SRC})
|
|||
target_include_directories(
|
||||
index
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index"
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/include/os"
|
||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||
|
||||
)
|
||||
target_link_libraries(
|
||||
index
|
||||
|
|
|
@ -23,7 +23,7 @@ extern "C" {
|
|||
|
||||
typedef struct AutomationCtx AutomationCtx;
|
||||
|
||||
typedef enum AutomationType { AUTOMATION_PREFIX, AUTMMATION_MATCH } AutomationType;
|
||||
typedef enum AutomationType { AUTOMATION_ALWAYS, AUTOMATION_PREFIX, AUTMMATION_MATCH } AutomationType;
|
||||
|
||||
typedef struct StartWith {
|
||||
AutomationCtx* autoSelf;
|
||||
|
|
|
@ -355,7 +355,6 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
|
|||
}
|
||||
static int indexFlushCacheTFile(SIndex* sIdx) {
|
||||
if (sIdx == NULL) { return -1; }
|
||||
|
||||
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -1083,7 +1083,7 @@ bool fstBoundWithDataExceededBy(FstBoundWithData* bound, FstSlice* slice) {
|
|||
} else if (bound->type == Excluded) {
|
||||
return comp >= 0 ? true : false;
|
||||
} else {
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
bool fstBoundWithDataIsEmpty(FstBoundWithData* bound) {
|
||||
|
@ -1224,7 +1224,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
|
|||
void* start = automFuncs[aut->type].start(aut);
|
||||
if (automFuncs[aut->type].isMatch(aut, start)) {
|
||||
FstSlice s = fstSliceCreate(NULL, 0);
|
||||
return swsResultCreate(&s, output, callback(start));
|
||||
return swsResultCreate(&s, output, callback == NULL ? NULL : callback(start));
|
||||
}
|
||||
}
|
||||
SArray* nodes = taosArrayInit(8, sizeof(FstNode*));
|
||||
|
@ -1237,10 +1237,12 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
|
|||
}
|
||||
FstTransition trn;
|
||||
fstNodeGetTransitionAt(p->node, p->trans, &trn);
|
||||
Output out = p->out.out + trn.out;
|
||||
void* nextState = automFuncs[aut->type].accept(aut, p->autState, trn.inp);
|
||||
void* tState = callback(nextState);
|
||||
bool isMatch = automFuncs[aut->type].isMatch(aut, nextState);
|
||||
|
||||
Output out = p->out.out + trn.out;
|
||||
void* nextState = automFuncs[aut->type].accept(aut, p->autState, trn.inp);
|
||||
void* tState = (callback == NULL) ? NULL : callback(nextState);
|
||||
bool isMatch = automFuncs[aut->type].isMatch(aut, nextState);
|
||||
|
||||
FstNode* nextNode = fstGetNode(sws->fst, trn.addr);
|
||||
taosArrayPush(nodes, &nextNode);
|
||||
taosArrayPush(sws->inp, &(trn.inp));
|
||||
|
|
|
@ -64,6 +64,25 @@ StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) {
|
|||
return nsv;
|
||||
}
|
||||
|
||||
// iterate fst
|
||||
static void* alwaysMatchStart(AutomationCtx* ctx) {
|
||||
return NULL;
|
||||
}
|
||||
static bool alwaysMatchIsMatch(AutomationCtx* ctx, void* state) {
|
||||
return true;
|
||||
}
|
||||
static bool alwaysMatchCanMatch(AutomationCtx* ctx, void* state) {
|
||||
return true;
|
||||
}
|
||||
static bool alwaysMatchWillAlwaysMatch(AutomationCtx* ctx, void* state) {
|
||||
return true;
|
||||
}
|
||||
static void* alwaysMatchAccpet(AutomationCtx* ctx, void* state, uint8_t byte) {
|
||||
return NULL;
|
||||
}
|
||||
static void* alwaysMatchAccpetEof(AutomationCtx* ctx, void* state) {
|
||||
return NULL;
|
||||
}
|
||||
// prefix query, impl later
|
||||
|
||||
static void* prefixStart(AutomationCtx* ctx) {
|
||||
|
@ -127,6 +146,7 @@ static void* patternAcceptEof(AutomationCtx* ctx, void* state) {
|
|||
}
|
||||
|
||||
AutomationFunc automFuncs[] = {
|
||||
{alwaysMatchStart, alwaysMatchIsMatch, alwaysMatchCanMatch, alwaysMatchWillAlwaysMatch, alwaysMatchAccpet, alwaysMatchAccpetEof},
|
||||
{prefixStart, prefixIsMatch, prefixCanMatch, prefixWillAlwaysMatch, prefixAccept, prefixAcceptEof},
|
||||
{patternStart, patternIsMatch, patternCanMatch, patternWillAlwaysMatch, patternAccept, patternAcceptEof}
|
||||
// add more search type
|
||||
|
@ -137,7 +157,11 @@ AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
|
|||
if (ctx == NULL) { return NULL; }
|
||||
|
||||
StartWithStateValue* sv = NULL;
|
||||
if (atype == AUTOMATION_PREFIX) {
|
||||
if (atype == AUTOMATION_ALWAYS) {
|
||||
int val = 0;
|
||||
sv = startWithStateValueCreate(Running, FST_INT, &val);
|
||||
ctx->stdata = (void*)sv;
|
||||
} else if (atype == AUTOMATION_PREFIX) {
|
||||
int val = 0;
|
||||
sv = startWithStateValueCreate(Running, FST_INT, &val);
|
||||
ctx->stdata = (void*)sv;
|
||||
|
|
|
@ -1,13 +1,23 @@
|
|||
add_executable(indexTest "")
|
||||
add_executable(fstTest "")
|
||||
target_sources(indexTest
|
||||
PRIVATE
|
||||
"indexTests.cc"
|
||||
)
|
||||
target_sources(fstTest
|
||||
PRIVATE
|
||||
"fstTest.cc"
|
||||
)
|
||||
target_include_directories ( indexTest
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/libs/index"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories ( fstTest
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/libs/index"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_link_libraries (indexTest
|
||||
os
|
||||
util
|
||||
|
@ -15,8 +25,16 @@ target_link_libraries (indexTest
|
|||
gtest_main
|
||||
index
|
||||
)
|
||||
|
||||
add_test(
|
||||
NAME index_test
|
||||
COMMAND indexTest
|
||||
target_link_libraries (fstTest
|
||||
os
|
||||
util
|
||||
common
|
||||
gtest_main
|
||||
index
|
||||
)
|
||||
|
||||
|
||||
#add_test(
|
||||
# NAME index_test
|
||||
# COMMAND indexTest
|
||||
#)
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "index.h"
|
||||
#include "indexInt.h"
|
||||
#include "index_cache.h"
|
||||
#include "index_fst.h"
|
||||
#include "index_fst_counting_writer.h"
|
||||
#include "index_fst_util.h"
|
||||
#include "index_tfile.h"
|
||||
#include "tskiplist.h"
|
||||
#include "tutil.h"
|
||||
|
||||
void* callback(void* s) {
|
||||
return s;
|
||||
}
|
||||
|
||||
static std::string fileName = "/tmp/tindex.tindex";
|
||||
class FstWriter {
|
||||
public:
|
||||
FstWriter() {
|
||||
remove(fileName.c_str());
|
||||
_wc = writerCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024);
|
||||
_b = fstBuilderCreate(_wc, 0);
|
||||
}
|
||||
bool Put(const std::string& key, uint64_t val) {
|
||||
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
|
||||
bool ok = fstBuilderInsert(_b, skey, val);
|
||||
fstSliceDestroy(&skey);
|
||||
return ok;
|
||||
}
|
||||
~FstWriter() {
|
||||
fstBuilderFinish(_b);
|
||||
fstBuilderDestroy(_b);
|
||||
|
||||
writerCtxDestroy(_wc);
|
||||
}
|
||||
|
||||
private:
|
||||
FstBuilder* _b;
|
||||
WriterCtx* _wc;
|
||||
};
|
||||
|
||||
class FstReadMemory {
|
||||
public:
|
||||
FstReadMemory(size_t size) {
|
||||
_wc = writerCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
|
||||
_w = fstCountingWriterCreate(_wc);
|
||||
_size = size;
|
||||
memset((void*)&_s, 0, sizeof(_s));
|
||||
}
|
||||
bool init() {
|
||||
char* buf = (char*)calloc(1, sizeof(char) * _size);
|
||||
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
|
||||
if (nRead <= 0) { return false; }
|
||||
_size = nRead;
|
||||
_s = fstSliceCreate((uint8_t*)buf, _size);
|
||||
_fst = fstCreate(&_s);
|
||||
free(buf);
|
||||
return _fst != NULL;
|
||||
}
|
||||
bool Get(const std::string& key, uint64_t* val) {
|
||||
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
|
||||
bool ok = fstGet(_fst, &skey, val);
|
||||
fstSliceDestroy(&skey);
|
||||
return ok;
|
||||
}
|
||||
bool GetWithTimeCostUs(const std::string& key, uint64_t* val, uint64_t* elapse) {
|
||||
int64_t s = taosGetTimestampUs();
|
||||
bool ok = this->Get(key, val);
|
||||
int64_t e = taosGetTimestampUs();
|
||||
*elapse = e - s;
|
||||
return ok;
|
||||
}
|
||||
// add later
|
||||
bool Search(AutomationCtx* ctx, std::vector<uint64_t>& result) {
|
||||
FstStreamBuilder* sb = fstSearch(_fst, ctx);
|
||||
StreamWithState* st = streamBuilderIntoStream(sb);
|
||||
StreamWithStateResult* rt = NULL;
|
||||
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
|
||||
// result.push_back((uint64_t)(rt->out.out));
|
||||
FstSlice* s = &rt->data;
|
||||
int32_t sz = 0;
|
||||
char* ch = (char*)fstSliceData(s, &sz);
|
||||
std::string key(ch, sz);
|
||||
printf("key: %s, val: %" PRIu64 "\n", key.c_str(), (uint64_t)(rt->out.out));
|
||||
swsResultDestroy(rt);
|
||||
}
|
||||
for (size_t i = 0; i < result.size(); i++) {}
|
||||
std::cout << std::endl;
|
||||
return true;
|
||||
}
|
||||
bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector<uint64_t>& result) {
|
||||
int64_t s = taosGetTimestampUs();
|
||||
bool ok = this->Search(ctx, result);
|
||||
int64_t e = taosGetTimestampUs();
|
||||
return ok;
|
||||
}
|
||||
|
||||
~FstReadMemory() {
|
||||
fstCountingWriterDestroy(_w);
|
||||
fstDestroy(_fst);
|
||||
fstSliceDestroy(&_s);
|
||||
writerCtxDestroy(_wc);
|
||||
}
|
||||
|
||||
private:
|
||||
FstCountingWriter* _w;
|
||||
Fst* _fst;
|
||||
FstSlice _s;
|
||||
WriterCtx* _wc;
|
||||
size_t _size;
|
||||
};
|
||||
|
||||
#define L 100
|
||||
#define M 100
|
||||
#define N 100
|
||||
|
||||
int Performance_fstWriteRecords(FstWriter* b) {
|
||||
std::string str("aa");
|
||||
for (int i = 0; i < L; i++) {
|
||||
str[0] = 'a' + i;
|
||||
str.resize(2);
|
||||
for (int j = 0; j < M; j++) {
|
||||
str[1] = 'a' + j;
|
||||
str.resize(2);
|
||||
for (int k = 0; k < N; k++) {
|
||||
str.push_back('a');
|
||||
b->Put(str, k);
|
||||
printf("(%d, %d, %d, %s)\n", i, j, k, str.c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
return L * M * N;
|
||||
}
|
||||
void checkFstCheckIterator() {
|
||||
tfInit();
|
||||
FstWriter* fw = new FstWriter;
|
||||
int64_t s = taosGetTimestampUs();
|
||||
int count = 2;
|
||||
Performance_fstWriteRecords(fw);
|
||||
int64_t e = taosGetTimestampUs();
|
||||
|
||||
std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl;
|
||||
delete fw;
|
||||
|
||||
FstReadMemory* m = new FstReadMemory(1024 * 64);
|
||||
if (m->init() == false) {
|
||||
std::cout << "init readMemory failed" << std::endl;
|
||||
delete m;
|
||||
return;
|
||||
}
|
||||
|
||||
// prefix search
|
||||
std::vector<uint64_t> result;
|
||||
|
||||
AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS);
|
||||
m->Search(ctx, result);
|
||||
std::cout << "size: " << result.size() << std::endl;
|
||||
// assert(result.size() == count);
|
||||
for (int i = 0; i < result.size(); i++) {
|
||||
// assert(result[i] == i); // check result
|
||||
}
|
||||
|
||||
free(ctx);
|
||||
delete m;
|
||||
tfCleanup();
|
||||
}
|
||||
int main() {
|
||||
checkFstCheckIterator();
|
||||
// checkFstPrefixSearch();
|
||||
return 1;
|
||||
}
|
|
@ -481,6 +481,10 @@ class CacheObj {
|
|||
}
|
||||
return ret;
|
||||
}
|
||||
void Debug() {
|
||||
//
|
||||
indexCacheDebug(cache);
|
||||
}
|
||||
int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) {
|
||||
int ret = indexCacheSearch(cache, query, colId, version, result, s);
|
||||
if (ret != 0) {
|
||||
|
@ -515,6 +519,7 @@ class IndexCacheEnv : public ::testing::Test {
|
|||
TEST_F(IndexCacheEnv, cache_test) {
|
||||
int version = 0;
|
||||
int16_t colId = 0;
|
||||
int16_t othColId = 10;
|
||||
|
||||
uint64_t suid = 0;
|
||||
std::string colName("voltage");
|
||||
|
@ -544,6 +549,16 @@ TEST_F(IndexCacheEnv, cache_test) {
|
|||
coj->Put(term, colId, version++, suid++);
|
||||
}
|
||||
|
||||
{
|
||||
std::string colVal("v3");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
coj->Put(term, othColId, version++, suid++);
|
||||
}
|
||||
{
|
||||
std::string colVal("v4");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
coj->Put(term, othColId, version++, suid++);
|
||||
}
|
||||
{
|
||||
std::string colVal("v4");
|
||||
for (size_t i = 0; i < 100; i++) {
|
||||
|
@ -553,6 +568,8 @@ TEST_F(IndexCacheEnv, cache_test) {
|
|||
coj->Put(term, colId, version++, suid++);
|
||||
}
|
||||
}
|
||||
coj->Debug();
|
||||
// begin query
|
||||
{
|
||||
std::string colVal("v3");
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
|
||||
|
|
Loading…
Reference in New Issue