Merge pull request #10357 from taosdata/feature/revertIndexImpl
add index test UT
This commit is contained in:
commit
c06aaf01fa
|
@ -66,7 +66,9 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv);
|
|||
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
||||
pthread_once(&isInit, indexInit);
|
||||
SIndex* sIdx = calloc(1, sizeof(SIndex));
|
||||
if (sIdx == NULL) { return -1; }
|
||||
if (sIdx == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
#ifdef USE_LUCENE
|
||||
index_t* index = index_open(path);
|
||||
|
@ -76,7 +78,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
|||
#ifdef USE_INVERTED_INDEX
|
||||
// sIdx->cache = (void*)indexCacheCreate(sIdx);
|
||||
sIdx->tindex = indexTFileCreate(path);
|
||||
if (sIdx->tindex == NULL) { goto END; }
|
||||
if (sIdx->tindex == NULL) {
|
||||
goto END;
|
||||
}
|
||||
|
||||
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
sIdx->cVersion = 1;
|
||||
|
@ -87,7 +91,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
|||
#endif
|
||||
|
||||
END:
|
||||
if (sIdx != NULL) { indexClose(sIdx); }
|
||||
if (sIdx != NULL) {
|
||||
indexClose(sIdx);
|
||||
}
|
||||
|
||||
*index = NULL;
|
||||
return -1;
|
||||
|
@ -103,7 +109,9 @@ void indexClose(SIndex* sIdx) {
|
|||
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
||||
while (iter) {
|
||||
IndexCache** pCache = iter;
|
||||
if (*pCache) { indexCacheUnRef(*pCache); }
|
||||
if (*pCache) {
|
||||
indexCacheUnRef(*pCache);
|
||||
}
|
||||
iter = taosHashIterate(sIdx->colObj, iter);
|
||||
}
|
||||
taosHashCleanup(sIdx->colObj);
|
||||
|
@ -161,7 +169,9 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
|||
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
||||
assert(*cache != NULL);
|
||||
int ret = indexCachePut(*cache, p, uid);
|
||||
if (ret != 0) { return ret; }
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -191,7 +201,9 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
|
|||
int tsz = 0;
|
||||
index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz);
|
||||
|
||||
for (int i = 0; i < tsz; i++) { taosArrayPush(result, &tResult[i]); }
|
||||
for (int i = 0; i < tsz; i++) {
|
||||
taosArrayPush(result, &tResult[i]);
|
||||
}
|
||||
|
||||
for (int i = 0; i < nQuery; i++) {
|
||||
free(fields[i]);
|
||||
|
@ -248,7 +260,9 @@ void indexOptsDestroy(SIndexOpts* opts) {
|
|||
*/
|
||||
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
|
||||
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
|
||||
if (p == NULL) { return NULL; }
|
||||
if (p == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
p->opera = opera;
|
||||
p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
|
||||
return p;
|
||||
|
@ -270,7 +284,9 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
|
|||
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
|
||||
int32_t nColName, const char* colVal, int32_t nColVal) {
|
||||
SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm)));
|
||||
if (t == NULL) { return NULL; }
|
||||
if (t == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
t->suid = suid;
|
||||
t->operType = oper;
|
||||
|
@ -343,7 +359,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
|||
return 0;
|
||||
}
|
||||
static void indexInterResultsDestroy(SArray* results) {
|
||||
if (results == NULL) { return; }
|
||||
if (results == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
size_t sz = taosArrayGetSize(results);
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
|
@ -419,18 +437,24 @@ static void indexDestroyTempResult(SArray* result) {
|
|||
taosArrayDestroy(result);
|
||||
}
|
||||
int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
|
||||
if (sIdx == NULL) { return -1; }
|
||||
if (sIdx == NULL) {
|
||||
return -1;
|
||||
}
|
||||
indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
IndexCache* pCache = (IndexCache*)cache;
|
||||
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
|
||||
if (pReader == NULL) { indexWarn("empty tfile reader found"); }
|
||||
if (pReader == NULL) {
|
||||
indexWarn("empty tfile reader found");
|
||||
}
|
||||
// handle flush
|
||||
Iterate* cacheIter = indexCacheIteratorCreate(pCache);
|
||||
Iterate* tfileIter = tfileIteratorCreate(pReader);
|
||||
if (tfileIter == NULL) { indexWarn("empty tfile reader iterator"); }
|
||||
if (tfileIter == NULL) {
|
||||
indexWarn("empty tfile reader iterator");
|
||||
}
|
||||
|
||||
SArray* result = taosArrayInit(1024, sizeof(void*));
|
||||
|
||||
|
@ -484,7 +508,9 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
|
|||
taosArrayDestroy(value->val);
|
||||
value->val = NULL;
|
||||
} else {
|
||||
if (value->val != NULL) { taosArrayClear(value->val); }
|
||||
if (value->val != NULL) {
|
||||
taosArrayClear(value->val);
|
||||
}
|
||||
}
|
||||
free(value->colVal);
|
||||
value->colVal = NULL;
|
||||
|
@ -507,7 +533,9 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
|||
tfileWriterClose(tw);
|
||||
|
||||
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
|
||||
if (reader == NULL) { return -1; }
|
||||
if (reader == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
TFileHeader* header = &reader->header;
|
||||
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
|
||||
|
|
|
@ -119,13 +119,17 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
|
|||
tSkipListDestroy(slt);
|
||||
}
|
||||
void indexCacheDestroyImm(IndexCache* cache) {
|
||||
if (cache == NULL) { return; }
|
||||
if (cache == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
MemTable* tbl = NULL;
|
||||
pthread_mutex_lock(&cache->mtx);
|
||||
|
||||
tbl = cache->imm;
|
||||
cache->imm = NULL; // or throw int bg thread
|
||||
pthread_cond_broadcast(&cache->finished);
|
||||
|
||||
pthread_mutex_unlock(&cache->mtx);
|
||||
|
||||
indexMemUnRef(tbl);
|
||||
|
@ -133,7 +137,9 @@ void indexCacheDestroyImm(IndexCache* cache) {
|
|||
}
|
||||
void indexCacheDestroy(void* cache) {
|
||||
IndexCache* pCache = cache;
|
||||
if (pCache == NULL) { return; }
|
||||
if (pCache == NULL) {
|
||||
return;
|
||||
}
|
||||
indexMemUnRef(pCache->mem);
|
||||
indexMemUnRef(pCache->imm);
|
||||
free(pCache->colName);
|
||||
|
@ -146,7 +152,9 @@ void indexCacheDestroy(void* cache) {
|
|||
|
||||
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
|
||||
Iterate* iiter = calloc(1, sizeof(Iterate));
|
||||
if (iiter == NULL) { return NULL; }
|
||||
if (iiter == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&cache->mtx);
|
||||
|
||||
|
@ -164,7 +172,9 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
|
|||
return iiter;
|
||||
}
|
||||
void indexCacheIteratorDestroy(Iterate* iter) {
|
||||
if (iter == NULL) { return; }
|
||||
if (iter == NULL) {
|
||||
return;
|
||||
}
|
||||
tSkipListDestroyIter(iter->iter);
|
||||
iterateValueDestroy(&iter->val, true);
|
||||
free(iter);
|
||||
|
@ -186,9 +196,6 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
|
|||
} else if (cache->imm != NULL) {
|
||||
// TODO: wake up by condition variable
|
||||
pthread_cond_wait(&cache->finished, &cache->mtx);
|
||||
// pthread_mutex_unlock(&cache->mtx);
|
||||
// taosMsleep(50);
|
||||
// pthread_mutex_lock(&cache->mtx);
|
||||
} else {
|
||||
indexCacheRef(cache);
|
||||
cache->imm = cache->mem;
|
||||
|
@ -202,13 +209,17 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
|
|||
}
|
||||
|
||||
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||
if (cache == NULL) { return -1; }
|
||||
if (cache == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
IndexCache* pCache = cache;
|
||||
indexCacheRef(pCache);
|
||||
// encode data
|
||||
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
|
||||
if (cache == NULL) { return -1; }
|
||||
if (cache == NULL) {
|
||||
return -1;
|
||||
}
|
||||
// set up key
|
||||
ct->colType = term->colType;
|
||||
ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
|
||||
|
@ -240,7 +251,9 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
|
|||
}
|
||||
|
||||
static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SArray* result, STermValueType* s) {
|
||||
if (mem == NULL) { return 0; }
|
||||
if (mem == NULL) {
|
||||
return 0;
|
||||
}
|
||||
char* key = indexCacheTermGet(ct);
|
||||
|
||||
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||
|
@ -266,7 +279,9 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
|
|||
return 0;
|
||||
}
|
||||
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
|
||||
if (cache == NULL) { return 0; }
|
||||
if (cache == NULL) {
|
||||
return 0;
|
||||
}
|
||||
IndexCache* pCache = cache;
|
||||
|
||||
MemTable *mem = NULL, *imm = NULL;
|
||||
|
@ -294,23 +309,33 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
|
|||
}
|
||||
|
||||
void indexCacheRef(IndexCache* cache) {
|
||||
if (cache == NULL) { return; }
|
||||
if (cache == NULL) {
|
||||
return;
|
||||
}
|
||||
int ref = T_REF_INC(cache);
|
||||
UNUSED(ref);
|
||||
}
|
||||
void indexCacheUnRef(IndexCache* cache) {
|
||||
if (cache == NULL) { return; }
|
||||
if (cache == NULL) {
|
||||
return;
|
||||
}
|
||||
int ref = T_REF_DEC(cache);
|
||||
if (ref == 0) { indexCacheDestroy(cache); }
|
||||
if (ref == 0) {
|
||||
indexCacheDestroy(cache);
|
||||
}
|
||||
}
|
||||
|
||||
void indexMemRef(MemTable* tbl) {
|
||||
if (tbl == NULL) { return; }
|
||||
if (tbl == NULL) {
|
||||
return;
|
||||
}
|
||||
int ref = T_REF_INC(tbl);
|
||||
UNUSED(ref);
|
||||
}
|
||||
void indexMemUnRef(MemTable* tbl) {
|
||||
if (tbl == NULL) { return; }
|
||||
if (tbl == NULL) {
|
||||
return;
|
||||
}
|
||||
int ref = T_REF_DEC(tbl);
|
||||
if (ref == 0) {
|
||||
SSkipList* slt = tbl->mem;
|
||||
|
@ -320,7 +345,9 @@ void indexMemUnRef(MemTable* tbl) {
|
|||
}
|
||||
|
||||
static void indexCacheTermDestroy(CacheTerm* ct) {
|
||||
if (ct == NULL) { return; }
|
||||
if (ct == NULL) {
|
||||
return;
|
||||
}
|
||||
free(ct->colVal);
|
||||
free(ct);
|
||||
}
|
||||
|
@ -333,7 +360,9 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) {
|
|||
CacheTerm* rt = (CacheTerm*)r;
|
||||
// compare colVal
|
||||
int32_t cmp = strcmp(lt->colVal, rt->colVal);
|
||||
if (cmp == 0) { return rt->version - lt->version; }
|
||||
if (cmp == 0) {
|
||||
return rt->version - lt->version;
|
||||
}
|
||||
return cmp;
|
||||
}
|
||||
|
||||
|
@ -354,7 +383,9 @@ static void doMergeWork(SSchedMsg* msg) {
|
|||
}
|
||||
static bool indexCacheIteratorNext(Iterate* itera) {
|
||||
SSkipListIterator* iter = itera->iter;
|
||||
if (iter == NULL) { return false; }
|
||||
if (iter == NULL) {
|
||||
return false;
|
||||
}
|
||||
IterateValue* iv = &itera->val;
|
||||
iterateValueDestroy(iv, false);
|
||||
|
||||
|
|
|
@ -31,20 +31,24 @@ static uint8_t fstPackDetla(FstCountingWriter* wrt, CompiledAddr nodeAddr, Compi
|
|||
|
||||
FstUnFinishedNodes* fstUnFinishedNodesCreate() {
|
||||
FstUnFinishedNodes* nodes = malloc(sizeof(FstUnFinishedNodes));
|
||||
if (nodes == NULL) { return NULL; }
|
||||
if (nodes == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
nodes->stack = (SArray*)taosArrayInit(64, sizeof(FstBuilderNodeUnfinished));
|
||||
fstUnFinishedNodesPushEmpty(nodes, false);
|
||||
return nodes;
|
||||
}
|
||||
void unFinishedNodeDestroyElem(void* elem) {
|
||||
static void unFinishedNodeDestroyElem(void* elem) {
|
||||
FstBuilderNodeUnfinished* b = (FstBuilderNodeUnfinished*)elem;
|
||||
fstBuilderNodeDestroy(b->node);
|
||||
free(b->last);
|
||||
b->last = NULL;
|
||||
}
|
||||
void fstUnFinishedNodesDestroy(FstUnFinishedNodes* nodes) {
|
||||
if (nodes == NULL) { return; }
|
||||
if (nodes == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosArrayDestroyEx(nodes->stack, unFinishedNodeDestroyElem);
|
||||
free(nodes);
|
||||
|
@ -92,7 +96,9 @@ void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes* nodes, CompiledAddr add
|
|||
}
|
||||
void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output out) {
|
||||
FstSlice* s = &bs;
|
||||
if (fstSliceIsEmpty(s)) { return; }
|
||||
if (fstSliceIsEmpty(s)) {
|
||||
return;
|
||||
}
|
||||
size_t sz = taosArrayGetSize(nodes->stack) - 1;
|
||||
FstBuilderNodeUnfinished* un = taosArrayGet(nodes->stack, sz);
|
||||
assert(un->last == NULL);
|
||||
|
@ -172,7 +178,9 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes* node,
|
|||
|
||||
FstState fstStateCreateFrom(FstSlice* slice, CompiledAddr addr) {
|
||||
FstState fs = {.state = EmptyFinal, .val = 0};
|
||||
if (addr == EMPTY_ADDRESS) { return fs; }
|
||||
if (addr == EMPTY_ADDRESS) {
|
||||
return fs;
|
||||
}
|
||||
|
||||
uint8_t* data = fstSliceData(slice, NULL);
|
||||
uint8_t v = data[addr];
|
||||
|
@ -229,7 +237,9 @@ void fstStateCompileForOneTrans(FstCountingWriter* w, CompiledAddr addr, FstTran
|
|||
fstStateSetCommInput(&st, trn->inp);
|
||||
bool null = false;
|
||||
uint8_t inp = fstStateCommInput(&st, &null);
|
||||
if (null == true) { fstCountingWriterWrite(w, (char*)&trn->inp, sizeof(trn->inp)); }
|
||||
if (null == true) {
|
||||
fstCountingWriterWrite(w, (char*)&trn->inp, sizeof(trn->inp));
|
||||
}
|
||||
fstCountingWriterWrite(w, (char*)(&(st.val)), sizeof(st.val));
|
||||
return;
|
||||
}
|
||||
|
@ -263,7 +273,9 @@ void fstStateCompileForAnyTrans(FstCountingWriter* w, CompiledAddr addr, FstBuil
|
|||
fstStateSetStateNtrans(&st, (uint8_t)sz);
|
||||
|
||||
if (anyOuts) {
|
||||
if (FST_BUILDER_NODE_IS_FINAL(node)) { fstCountingWriterPackUintIn(w, node->finalOutput, oSize); }
|
||||
if (FST_BUILDER_NODE_IS_FINAL(node)) {
|
||||
fstCountingWriterPackUintIn(w, node->finalOutput, oSize);
|
||||
}
|
||||
for (int32_t i = sz - 1; i >= 0; i--) {
|
||||
FstTransition* t = taosArrayGet(node->trans, i);
|
||||
fstCountingWriterPackUintIn(w, t->out, oSize);
|
||||
|
@ -428,7 +440,9 @@ Output fstStateOutput(FstState* s, FstNode* node) {
|
|||
assert(s->state == OneTrans);
|
||||
|
||||
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(node->sizes);
|
||||
if (oSizes == 0) { return 0; }
|
||||
if (oSizes == 0) {
|
||||
return 0;
|
||||
}
|
||||
FstSlice* slice = &node->data;
|
||||
uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes);
|
||||
|
||||
|
@ -440,7 +454,9 @@ Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
|
|||
assert(s->state == AnyTrans);
|
||||
|
||||
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(node->sizes);
|
||||
if (oSizes == 0) { return 0; }
|
||||
if (oSizes == 0) {
|
||||
return 0;
|
||||
}
|
||||
FstSlice* slice = &node->data;
|
||||
uint8_t* data = fstSliceData(slice, NULL);
|
||||
uint64_t at = node->start - fstStateNtransLen(s) - 1 // pack size
|
||||
|
@ -453,7 +469,9 @@ Output fstStateOutputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
|
|||
|
||||
void fstStateSetFinalState(FstState* s, bool yes) {
|
||||
assert(s->state == AnyTrans);
|
||||
if (yes) { s->val |= 0b01000000; }
|
||||
if (yes) {
|
||||
s->val |= 0b01000000;
|
||||
}
|
||||
return;
|
||||
}
|
||||
bool fstStateIsFinalState(FstState* s) {
|
||||
|
@ -463,7 +481,9 @@ bool fstStateIsFinalState(FstState* s) {
|
|||
|
||||
void fstStateSetStateNtrans(FstState* s, uint8_t n) {
|
||||
assert(s->state == AnyTrans);
|
||||
if (n <= 0b00111111) { s->val = (s->val & 0b11000000) | n; }
|
||||
if (n <= 0b00111111) {
|
||||
s->val = (s->val & 0b11000000) | n;
|
||||
}
|
||||
return;
|
||||
}
|
||||
// state_ntrans
|
||||
|
@ -495,7 +515,9 @@ uint64_t fstStateNtransLen(FstState* s) {
|
|||
uint64_t fstStateNtrans(FstState* s, FstSlice* slice) {
|
||||
bool null = false;
|
||||
uint8_t n = fstStateStateNtrans(s, &null);
|
||||
if (null != true) { return n; }
|
||||
if (null != true) {
|
||||
return n;
|
||||
}
|
||||
int32_t len;
|
||||
uint8_t* data = fstSliceData(slice, &len);
|
||||
n = data[len - 2];
|
||||
|
@ -505,7 +527,9 @@ uint64_t fstStateNtrans(FstState* s, FstSlice* slice) {
|
|||
}
|
||||
Output fstStateFinalOutput(FstState* s, uint64_t version, FstSlice* slice, PackSizes sizes, uint64_t nTrans) {
|
||||
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes);
|
||||
if (oSizes == 0 || !fstStateIsFinalState(s)) { return 0; }
|
||||
if (oSizes == 0 || !fstStateIsFinalState(s)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint64_t at = FST_SLICE_LEN(slice) - 1 - fstStateNtransLen(s) - 1 // pack size
|
||||
- fstStateTotalTransSize(s, version, sizes, nTrans) - (nTrans * oSizes) - oSizes;
|
||||
|
@ -522,7 +546,9 @@ uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) {
|
|||
uint8_t* data = fstSliceData(slice, &dlen);
|
||||
uint64_t i = data[at + b];
|
||||
// uint64_t i = slice->data[slice->start + at + b];
|
||||
if (i >= node->nTrans) { *null = true; }
|
||||
if (i >= node->nTrans) {
|
||||
*null = true;
|
||||
}
|
||||
return i;
|
||||
} else {
|
||||
uint64_t start = node->start - fstStateNtransLen(s) - 1 // pack size
|
||||
|
@ -539,7 +565,9 @@ uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) {
|
|||
return node->nTrans - i - 1; // bug
|
||||
}
|
||||
}
|
||||
if (i == len) { *null = true; }
|
||||
if (i == len) {
|
||||
*null = true;
|
||||
}
|
||||
fstSliceDestroy(&t);
|
||||
}
|
||||
}
|
||||
|
@ -548,7 +576,9 @@ uint64_t fstStateFindInput(FstState* s, FstNode* node, uint8_t b, bool* null) {
|
|||
|
||||
FstNode* fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice* slice) {
|
||||
FstNode* n = (FstNode*)malloc(sizeof(FstNode));
|
||||
if (n == NULL) { return NULL; }
|
||||
if (n == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
FstState st = fstStateCreateFrom(slice, addr);
|
||||
|
||||
|
@ -614,7 +644,9 @@ void fstNodeDestroy(FstNode* node) {
|
|||
}
|
||||
FstTransitions* fstNodeTransitions(FstNode* node) {
|
||||
FstTransitions* t = malloc(sizeof(FstTransitions));
|
||||
if (NULL == t) { return NULL; }
|
||||
if (NULL == t) {
|
||||
return NULL;
|
||||
}
|
||||
FstRange range = {.start = 0, .end = FST_NODE_LEN(node)};
|
||||
t->range = range;
|
||||
t->node = node;
|
||||
|
@ -721,7 +753,9 @@ bool fstBuilderNodeCompileTo(FstBuilderNode* b, FstCountingWriter* wrt, Compiled
|
|||
|
||||
FstBuilder* fstBuilderCreate(void* w, FstType ty) {
|
||||
FstBuilder* b = malloc(sizeof(FstBuilder));
|
||||
if (NULL == b) { return b; }
|
||||
if (NULL == b) {
|
||||
return b;
|
||||
}
|
||||
|
||||
b->wrt = fstCountingWriterCreate(w);
|
||||
b->unfinished = fstUnFinishedNodesCreate();
|
||||
|
@ -735,15 +769,17 @@ FstBuilder* fstBuilderCreate(void* w, FstType ty) {
|
|||
taosEncodeFixedU64(&pBuf64, VERSION);
|
||||
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
|
||||
|
||||
memset(buf64, 0, sizeof(buf64));
|
||||
pBuf64 = buf64;
|
||||
memset(buf64, 0, sizeof(buf64));
|
||||
taosEncodeFixedU64(&pBuf64, ty);
|
||||
fstCountingWriterWrite(b->wrt, buf64, sizeof(buf64));
|
||||
|
||||
return b;
|
||||
}
|
||||
void fstBuilderDestroy(FstBuilder* b) {
|
||||
if (b == NULL) { return; }
|
||||
if (b == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
fstCountingWriterDestroy(b->wrt);
|
||||
fstUnFinishedNodesDestroy(b->unfinished);
|
||||
|
@ -830,6 +866,7 @@ void fstBuilderCompileFrom(FstBuilder* b, uint64_t istate) {
|
|||
fstUnFinishedNodesTopLastFreeze(b->unfinished, addr);
|
||||
return;
|
||||
}
|
||||
|
||||
CompiledAddr fstBuilderCompile(FstBuilder* b, FstBuilderNode* bn) {
|
||||
if (FST_BUILDER_NODE_IS_FINAL(bn) && FST_BUILDER_NODE_TRANS_ISEMPTY(bn) && FST_BUILDER_NODE_FINALOUTPUT_ISZERO(bn)) {
|
||||
return EMPTY_ADDRESS;
|
||||
|
@ -844,7 +881,9 @@ CompiledAddr fstBuilderCompile(FstBuilder* b, FstBuilderNode* bn) {
|
|||
|
||||
fstBuilderNodeCompileTo(bn, b->wrt, b->lastAddr, startAddr);
|
||||
b->lastAddr = (CompiledAddr)(FST_WRITER_COUNT(b->wrt) - 1);
|
||||
if (entry->state == NOTFOUND) { FST_REGISTRY_CELL_INSERT(entry->cell, b->lastAddr); }
|
||||
if (entry->state == NOTFOUND) {
|
||||
FST_REGISTRY_CELL_INSERT(entry->cell, b->lastAddr);
|
||||
}
|
||||
fstRegistryEntryDestroy(entry);
|
||||
|
||||
return b->lastAddr;
|
||||
|
@ -887,7 +926,9 @@ FstSlice fstNodeAsSlice(FstNode* node) {
|
|||
|
||||
FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) {
|
||||
FstLastTransition* trn = malloc(sizeof(FstLastTransition));
|
||||
if (trn == NULL) { return NULL; }
|
||||
if (trn == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
trn->inp = inp;
|
||||
trn->out = out;
|
||||
|
@ -897,7 +938,9 @@ FstLastTransition* fstLastTransitionCreate(uint8_t inp, Output out) {
|
|||
void fstLastTransitionDestroy(FstLastTransition* trn) { free(trn); }
|
||||
void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) {
|
||||
FstLastTransition* trn = unNode->last;
|
||||
if (trn == NULL) { return; }
|
||||
if (trn == NULL) {
|
||||
return;
|
||||
}
|
||||
FstTransition t = {.inp = trn->inp, .out = trn->out, .addr = addr};
|
||||
taosArrayPush(unNode->node->trans, &t);
|
||||
fstLastTransitionDestroy(trn);
|
||||
|
@ -906,27 +949,35 @@ void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, Comp
|
|||
}
|
||||
|
||||
void fstBuilderNodeUnfinishedAddOutputPrefix(FstBuilderNodeUnfinished* unNode, Output out) {
|
||||
if (FST_BUILDER_NODE_IS_FINAL(unNode->node)) { unNode->node->finalOutput += out; }
|
||||
if (FST_BUILDER_NODE_IS_FINAL(unNode->node)) {
|
||||
unNode->node->finalOutput += out;
|
||||
}
|
||||
size_t sz = taosArrayGetSize(unNode->node->trans);
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
FstTransition* trn = taosArrayGet(unNode->node->trans, i);
|
||||
trn->out += out;
|
||||
}
|
||||
if (unNode->last) { unNode->last->out += out; }
|
||||
if (unNode->last) {
|
||||
unNode->last->out += out;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
Fst* fstCreate(FstSlice* slice) {
|
||||
int32_t slen;
|
||||
char* buf = fstSliceData(slice, &slen);
|
||||
if (slen < 36) { return NULL; }
|
||||
if (slen < 36) {
|
||||
return NULL;
|
||||
}
|
||||
uint64_t len = slen;
|
||||
uint64_t skip = 0;
|
||||
|
||||
uint64_t version;
|
||||
taosDecodeFixedU64(buf, &version);
|
||||
skip += sizeof(version);
|
||||
if (version == 0 || version > VERSION) { return NULL; }
|
||||
if (version == 0 || version > VERSION) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
uint64_t type;
|
||||
taosDecodeFixedU64(buf + skip, &type);
|
||||
|
@ -949,10 +1000,14 @@ Fst* fstCreate(FstSlice* slice) {
|
|||
taosDecodeFixedU64(buf + len, &fstLen);
|
||||
// TODO(validate root addr)
|
||||
Fst* fst = (Fst*)calloc(1, sizeof(Fst));
|
||||
if (fst == NULL) { return NULL; }
|
||||
if (fst == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
fst->meta = (FstMeta*)malloc(sizeof(FstMeta));
|
||||
if (NULL == fst->meta) { goto FST_CREAT_FAILED; }
|
||||
if (NULL == fst->meta) {
|
||||
goto FST_CREAT_FAILED;
|
||||
}
|
||||
|
||||
fst->meta->version = version;
|
||||
fst->meta->rootAddr = rootAddr;
|
||||
|
@ -983,7 +1038,7 @@ void fstDestroy(Fst* fst) {
|
|||
|
||||
bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
||||
// dec lock range
|
||||
pthread_mutex_lock(&fst->mtx);
|
||||
// pthread_mutex_lock(&fst->mtx);
|
||||
FstNode* root = fstGetRoot(fst);
|
||||
Output tOut = 0;
|
||||
int32_t len;
|
||||
|
@ -996,7 +1051,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
|||
uint8_t inp = data[i];
|
||||
Output res = 0;
|
||||
if (false == fstNodeFindInput(root, inp, &res)) {
|
||||
pthread_mutex_unlock(&fst->mtx);
|
||||
// pthread_mutex_unlock(&fst->mtx);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1007,7 +1062,7 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
|||
taosArrayPush(nodes, &root);
|
||||
}
|
||||
if (!FST_NODE_IS_FINAL(root)) {
|
||||
pthread_mutex_unlock(&fst->mtx);
|
||||
// pthread_mutex_unlock(&fst->mtx);
|
||||
return false;
|
||||
} else {
|
||||
tOut = tOut + FST_NODE_FINAL_OUTPUT(root);
|
||||
|
@ -1018,8 +1073,8 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
|||
fstNodeDestroy(*node);
|
||||
}
|
||||
taosArrayDestroy(nodes);
|
||||
fst->root = NULL;
|
||||
pthread_mutex_unlock(&fst->mtx);
|
||||
// fst->root = NULL;
|
||||
// pthread_mutex_unlock(&fst->mtx);
|
||||
*out = tOut;
|
||||
return true;
|
||||
}
|
||||
|
@ -1028,7 +1083,9 @@ FstStreamBuilder* fstSearch(Fst* fst, AutomationCtx* ctx) {
|
|||
return fstStreamBuilderCreate(fst, ctx);
|
||||
}
|
||||
StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) {
|
||||
if (sb == NULL) { return NULL; }
|
||||
if (sb == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max);
|
||||
}
|
||||
FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) {
|
||||
|
@ -1039,15 +1096,6 @@ FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) {
|
|||
FstNode* fstGetRoot(Fst* fst) {
|
||||
CompiledAddr rAddr = fstGetRootAddr(fst);
|
||||
return fstGetNode(fst, rAddr);
|
||||
// pthread_mutex_lock(&fst->mtx);
|
||||
// if (fst->root != NULL) {
|
||||
// // pthread_mutex_unlock(&fst->mtx);
|
||||
// return fst->root;
|
||||
//}
|
||||
// CompiledAddr rAddr = fstGetRootAddr(fst);
|
||||
// fst->root = fstGetNode(fst, rAddr);
|
||||
//// pthread_mutex_unlock(&fst->mtx);
|
||||
// return fst->root;
|
||||
}
|
||||
|
||||
FstNode* fstGetNode(Fst* fst, CompiledAddr addr) {
|
||||
|
@ -1074,14 +1122,18 @@ bool fstVerify(Fst* fst) {
|
|||
uint32_t len, checkSum = fst->meta->checkSum;
|
||||
uint8_t* data = fstSliceData(fst->data, &len);
|
||||
TSCKSUM initSum = 0;
|
||||
if (!taosCheckChecksumWhole(data, len)) { return false; }
|
||||
if (!taosCheckChecksumWhole(data, len)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// data bound function
|
||||
FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice* data) {
|
||||
FstBoundWithData* b = calloc(1, sizeof(FstBoundWithData));
|
||||
if (b == NULL) { return NULL; }
|
||||
if (b == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (data != NULL) {
|
||||
b->data = fstSliceCopy(data, data->start, data->end);
|
||||
|
@ -1118,7 +1170,9 @@ void fstBoundDestroy(FstBoundWithData* bound) { free(bound); }
|
|||
StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min,
|
||||
FstBoundWithData* max) {
|
||||
StreamWithState* sws = calloc(1, sizeof(StreamWithState));
|
||||
if (sws == NULL) { return NULL; }
|
||||
if (sws == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
sws->fst = fst;
|
||||
sws->aut = automation;
|
||||
|
@ -1134,7 +1188,9 @@ StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstB
|
|||
return sws;
|
||||
}
|
||||
void streamWithStateDestroy(StreamWithState* sws) {
|
||||
if (sws == NULL) { return; }
|
||||
if (sws == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosArrayDestroy(sws->inp);
|
||||
taosArrayDestroyEx(sws->stack, streamStateDestroy);
|
||||
|
@ -1200,7 +1256,9 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) {
|
|||
uint64_t i = 0;
|
||||
for (i = trans->range.start; i < trans->range.end; i++) {
|
||||
FstTransition trn;
|
||||
if (fstNodeGetTransitionAt(node, i, &trn) && trn.inp > b) { break; }
|
||||
if (fstNodeGetTransitionAt(node, i, &trn) && trn.inp > b) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
StreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState};
|
||||
|
@ -1248,7 +1306,9 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
|
|||
while (taosArrayGetSize(sws->stack) > 0) {
|
||||
StreamState* p = (StreamState*)taosArrayPop(sws->stack);
|
||||
if (p->trans >= FST_NODE_LEN(p->node) || !automFuncs[aut->type].canMatch(aut, p->autState)) {
|
||||
if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { taosArrayPop(sws->inp); }
|
||||
if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) {
|
||||
taosArrayPop(sws->inp);
|
||||
}
|
||||
streamStateDestroy(p);
|
||||
continue;
|
||||
}
|
||||
|
@ -1267,7 +1327,9 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
|
|||
if (FST_NODE_IS_FINAL(nextNode)) {
|
||||
// void *eofState = sws->aut->acceptEof(nextState);
|
||||
void* eofState = automFuncs[aut->type].acceptEof(aut, nextState);
|
||||
if (eofState != NULL) { isMatch = automFuncs[aut->type].isMatch(aut, eofState); }
|
||||
if (eofState != NULL) {
|
||||
isMatch = automFuncs[aut->type].isMatch(aut, eofState);
|
||||
}
|
||||
}
|
||||
StreamState s1 = {.node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState};
|
||||
taosArrayPush(sws->stack, &s1);
|
||||
|
@ -1277,24 +1339,26 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
|
|||
|
||||
size_t isz = taosArrayGetSize(sws->inp);
|
||||
uint8_t* buf = (uint8_t*)malloc(isz * sizeof(uint8_t));
|
||||
for (uint32_t i = 0; i < isz; i++) { buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i); }
|
||||
for (uint32_t i = 0; i < isz; i++) {
|
||||
buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i);
|
||||
}
|
||||
FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp));
|
||||
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
|
||||
taosArrayDestroyEx(sws->stack, streamStateDestroy);
|
||||
sws->stack = (SArray*)taosArrayInit(256, sizeof(StreamState));
|
||||
free(buf);
|
||||
tfree(buf);
|
||||
fstSliceDestroy(&slice);
|
||||
return NULL;
|
||||
}
|
||||
if (FST_NODE_IS_FINAL(nextNode) && isMatch) {
|
||||
FstOutput fOutput = {.null = false, .out = out + FST_NODE_FINAL_OUTPUT(nextNode)};
|
||||
StreamWithStateResult* result = swsResultCreate(&slice, fOutput, tState);
|
||||
free(buf);
|
||||
tfree(buf);
|
||||
fstSliceDestroy(&slice);
|
||||
taosArrayDestroy(nodes);
|
||||
return result;
|
||||
}
|
||||
free(buf);
|
||||
tfree(buf);
|
||||
fstSliceDestroy(&slice);
|
||||
}
|
||||
for (size_t i = 0; i < taosArrayGetSize(nodes); i++) {
|
||||
|
@ -1307,16 +1371,19 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
|
|||
|
||||
StreamWithStateResult* swsResultCreate(FstSlice* data, FstOutput fOut, void* state) {
|
||||
StreamWithStateResult* result = calloc(1, sizeof(StreamWithStateResult));
|
||||
if (result == NULL) { return NULL; }
|
||||
if (result == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
result->data = fstSliceCopy(data, 0, FST_SLICE_LEN(data) - 1);
|
||||
result->out = fOut;
|
||||
result->state = state;
|
||||
|
||||
return result;
|
||||
}
|
||||
void swsResultDestroy(StreamWithStateResult* result) {
|
||||
if (NULL == result) { return; }
|
||||
if (NULL == result) {
|
||||
return;
|
||||
}
|
||||
|
||||
fstSliceDestroy(&result->data);
|
||||
startWithStateValueDestroy(result->state);
|
||||
|
@ -1324,16 +1391,18 @@ void swsResultDestroy(StreamWithStateResult* result) {
|
|||
}
|
||||
|
||||
void streamStateDestroy(void* s) {
|
||||
if (NULL == s) { return; }
|
||||
if (NULL == s) {
|
||||
return;
|
||||
}
|
||||
StreamState* ss = (StreamState*)s;
|
||||
|
||||
fstNodeDestroy(ss->node);
|
||||
// free(s->autoState);
|
||||
}
|
||||
|
||||
FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut) {
|
||||
FstStreamBuilder* b = calloc(1, sizeof(FstStreamBuilder));
|
||||
if (NULL == b) { return NULL; }
|
||||
if (NULL == b) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
b->fst = fst;
|
||||
b->aut = aut;
|
||||
|
@ -1349,8 +1418,9 @@ void fstStreamBuilderDestroy(FstStreamBuilder* b) {
|
|||
free(b);
|
||||
}
|
||||
FstStreamBuilder* fstStreamBuilderRange(FstStreamBuilder* b, FstSlice* val, RangeType type) {
|
||||
if (b == NULL) { return NULL; }
|
||||
|
||||
if (b == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
if (type == GE) {
|
||||
b->min->type = Included;
|
||||
fstSliceDestroy(&(b->min->data));
|
||||
|
|
|
@ -17,7 +17,9 @@
|
|||
|
||||
StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) {
|
||||
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
|
||||
if (nsv == NULL) { return NULL; }
|
||||
if (nsv == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
nsv->kind = kind;
|
||||
nsv->type = ty;
|
||||
|
@ -35,7 +37,9 @@ StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueTyp
|
|||
}
|
||||
void startWithStateValueDestroy(void* val) {
|
||||
StartWithStateValue* sv = (StartWithStateValue*)val;
|
||||
if (sv == NULL) { return; }
|
||||
if (sv == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (sv->type == FST_INT) {
|
||||
//
|
||||
|
@ -48,7 +52,9 @@ void startWithStateValueDestroy(void* val) {
|
|||
}
|
||||
StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) {
|
||||
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
|
||||
if (nsv == NULL) { return NULL; }
|
||||
if (nsv == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
nsv->kind = sv->kind;
|
||||
nsv->type = sv->type;
|
||||
|
@ -88,10 +94,14 @@ static bool prefixCanMatch(AutomationCtx* ctx, void* sv) {
|
|||
static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) { return true; }
|
||||
static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) {
|
||||
StartWithStateValue* ssv = (StartWithStateValue*)state;
|
||||
if (ssv == NULL || ctx == NULL) { return NULL; }
|
||||
if (ssv == NULL || ctx == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
char* data = ctx->data;
|
||||
if (ssv->kind == Done) { return startWithStateValueCreate(Done, FST_INT, &ssv->val); }
|
||||
if (ssv->kind == Done) {
|
||||
return startWithStateValueCreate(Done, FST_INT, &ssv->val);
|
||||
}
|
||||
if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
|
||||
int val = ssv->val + 1;
|
||||
|
||||
|
@ -128,7 +138,9 @@ AutomationFunc automFuncs[] = {
|
|||
|
||||
AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
|
||||
AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx));
|
||||
if (ctx == NULL) { return NULL; }
|
||||
if (ctx == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
StartWithStateValue* sv = NULL;
|
||||
if (atype == AUTOMATION_ALWAYS) {
|
||||
|
|
|
@ -29,18 +29,6 @@ const uint64_t VERSION = 3;
|
|||
|
||||
const uint64_t TRANS_INDEX_THRESHOLD = 32;
|
||||
|
||||
// uint8_t commonInput(uint8_t idx) {
|
||||
// if (idx == 0) { return -1; }
|
||||
// else {
|
||||
// return COMMON_INPUTS_INV[idx - 1];
|
||||
// }
|
||||
//}
|
||||
//
|
||||
// uint8_t commonIdx(uint8_t v, uint8_t max) {
|
||||
// uint8_t v = ((uint16_t)tCOMMON_INPUTS[v] + 1)%256;
|
||||
// return v > max ? 0: v;
|
||||
//}
|
||||
|
||||
uint8_t packSize(uint64_t n) {
|
||||
if (n < (1u << 8)) {
|
||||
return 1;
|
||||
|
@ -103,9 +91,6 @@ FstSlice fstSliceCreate(uint8_t* data, uint64_t len) {
|
|||
FstSlice fstSliceCopy(FstSlice* s, int32_t start, int32_t end) {
|
||||
FstString* str = s->str;
|
||||
str->ref++;
|
||||
// uint8_t *buf = fstSliceData(s, &alen);
|
||||
// start = buf + start - (buf - s->start);
|
||||
// end = buf + end - (buf - s->start);
|
||||
|
||||
FstSlice t = {.str = str, .start = start + s->start, .end = end + s->start};
|
||||
return t;
|
||||
|
@ -130,19 +115,19 @@ FstSlice fstSliceDeepCopy(FstSlice* s, int32_t start, int32_t end) {
|
|||
ans.end = tlen - 1;
|
||||
return ans;
|
||||
}
|
||||
bool fstSliceIsEmpty(FstSlice* s) {
|
||||
return s->str == NULL || s->str->len == 0 || s->start < 0 || s->end < 0;
|
||||
}
|
||||
bool fstSliceIsEmpty(FstSlice* s) { return s->str == NULL || s->str->len == 0 || s->start < 0 || s->end < 0; }
|
||||
|
||||
uint8_t* fstSliceData(FstSlice* s, int32_t* size) {
|
||||
FstString* str = s->str;
|
||||
if (size != NULL) { *size = s->end - s->start + 1; }
|
||||
if (size != NULL) {
|
||||
*size = s->end - s->start + 1;
|
||||
}
|
||||
return str->data + s->start;
|
||||
}
|
||||
void fstSliceDestroy(FstSlice* s) {
|
||||
FstString* str = s->str;
|
||||
str->ref--;
|
||||
if (str->ref <= 0) {
|
||||
if (str->ref == 0) {
|
||||
free(str->data);
|
||||
free(str);
|
||||
s->str = NULL;
|
||||
|
|
|
@ -13,8 +13,6 @@ p *
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
//#include <sys/types.h>
|
||||
//#include <dirent.h>
|
||||
#include "index_tfile.h"
|
||||
#include "index.h"
|
||||
#include "index_fst.h"
|
||||
|
@ -61,7 +59,9 @@ static void tfileGenFileFullName(char* fullname, const char* path, uint64_t s
|
|||
|
||||
TFileCache* tfileCacheCreate(const char* path) {
|
||||
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
||||
if (tcache == NULL) { return NULL; }
|
||||
if (tcache == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
tcache->capacity = 64;
|
||||
|
@ -98,7 +98,9 @@ End:
|
|||
return NULL;
|
||||
}
|
||||
void tfileCacheDestroy(TFileCache* tcache) {
|
||||
if (tcache == NULL) { return; }
|
||||
if (tcache == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// free table cache
|
||||
TFileReader** reader = taosHashIterate(tcache->tableCache, NULL);
|
||||
|
@ -119,7 +121,9 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
|
|||
int32_t sz = indexSerialCacheKey(key, buf);
|
||||
assert(sz < sizeof(buf));
|
||||
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
|
||||
if (reader == NULL) { return NULL; }
|
||||
if (reader == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
tfileReaderRef(*reader);
|
||||
|
||||
return *reader;
|
||||
|
@ -142,7 +146,9 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
|
|||
}
|
||||
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
||||
TFileReader* reader = calloc(1, sizeof(TFileReader));
|
||||
if (reader == NULL) { return NULL; }
|
||||
if (reader == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
reader->ctx = ctx;
|
||||
|
||||
|
@ -169,7 +175,9 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
|||
return reader;
|
||||
}
|
||||
void tfileReaderDestroy(TFileReader* reader) {
|
||||
if (reader == NULL) { return; }
|
||||
if (reader == NULL) {
|
||||
return;
|
||||
}
|
||||
// T_REF_INC(reader);
|
||||
fstDestroy(reader->fst);
|
||||
writerCtxDestroy(reader->ctx, reader->remove);
|
||||
|
@ -209,7 +217,9 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
|
|||
tfileGenFileFullName(fullname, path, suid, colName, version);
|
||||
// indexInfo("open write file name %s", fullname);
|
||||
WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
|
||||
if (wcx == NULL) { return NULL; }
|
||||
if (wcx == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
TFileHeader tfh = {0};
|
||||
tfh.suid = suid;
|
||||
|
@ -225,7 +235,9 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
|
|||
|
||||
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
|
||||
indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size);
|
||||
if (wc == NULL) { return NULL; }
|
||||
if (wc == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
TFileReader* reader = tfileReaderCreate(wc);
|
||||
return reader;
|
||||
|
@ -316,19 +328,25 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
|||
return 0;
|
||||
}
|
||||
void tfileWriterClose(TFileWriter* tw) {
|
||||
if (tw == NULL) { return; }
|
||||
if (tw == NULL) {
|
||||
return;
|
||||
}
|
||||
writerCtxDestroy(tw->ctx, false);
|
||||
free(tw);
|
||||
}
|
||||
void tfileWriterDestroy(TFileWriter* tw) {
|
||||
if (tw == NULL) { return; }
|
||||
if (tw == NULL) {
|
||||
return;
|
||||
}
|
||||
writerCtxDestroy(tw->ctx, false);
|
||||
free(tw);
|
||||
}
|
||||
|
||||
IndexTFile* indexTFileCreate(const char* path) {
|
||||
TFileCache* cache = tfileCacheCreate(path);
|
||||
if (cache == NULL) { return NULL; }
|
||||
if (cache == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
|
||||
if (tfile == NULL) {
|
||||
|
@ -340,21 +358,27 @@ IndexTFile* indexTFileCreate(const char* path) {
|
|||
return tfile;
|
||||
}
|
||||
void indexTFileDestroy(IndexTFile* tfile) {
|
||||
if (tfile == NULL) { return; }
|
||||
if (tfile == NULL) {
|
||||
return;
|
||||
}
|
||||
tfileCacheDestroy(tfile->cache);
|
||||
free(tfile);
|
||||
}
|
||||
|
||||
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
||||
int ret = -1;
|
||||
if (tfile == NULL) { return ret; }
|
||||
if (tfile == NULL) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
IndexTFile* pTfile = (IndexTFile*)tfile;
|
||||
|
||||
SIndexTerm* term = query->term;
|
||||
ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
||||
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
||||
if (reader == NULL) { return 0; }
|
||||
if (reader == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return tfileReaderSearch(reader, query, result);
|
||||
}
|
||||
|
@ -373,7 +397,9 @@ static bool tfileIteratorNext(Iterate* iiter) {
|
|||
|
||||
TFileFstIter* tIter = iiter->iter;
|
||||
StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
|
||||
if (rt == NULL) { return false; }
|
||||
if (rt == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t sz = 0;
|
||||
char* ch = (char*)fstSliceData(&rt->data, &sz);
|
||||
|
@ -383,7 +409,9 @@ static bool tfileIteratorNext(Iterate* iiter) {
|
|||
offset = (uint64_t)(rt->out.out);
|
||||
swsResultDestroy(rt);
|
||||
// set up iterate value
|
||||
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
|
||||
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
iv->colVal = colVal;
|
||||
return true;
|
||||
|
@ -394,7 +422,9 @@ static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
|
|||
|
||||
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
|
||||
TFileFstIter* tIter = calloc(1, sizeof(TFileFstIter));
|
||||
if (tIter == NULL) { return NULL; }
|
||||
if (tIter == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
|
||||
tIter->fb = fstSearch(reader->fst, tIter->ctx);
|
||||
|
@ -404,7 +434,9 @@ static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
|
|||
}
|
||||
|
||||
Iterate* tfileIteratorCreate(TFileReader* reader) {
|
||||
if (reader == NULL) { return NULL; }
|
||||
if (reader == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Iterate* iter = calloc(1, sizeof(Iterate));
|
||||
iter->iter = tfileFstIteratorCreate(reader);
|
||||
|
@ -419,7 +451,9 @@ Iterate* tfileIteratorCreate(TFileReader* reader) {
|
|||
return iter;
|
||||
}
|
||||
void tfileIteratorDestroy(Iterate* iter) {
|
||||
if (iter == NULL) { return; }
|
||||
if (iter == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
IterateValue* iv = &iter->val;
|
||||
iterateValueDestroy(iv, true);
|
||||
|
@ -434,7 +468,9 @@ void tfileIteratorDestroy(Iterate* iter) {
|
|||
}
|
||||
|
||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
|
||||
if (tf == NULL) { return NULL; }
|
||||
if (tf == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
||||
return tfileCacheGet(tf->cache, &key);
|
||||
}
|
||||
|
@ -446,7 +482,9 @@ static int tfileUidCompare(const void* a, const void* b) {
|
|||
}
|
||||
static int tfileStrCompare(const void* a, const void* b) {
|
||||
int ret = strcmp((char*)a, (char*)b);
|
||||
if (ret == 0) { return ret; }
|
||||
if (ret == 0) {
|
||||
return ret;
|
||||
}
|
||||
return ret < 0 ? -1 : 1;
|
||||
}
|
||||
|
||||
|
@ -461,13 +499,17 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
|
|||
|
||||
TFileValue* tfileValueCreate(char* val) {
|
||||
TFileValue* tf = calloc(1, sizeof(TFileValue));
|
||||
if (tf == NULL) { return NULL; }
|
||||
if (tf == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
tf->colVal = tstrdup(val);
|
||||
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
|
||||
return tf;
|
||||
}
|
||||
int tfileValuePush(TFileValue* tf, uint64_t val) {
|
||||
if (tf == NULL) { return -1; }
|
||||
if (tf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
taosArrayPush(tf->tableId, &val);
|
||||
return 0;
|
||||
}
|
||||
|
@ -489,7 +531,9 @@ static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
|
|||
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
|
||||
tw->header.fstOffset = fstOffset;
|
||||
|
||||
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
|
||||
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) {
|
||||
return -1;
|
||||
}
|
||||
indexInfo("tfile write fst offset: %d", tw->ctx->size(tw->ctx));
|
||||
tw->offset += sizeof(fstOffset);
|
||||
return 0;
|
||||
|
@ -502,7 +546,9 @@ static int tfileWriteHeader(TFileWriter* writer) {
|
|||
|
||||
indexInfo("tfile pre write header size: %d", writer->ctx->size(writer->ctx));
|
||||
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
|
||||
if (sizeof(buf) != nwrite) { return -1; }
|
||||
if (sizeof(buf) != nwrite) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
indexInfo("tfile after write header size: %d", writer->ctx->size(writer->ctx));
|
||||
writer->offset = nwrite;
|
||||
|
@ -556,7 +602,9 @@ static int tfileReaderLoadFst(TFileReader* reader) {
|
|||
static int FST_MAX_SIZE = 64 * 1024 * 1024;
|
||||
|
||||
char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
|
||||
if (buf == NULL) { return -1; }
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
WriterCtx* ctx = reader->ctx;
|
||||
int size = ctx->size(ctx);
|
||||
|
@ -586,12 +634,16 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
|
|||
|
||||
int32_t total = sizeof(uint64_t) * nid;
|
||||
char* buf = calloc(1, total);
|
||||
if (buf == NULL) { return -1; }
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
nread = ctx->readFrom(ctx, buf, total, offset + sizeof(nid));
|
||||
assert(total == nread);
|
||||
|
||||
for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); }
|
||||
for (int32_t i = 0; i < nid; i++) {
|
||||
taosArrayPush(result, (uint64_t*)buf + i);
|
||||
}
|
||||
free(buf);
|
||||
return 0;
|
||||
}
|
||||
|
@ -615,13 +667,17 @@ static int tfileReaderVerify(TFileReader* reader) {
|
|||
}
|
||||
|
||||
void tfileReaderRef(TFileReader* reader) {
|
||||
if (reader == NULL) { return; }
|
||||
if (reader == NULL) {
|
||||
return;
|
||||
}
|
||||
int ref = T_REF_INC(reader);
|
||||
UNUSED(ref);
|
||||
}
|
||||
|
||||
void tfileReaderUnRef(TFileReader* reader) {
|
||||
if (reader == NULL) { return; }
|
||||
if (reader == NULL) {
|
||||
return;
|
||||
}
|
||||
int ref = T_REF_DEC(reader);
|
||||
if (ref == 0) {
|
||||
// do nothing
|
||||
|
@ -637,11 +693,15 @@ static SArray* tfileGetFileList(const char* path) {
|
|||
uint32_t version;
|
||||
|
||||
DIR* dir = opendir(path);
|
||||
if (NULL == dir) { return NULL; }
|
||||
if (NULL == dir) {
|
||||
return NULL;
|
||||
}
|
||||
struct dirent* entry;
|
||||
while ((entry = readdir(dir)) != NULL) {
|
||||
char* file = entry->d_name;
|
||||
if (0 != tfileParseFileName(file, &suid, buf, &version)) { continue; }
|
||||
if (0 != tfileParseFileName(file, &suid, buf, &version)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
size_t len = strlen(path) + 1 + strlen(file) + 1;
|
||||
char* buf = calloc(1, len);
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
add_executable(indexTest "")
|
||||
add_executable(fstTest "")
|
||||
add_executable(fstUT "")
|
||||
|
||||
target_sources(indexTest
|
||||
PRIVATE
|
||||
"indexTests.cc"
|
||||
|
@ -8,6 +10,11 @@ target_sources(fstTest
|
|||
PRIVATE
|
||||
"fstTest.cc"
|
||||
)
|
||||
|
||||
target_sources(fstUT
|
||||
PRIVATE
|
||||
"fstUT.cc"
|
||||
)
|
||||
target_include_directories ( indexTest
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/libs/index"
|
||||
|
@ -18,6 +25,12 @@ target_include_directories ( fstTest
|
|||
"${CMAKE_SOURCE_DIR}/include/libs/index"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
target_include_directories ( fstUT
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/libs/index"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_link_libraries (indexTest
|
||||
os
|
||||
util
|
||||
|
@ -32,6 +45,13 @@ target_link_libraries (fstTest
|
|||
gtest_main
|
||||
index
|
||||
)
|
||||
target_link_libraries (fstUT
|
||||
os
|
||||
util
|
||||
common
|
||||
gtest_main
|
||||
index
|
||||
)
|
||||
|
||||
|
||||
#add_test(
|
||||
|
|
|
@ -58,7 +58,9 @@ class FstReadMemory {
|
|||
bool init() {
|
||||
char* buf = (char*)calloc(1, sizeof(char) * _size);
|
||||
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
|
||||
if (nRead <= 0) { return false; }
|
||||
if (nRead <= 0) {
|
||||
return false;
|
||||
}
|
||||
_size = nRead;
|
||||
_s = fstSliceCreate((uint8_t*)buf, _size);
|
||||
_fst = fstCreate(&_s);
|
||||
|
@ -97,7 +99,8 @@ class FstReadMemory {
|
|||
printf("key: %s, val: %" PRIu64 "\n", key.c_str(), (uint64_t)(rt->out.out));
|
||||
swsResultDestroy(rt);
|
||||
}
|
||||
for (size_t i = 0; i < result.size(); i++) {}
|
||||
for (size_t i = 0; i < result.size(); i++) {
|
||||
}
|
||||
std::cout << std::endl;
|
||||
return true;
|
||||
}
|
||||
|
@ -173,7 +176,9 @@ void checkMillonWriteAndReadOfFst() {
|
|||
delete fw;
|
||||
FstReadMemory* fr = new FstReadMemory(1024 * 64 * 1024);
|
||||
|
||||
if (fr->init()) { printf("success to init fst read"); }
|
||||
if (fr->init()) {
|
||||
printf("success to init fst read");
|
||||
}
|
||||
|
||||
Performance_fstReadRecords(fr);
|
||||
tfCleanup();
|
||||
|
|
|
@ -0,0 +1,232 @@
|
|||
|
||||
#include <gtest/gtest.h>
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
#include "index.h"
|
||||
#include "indexInt.h"
|
||||
#include "index_cache.h"
|
||||
#include "index_fst.h"
|
||||
#include "index_fst_counting_writer.h"
|
||||
#include "index_fst_util.h"
|
||||
#include "index_tfile.h"
|
||||
#include "tglobal.h"
|
||||
#include "tskiplist.h"
|
||||
#include "tutil.h"
|
||||
#include "ulog.h"
|
||||
|
||||
static std::string dir = "/tmp/index";
|
||||
|
||||
static char indexlog[PATH_MAX] = {0};
|
||||
static char tindex[PATH_MAX] = {0};
|
||||
static char tindexDir[PATH_MAX] = {0};
|
||||
|
||||
static void EnvInit() {
|
||||
tfInit();
|
||||
|
||||
std::string path = dir;
|
||||
taosRemoveDir(path.c_str());
|
||||
taosMkDir(path.c_str());
|
||||
// init log file
|
||||
snprintf(indexlog, PATH_MAX, "%s/tindex.idx", path.c_str());
|
||||
if (taosInitLog(indexlog, tsNumOfLogLines, 1) != 0) {
|
||||
printf("failed to init log");
|
||||
}
|
||||
// init index file
|
||||
memset(tindex, 0, sizeof(tindex));
|
||||
snprintf(tindex, PATH_MAX, "%s/tindex.idx", path.c_str());
|
||||
}
|
||||
static void EnvCleanup() {}
|
||||
class FstWriter {
|
||||
public:
|
||||
FstWriter() {
|
||||
_wc = writerCtxCreate(TFile, tindex, false, 64 * 1024 * 1024);
|
||||
_b = fstBuilderCreate(_wc, 0);
|
||||
}
|
||||
bool Put(const std::string& key, uint64_t val) {
|
||||
// char buf[128] = {0};
|
||||
// int len = 0;
|
||||
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
|
||||
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
|
||||
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
|
||||
bool ok = fstBuilderInsert(_b, skey, val);
|
||||
|
||||
fstSliceDestroy(&skey);
|
||||
return ok;
|
||||
}
|
||||
~FstWriter() {
|
||||
fstBuilderFinish(_b);
|
||||
fstBuilderDestroy(_b);
|
||||
|
||||
writerCtxDestroy(_wc, false);
|
||||
}
|
||||
|
||||
private:
|
||||
FstBuilder* _b;
|
||||
WriterCtx* _wc;
|
||||
};
|
||||
|
||||
class FstReadMemory {
|
||||
public:
|
||||
FstReadMemory(size_t size) {
|
||||
_wc = writerCtxCreate(TFile, tindex, true, 64 * 1024);
|
||||
_w = fstCountingWriterCreate(_wc);
|
||||
_size = size;
|
||||
memset((void*)&_s, 0, sizeof(_s));
|
||||
}
|
||||
bool init() {
|
||||
char* buf = (char*)calloc(1, sizeof(char) * _size);
|
||||
int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size);
|
||||
if (nRead <= 0) {
|
||||
return false;
|
||||
}
|
||||
_size = nRead;
|
||||
_s = fstSliceCreate((uint8_t*)buf, _size);
|
||||
_fst = fstCreate(&_s);
|
||||
free(buf);
|
||||
return _fst != NULL;
|
||||
}
|
||||
bool Get(const std::string& key, uint64_t* val) {
|
||||
// char buf[128] = {0};
|
||||
// int len = 0;
|
||||
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
|
||||
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
|
||||
|
||||
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
|
||||
bool ok = fstGet(_fst, &skey, val);
|
||||
fstSliceDestroy(&skey);
|
||||
return ok;
|
||||
}
|
||||
bool GetWithTimeCostUs(const std::string& key, uint64_t* val, uint64_t* elapse) {
|
||||
int64_t s = taosGetTimestampUs();
|
||||
bool ok = this->Get(key, val);
|
||||
int64_t e = taosGetTimestampUs();
|
||||
*elapse = e - s;
|
||||
return ok;
|
||||
}
|
||||
// add later
|
||||
bool Search(AutomationCtx* ctx, std::vector<uint64_t>& result) {
|
||||
FstStreamBuilder* sb = fstSearch(_fst, ctx);
|
||||
StreamWithState* st = streamBuilderIntoStream(sb);
|
||||
StreamWithStateResult* rt = NULL;
|
||||
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
|
||||
// result.push_back((uint64_t)(rt->out.out));
|
||||
FstSlice* s = &rt->data;
|
||||
int32_t sz = 0;
|
||||
char* ch = (char*)fstSliceData(s, &sz);
|
||||
std::string key(ch, sz);
|
||||
printf("key: %s, val: %" PRIu64 "\n", key.c_str(), (uint64_t)(rt->out.out));
|
||||
swsResultDestroy(rt);
|
||||
}
|
||||
for (size_t i = 0; i < result.size(); i++) {
|
||||
}
|
||||
std::cout << std::endl;
|
||||
return true;
|
||||
}
|
||||
bool SearchWithTimeCostUs(AutomationCtx* ctx, std::vector<uint64_t>& result) {
|
||||
int64_t s = taosGetTimestampUs();
|
||||
bool ok = this->Search(ctx, result);
|
||||
int64_t e = taosGetTimestampUs();
|
||||
return ok;
|
||||
}
|
||||
|
||||
~FstReadMemory() {
|
||||
fstCountingWriterDestroy(_w);
|
||||
fstDestroy(_fst);
|
||||
fstSliceDestroy(&_s);
|
||||
writerCtxDestroy(_wc, false);
|
||||
tfCleanup();
|
||||
}
|
||||
|
||||
private:
|
||||
FstCountingWriter* _w;
|
||||
Fst* _fst;
|
||||
FstSlice _s;
|
||||
WriterCtx* _wc;
|
||||
size_t _size;
|
||||
};
|
||||
|
||||
class FstWriterEnv : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() { fw = new FstWriter(); }
|
||||
virtual void TearDown() { delete fw; }
|
||||
FstWriter* fw = NULL;
|
||||
};
|
||||
|
||||
class FstReadEnv : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() { fr = new FstReadMemory(1024); }
|
||||
virtual void TearDown() { delete fr; }
|
||||
FstReadMemory* fr = NULL;
|
||||
};
|
||||
|
||||
class TFst {
|
||||
public:
|
||||
void CreateWriter() { fw = new FstWriter; }
|
||||
void ReCreateWriter() {
|
||||
if (fw != NULL) delete fw;
|
||||
fw = new FstWriter;
|
||||
}
|
||||
void DestroyWriter() {
|
||||
if (fw != NULL) delete fw;
|
||||
}
|
||||
void CreateReader() {
|
||||
fr = new FstReadMemory(1024);
|
||||
fr->init();
|
||||
}
|
||||
void ReCreateReader() {
|
||||
if (fr != NULL) delete fr;
|
||||
fr = new FstReadMemory(1024);
|
||||
}
|
||||
void DestroyReader() {
|
||||
delete fr;
|
||||
fr = NULL;
|
||||
}
|
||||
bool Put(const std::string& k, uint64_t v) {
|
||||
if (fw == NULL) {
|
||||
return false;
|
||||
}
|
||||
return fw->Put(k, v);
|
||||
}
|
||||
bool Get(const std::string& k, uint64_t* v) {
|
||||
if (fr == NULL) {
|
||||
return false;
|
||||
}
|
||||
return fr->Get(k, v);
|
||||
}
|
||||
|
||||
private:
|
||||
FstWriter* fw;
|
||||
FstReadMemory* fr;
|
||||
};
|
||||
class FstEnv : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() {
|
||||
EnvInit();
|
||||
fst = new TFst;
|
||||
}
|
||||
virtual void TearDown() { delete fst; }
|
||||
TFst* fst;
|
||||
};
|
||||
|
||||
TEST_F(FstEnv, writeNormal) {
|
||||
fst->CreateWriter();
|
||||
std::string str("aa");
|
||||
for (int i = 0; i < 10; i++) {
|
||||
str[0] = 'a' + i;
|
||||
str.resize(2);
|
||||
assert(fst->Put(str, i) == true);
|
||||
}
|
||||
// order failed
|
||||
assert(fst->Put("aa", 1) == false);
|
||||
|
||||
fst->DestroyWriter();
|
||||
|
||||
fst->CreateReader();
|
||||
uint64_t val;
|
||||
assert(fst->Get("a", &val) == false);
|
||||
assert(fst->Get("aa", &val) == true);
|
||||
assert(val == 0);
|
||||
}
|
Loading…
Reference in New Issue