diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 1230fe17ff..5a8138b126 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -20,6 +20,7 @@ #include "tarray.h" #include "index_fst_util.h" #include "index_fst_registry.h" +#include "index_fst_counting_writer.h" typedef struct FstNode FstNode; @@ -35,7 +36,6 @@ typedef struct FstRange { typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal} State; typedef enum { Included, Excluded, Unbounded} FstBound; -typedef uint32_t CheckSummer; /* @@ -49,7 +49,8 @@ typedef struct FstUnFinishedNodes { #define FST_UNFINISHED_NODES_LEN(nodes) taosArrayGetSize(nodes->stack) -FstUnFinishedNodes *FstUnFinishedNodesCreate(); +FstUnFinishedNodes *fstUnFinishedNodesCreate(); +void fstUnFinishedNodesDestroy(FstUnFinishedNodes *node); void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes *nodes, bool isFinal); FstBuilderNode *fstUnFinishedNodesPopRoot(FstUnFinishedNodes *nodes); FstBuilderNode *fstUnFinishedNodesPopFreeze(FstUnFinishedNodes *nodes, CompiledAddr addr); @@ -58,18 +59,13 @@ void fstUnFinishedNodesSetRootOutput(FstUnFinishedNodes *node, Output out); void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes *node, CompiledAddr addr); void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *node, FstSlice bs, Output out); uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes *node, FstSlice bs); -uint64_t FstUnFinishedNodesFindCommPreifxAndSetOutput(FstUnFinishedNodes *node, FstSlice bs, Output in, Output *out); +uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node, FstSlice bs, Output in, Output *out); -typedef struct FstCountingWriter { - void* wtr; // wrap any writer that counts and checksum bytes written - uint64_t count; - CheckSummer summer; -} FstCountingWriter; typedef struct FstBuilder { - FstCountingWriter wtr; // The FST raw data is written directly to `wtr`. + FstCountingWriter *wrt; // The FST raw data is written directly to `wtr`. FstUnFinishedNodes *unfinished; // The stack of unfinished nodes - FstRegistry registry; // A map of finished nodes. + FstRegistry* registry; // A map of finished nodes. SArray* last; // The last word added CompiledAddr lastAddr; // The address of the last compiled node uint64_t len; // num of keys added @@ -127,6 +123,8 @@ typedef struct FstNode { #define FST_NODE_ADDR(node) node->start FstNode *fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice *data); +void fstNodeDestroy(FstNode *fstNode); + FstTransitions fstNodeTransitionIter(FstNode *node); FstTransitions* fstNodeTransitions(FstNode *node); bool fstNodeGetTransitionAt(FstNode *node, uint64_t i, FstTransition *res); @@ -157,6 +155,9 @@ typedef struct FstIndexedValue { uint64_t value; } FstIndexedValue; +FstLastTransition *fstLastTransitionCreate(uint8_t inp, Output out); +void fstLastTransitionDestroy(FstLastTransition *trn); + diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h new file mode 100644 index 0000000000..0eba963239 --- /dev/null +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef __INDEX_FST_COUNTING_WRITER_H__ +#define __INDEX_FST_COUNTING_WRITER_H__ + +typedef uint32_t CheckSummer; + + +typedef struct FstCountingWriter { + void* wrt; // wrap any writer that counts and checksum bytes written + uint64_t count; + CheckSummer summer; +} FstCountingWriter; + +uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen); + +int FstCountingWriterFlush(FstCountingWriter *write); + + +FstCountingWriter *fstCountingWriterCreate(void *wtr); +void fstCountingWriterDestroy(FstCountingWriter *w); + + +#define FST_WRITER_COUNT(writer) (writer->count) +#define FST_WRITER_INTER_WRITER(writer) (writer->wtr) +#define FST_WRITE_CHECK_SUMMER(writer) (writer->summer) + +#endif + + diff --git a/source/libs/index/inc/index_fst_node.h b/source/libs/index/inc/index_fst_node.h index 631c7026c5..ddd7e1f450 100644 --- a/source/libs/index/inc/index_fst_node.h +++ b/source/libs/index/inc/index_fst_node.h @@ -17,7 +17,11 @@ #define __INDEX_FST_NODE_H__ #include "index_fst_util.h" +#include "index_fst_counting_writer.h" +#define FST_BUILDER_NODE_IS_FINAL(bn) (bn->isFinal) +#define FST_BUILDER_NODE_TRANS_ISEMPTY(bn) (taosArrayGetSize(bn->trans) == 0) +#define FST_BUILDER_NODE_FINALOUTPUT_ISZERO(bn) (bn->finalOutput == 0) typedef struct FstTransition { uint8_t inp; //The byte input associated with this transition. @@ -37,4 +41,8 @@ FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src); void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src); +bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr); + +void fstBuilderNodeDestroy(FstBuilderNode *node); + #endif diff --git a/source/libs/index/inc/index_fst_registry.h b/source/libs/index/inc/index_fst_registry.h index f19bb750c2..2504d7ccff 100644 --- a/source/libs/index/inc/index_fst_registry.h +++ b/source/libs/index/inc/index_fst_registry.h @@ -24,6 +24,8 @@ typedef struct FstRegistryCell { FstBuilderNode *node; } FstRegistryCell; +#define FST_REGISTRY_CELL_IS_EMPTY(cell) (cell->addr == NONE_ADDRESS) +#define FST_REGISTRY_CELL_INSERT(cell, tAddr) do {cell->addr = tAddr;} while(0) //typedef struct FstRegistryCache { @@ -44,14 +46,17 @@ typedef struct FstRegistryEntry { // Registry relation function typedef struct FstRegistry { - SArray *table; + SArray *table; // uint64_t tableSize; // num of rows uint64_t mruSize; // num of columns } FstRegistry; // FstRegistry* fstRegistryCreate(uint64_t tableSize, uint64_t mruSize); +void fstRegistryDestroy(FstRegistry *registry); + FstRegistryEntry* fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNode); +void fstRegistryEntryDestroy(FstRegistryEntry *entry); #endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 91cfcb5cdf..f8f4311a4a 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -183,6 +183,7 @@ void indexMultiTermDestroy(SArray *array) { } taosArrayDestroy(array); } + void indexInit() { //do nothing } diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 2974e7f9b5..a7ae3f2fb6 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -24,6 +24,18 @@ FstUnFinishedNodes *fstUnFinishedNodesCreate() { fstUnFinishedNodesPushEmpty(nodes, false); return nodes; } +void unFinishedNodeDestroyElem(void* elem) { + FstBuilderNodeUnfinished *b = (FstBuilderNodeUnfinished*)elem; + fstBuilderNodeDestroy(b->node); + free(b->last); +} +void fstUnFinishedNodeDestroy(FstUnFinishedNodes *nodes) { + if (nodes == NULL) { return; } + + taosArrayDestroyEx(nodes->stack, unFinishedNodeDestroyElem); + free(nodes); +} + void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes *nodes, bool isFinal) { FstBuilderNode *node = malloc(sizeof(FstBuilderNode)); node->isFinal = isFinal; @@ -76,11 +88,11 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output assert(un->last == NULL); - FstLastTransition *trn = malloc(sizeof(FstLastTransition)); - trn->inp = s->data[s->start]; - trn->out = out; - un->last = trn; + //FstLastTransition *trn = malloc(sizeof(FstLastTransition)); + //trn->inp = s->data[s->start]; + //trn->out = out; + un->last = fstLastTransitionCreate(s->data[s->start], out); for (uint64_t i = s->start; i <= s->end; i++) { FstBuilderNode *n = malloc(sizeof(FstBuilderNode)); @@ -88,9 +100,10 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output n->finalOutput = 0; n->trans = NULL; - FstLastTransition *trn = malloc(sizeof(FstLastTransition)); - trn->inp = s->data[i]; - trn->out = out; + //FstLastTransition *trn = malloc(sizeof(FstLastTransition)); + //trn->inp = s->data[i]; + //trn->out = out; + FstLastTransition *trn = fstLastTransitionCreate(s->data[i], out); FstBuilderNodeUnfinished un = {.node = n, .last = trn}; taosArrayPush(nodes->stack, &un); @@ -116,7 +129,7 @@ uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes *node, FstSlice bs) } return count; } -uint64_t FstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node, FstSlice bs, Output in, Output *out) { +uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node, FstSlice bs, Output in, Output *out) { FstSlice *s = &bs; size_t lsz = (size_t)(s->end - s->start + 1); // data len @@ -199,6 +212,10 @@ FstNode *fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice *slice) { } return n; } +void fstNodeDestroy(FstNode *node) { + if (node == NULL) { return; } + free(node); +} FstTransitions* fstNodeTransitions(FstNode *node) { FstTransitions *t = malloc(sizeof(FstTransitions)); if (NULL == t) { @@ -275,22 +292,74 @@ bool fstNodeCompile(FstNode *node, void *w, CompiledAddr lastAddr, CompiledAddr } - - FstBuilder *fstBuilderCreate(void *w, FstType ty) { FstBuilder *b = malloc(sizeof(FstBuilder)); if (NULL == b) { return b; } - FstCountingWriter wtr = {.wtr = w, .count = 0, .summer = 0}; - b->wtr = wtr; - b->unfinished = malloc(sizeof(FstUnFinishedNodes)); + + b->wrt = fstCountingWriterCreate(w); + b->unfinished = fstUnFinishedNodesCreate(); + b->registry = fstRegistryCreate(10000, 2) ; + b->last = NULL; + b->lastAddr = NONE_ADDRESS; + b->len = 0; return b; - } + + +void fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDupe) { + return; +} + +CompiledAddr fstBuilderCompile(FstBuilder *b, FstBuilderNode *bn) { + if (FST_BUILDER_NODE_IS_FINAL(bn) + && FST_BUILDER_NODE_TRANS_ISEMPTY(bn) + && FST_BUILDER_NODE_FINALOUTPUT_ISZERO(bn)) { + return EMPTY_ADDRESS; + } + FstRegistryEntry *entry = fstRegistryGetEntry(b->registry, bn); + if (entry->state == FOUND) { + CompiledAddr ret = entry->addr; + fstRegistryEntryDestroy(entry); + return ret; + } + CompiledAddr startAddr = (CompiledAddr)(FST_WRITER_COUNT(b->wrt)); + + fstBuilderNodeCompileTo(bn, b->wrt, b->lastAddr, startAddr); + b->lastAddr = (CompiledAddr)(FST_WRITER_COUNT(b->wrt) - 1); + if (entry->state == NOTFOUND) { + FST_REGISTRY_CELL_INSERT(entry->cell, b->lastAddr); + } + fstRegistryEntryDestroy(entry); + + return b->lastAddr; +} + + FstSlice fstNodeAsSlice(FstNode *node) { FstSlice *slice = &node->data; FstSlice s = fstSliceCopy(slice, slice->end, slice->dLen - 1); return s; } +FstLastTransition *fstLastTransitionCreate(uint8_t inp, Output out) { + FstLastTransition *trn = malloc(sizeof(FstLastTransition)); + if (trn == NULL) { return NULL; } + + trn->inp = inp; + trn->out = out; + return trn; +} + +void fstLastTransitionDestroy(FstLastTransition *trn) { + free(trn); +} +void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished *node, CompiledAddr addr) { + return; +} + +void fstBuilderNodeUnfinishedAddOutputPrefix(FstBuilderNodeUnfinished *node, CompiledAddr addr) { + return; +} + diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c new file mode 100644 index 0000000000..1486b9b203 --- /dev/null +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "tutil.h" +#include "index_fst_counting_writer.h" + +FstCountingWriter *fstCountingWriterCreate(void *wrt) { + FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter)); + if (cw == NULL) { return NULL; } + + cw->wrt = wrt; + return cw; +} +void fstCountingWriterDestroy(FstCountingWriter *cw) { + // free wrt object: close fd or free mem + free(cw); +} + +uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) { + if (write == NULL) { return 0; } + // update checksum + // write data to file/socket or mem + + write->count += bufLen; + return bufLen; +} + +int FstCountingWriterFlush(FstCountingWriter *write) { + //write->wtr->flush + return 1; +} + + + diff --git a/source/libs/index/src/index_fst_node.c b/source/libs/index/src/index_fst_node.c index 23af4a4a4b..5452f9cb89 100644 --- a/source/libs/index/src/index_fst_node.c +++ b/source/libs/index/src/index_fst_node.c @@ -21,19 +21,23 @@ FstBuilderNode *fstBuilderNodeDefault() { bn->trans = NULL; return bn; } +void fstBuilderNodeDestroy(FstBuilderNode *node) { + if (node == NULL) { return; } + taosArrayDestroy(node->trans); + free(node); +} FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src) { FstBuilderNode *node = malloc(sizeof(FstBuilderNode)); if (node == NULL) { return NULL; } - + // size_t sz = taosArrayGetSize(src->trans); SArray *trans = taosArrayInit(sz, sizeof(FstTransition)); for (size_t i = 0; i < sz; i++) { FstTransition *tran = taosArrayGet(src->trans, i); - FstTransition t = *tran; - taosArrayPush(trans, &t); + taosArrayPush(trans, tran); } node->trans = trans; @@ -47,9 +51,34 @@ void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src) { if (dst == NULL || src == NULL) { return; } dst->isFinal = src->isFinal; - dst->finalOutput = src->finalOutput ; - dst->trans = src->trans; + dst->finalOutput = src->finalOutput; + // avoid mem leak + taosArrayDestroy(dst->trans); + dst->trans = src->trans; src->trans = NULL; } +bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) { + size_t sz = taosArrayGetSize(b->trans); + assert(sz < 256); + if (FST_BUILDER_NODE_IS_FINAL(b) + && FST_BUILDER_NODE_TRANS_ISEMPTY(b) + && FST_BUILDER_NODE_FINALOUTPUT_ISZERO(b)) { + return true; + } else if (sz != 1 || b->isFinal) { + // AnyTrans->Compile(w, addr, node); + } else { + FstTransition *tran = taosArrayGet(b->trans, 0); + if (tran->addr == lastAddr && tran->out == 0) { + //OneTransNext::compile(w, lastAddr, tran->inp); + return true; + } else { + //OneTrans::Compile(w, lastAddr, *tran); + return true; + } + } + return true; +} + + diff --git a/source/libs/index/src/index_fst_registry.c b/source/libs/index/src/index_fst_registry.c index 5d6c7b1712..b25964e0e7 100644 --- a/source/libs/index/src/index_fst_registry.c +++ b/source/libs/index/src/index_fst_registry.c @@ -32,6 +32,7 @@ uint64_t fstRegistryHash(FstRegistry *registry, FstBuilderNode *bNode) { h = (h ^ (uint64_t)(trn->addr))* FNV_PRIME; } return h %(registry->tableSize); + } static void fstRegistryCellSwap(SArray *arr, uint32_t a, uint32_t b) { size_t sz = taosArrayGetSize(arr); @@ -63,8 +64,6 @@ static void fstRegistryCellPromote(SArray *arr, uint32_t start, uint32_t end) { s -= 1; } } -#define FST_REGISTRY_CELL_IS_EMPTY(cell) (cell->addr == NONE_ADDRESS) -#define FST_REGISTRY_CELL_INSERT(cell, addr) do {cell->addr = addr;} while(0) FstRegistry* fstRegistryCreate(uint64_t tableSize, uint64_t mruSize) { FstRegistry *registry = malloc(sizeof(FstRegistry)); @@ -72,10 +71,14 @@ FstRegistry* fstRegistryCreate(uint64_t tableSize, uint64_t mruSize) { uint64_t nCells = tableSize * mruSize; SArray* tb = (SArray *)taosArrayInit(nCells, sizeof(FstRegistryCell)); + if (NULL == tb) { + free(registry); + return NULL; + } + for (uint64_t i = 0; i < nCells; i++) { - FstRegistryCell *cell = taosArrayGet(tb, i); - cell->addr = NONE_ADDRESS; - cell->node = fstBuilderNodeDefault(); + FstRegistryCell cell = {.addr = NONE_ADDRESS, .node = fstBuilderNodeDefault()}; + taosArrayPush(tb, &cell); } registry->table = tb; @@ -84,6 +87,19 @@ FstRegistry* fstRegistryCreate(uint64_t tableSize, uint64_t mruSize) { return registry; } +void fstRegistryDestroy(FstRegistry *registry) { + if (registry == NULL) { return; } + + SArray *tb = registry->table; + size_t sz = taosArrayGetSize(tb); + for (size_t i = 0; i < sz; i++) { + FstRegistryCell *cell = taosArrayGet(tb, i); + fstBuilderNodeDestroy(cell->node); + } + taosArrayDestroy(tb); + free(registry); +} + FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNode) { if (taosArrayGetSize(registry->table) <= 0) { return NULL; @@ -98,11 +114,9 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo //cell->isNode && if (cell->addr != NONE_ADDRESS && cell->node == bNode) { entry->state = FOUND; - entry->addr = cell->addr ; + entry->addr = cell->addr ; return entry; } else { - // clone from bNode, refactor later - // fstBuilderNodeCloneFrom(cell->node, bNode); entry->state = NOTFOUND; entry->cell = cell; // copy or not @@ -154,5 +168,8 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo } return entry; } +void fstRegistryEntryDestroy(FstRegistryEntry *entry) { + free(entry); +} diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index cc0df0d42a..857060ce5e 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -51,6 +51,7 @@ TEST(IndexTest, index_create_test) { int *v = (int *)taosArrayGet(result, i); std::cout << "value --->" << *v << std::endl; } + // add more test case indexMultiTermQueryDestroy(multiQuery); indexOptsDestroy(opts);