diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 8f76c2d5cf..35858bb789 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -21,6 +21,7 @@ #include "index_fst_util.h" #include "index_fst_registry.h" #include "index_fst_counting_writer.h" +#include "index_fst_automation.h" typedef struct FstNode FstNode; @@ -34,11 +35,28 @@ typedef struct FstRange { typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal} State; -typedef enum { Included, Excluded, Unbounded} FstBound; typedef enum {Ordered, OutOfOrdered, DuplicateKey} OrderType; +typedef enum { Included, Excluded, Unbounded} FstBound; +typedef struct FstBoundWithData { + FstSlice data; + FstBound type; +} FstBoundWithData; + +FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data); +bool fstBoundWithDataExceededBy(FstBoundWithData *bound, FstSlice *slice); +bool fstBoundWithDataIsEmpty(FstBoundWithData *bound); +bool fstBoundWithDataIsIncluded(FstBoundWithData *bound); + + +typedef struct FstOutput { + bool null; + Output out; +} FstOutput; + + /* * @@ -261,4 +279,30 @@ bool fstVerify(Fst *fst); //refactor this function bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr); + + + + + +typedef struct StreamState { + FstNode *node; + uint64_t trans; + FstOutput out; + void *autState; +} StreamState; + +typedef struct StreamWithState { + Fst *fst; + Automation *aut; + SArray *inp; + FstOutput emptyOutput; + SArray *stack; // + FstBoundWithData *endAt; +} StreamWithState ; + +typedef void* (*StreamCallback)(void *); +StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ; +void streamWithStateDestroy(StreamWithState *sws); +bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min); +void *streamWithStateNextWith(StreamWithState *sws, StreamCallback callback); #endif diff --git a/source/libs/index/inc/index_fst_automation.h b/source/libs/index/inc/index_fst_automation.h index 7ad9a500cc..63243a0189 100644 --- a/source/libs/index/inc/index_fst_automation.h +++ b/source/libs/index/inc/index_fst_automation.h @@ -15,7 +15,7 @@ #ifndef __INDEX_FST_AUTAOMATION_H__ #define __INDEX_FST_AUTAOMATION_H__ -struct AutomationCtx; +typedef struct AutomationCtx AutomationCtx; typedef struct StartWith { AutomationCtx *autoSelf; @@ -23,20 +23,25 @@ typedef struct StartWith { typedef struct Complement { AutomationCtx *autoSelf; + } Complement; // automation typedef struct AutomationCtx { +// automation interface void *data; } AutomationCtx; -// automation interface -void (*start)(AutomationCtx *ctx); -bool (*isMatch)(AutomationCtx *ctx); -bool (*canMatch)(AutomationCtx *ctx, void *data); -bool (*willAlwaysMatch)(AutomationCtx *ctx, void *state); -void* (*accpet)(AutomationCtx *ctx, void *state, uint8_t byte); -void* (*accpetEof)(AutomationCtx *ctx, *state); +typedef struct Automation { + void* (*start)() ; + bool (*isMatch)(); + bool (*canMatch)(void *data); + bool (*willAlwaysMatch)(void *state); + void* (*accpet)(void *state, uint8_t byte); + void* (*accpetEof)(void *state); + void *data; +} Automation; + #endif diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 94be05d0c1..6c777e704e 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1055,4 +1055,158 @@ bool fstVerify(Fst *fst) { return true; } +// data bound function +FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data) { + FstBoundWithData *b = calloc(1, sizeof(FstBoundWithData)); + if (b == NULL) { return NULL; } + + b->type = type; + b->data = fstSliceCopy(data, data->start, data->end); + return b; +} + +bool fstBoundWithDataExceededBy(FstBoundWithData *bound, FstSlice *slice) { + int comp = fstSliceCompare(slice, &bound->data); + if (bound->type == Included) { + return comp > 0 ? true : false; + } else if (bound->type == Excluded) { + return comp >= 0 ? true : false; + } else { + return true; + } +} +bool fstBoundWithDataIsEmpty(FstBoundWithData *bound) { + if (bound->type == Unbounded) { + return true; + } else { + return fstSliceEmpty(&bound->data); + } +} + + +bool fstBoundWithDataIsIncluded(FstBoundWithData *bound) { + return bound->type == Included ? true : false; +} + +void fstBoundDestroy(FstBoundWithData *bound) { + free(bound); +} + +StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) { + StreamWithState *sws = calloc(1, sizeof(StreamWithState)); + if (sws == NULL) { return NULL; } + + sws->fst = fst; + sws->aut = automation; + sws->inp = (SArray *)taosArrayInit(256, sizeof(uint8_t)); + + sws->emptyOutput.null = false; + sws->emptyOutput.out = 0; + + sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState)); + sws->endAt = max; + streamWithStateSeekMin(sws, min); + + return sws; +} +void streamWithStateDestroy(StreamWithState *sws) { + if (sws == NULL) { return; } + + taosArrayDestroy(sws->inp); + taosArrayDestroy(sws->stack); + + free(sws); +} + +bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { + if (fstBoundWithDataIsEmpty(min)) { + if (fstBoundWithDataIsIncluded(min)) { + sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); + } + StreamState s = {.node = fstGetRoot(sws->fst), + .trans = 0, + .out = {.null = false, .out = 0}, + .autState = sws->aut->start()}; // auto.start callback + taosArrayPush(sws->stack, &s); + return true; + } + FstSlice *key = NULL; + bool inclusize = false;; + + if (min->type == Included) { + key = &min->data; + inclusize = true; + } else if (min->type == Excluded) { + key = &min->data; + } else { + return false; + } + + FstNode *node = fstGetRoot(sws->fst); + Output out = 0; + void* autState = sws->aut->start(); + + for (uint32_t i = 0; i < key->dLen; i++) { + uint8_t b = key->data[i]; + uint64_t res = 0; + bool null = fstNodeFindInput(node, b, &res); + if (null == false) { + FstTransition trn; + fstNodeGetTransitionAt(node, res, &trn); + void *preState = autState; + autState = sws->aut->accpet(preState, b); + taosArrayPush(sws->inp, &b); + StreamState s = {.node = node, + .trans = res + 1, + .out = {.null = false, .out = out}, + .autState = preState}; + taosArrayPush(sws->stack, &s); + out += trn.out; + node = fstGetNode(sws->fst, trn.addr); + } else { + + // This is a little tricky. We're in this case if the + // given bound is not a prefix of any key in the FST. + // Since this is a minimum bound, we need to find the + // first transition in this node that proceeds the current + // input byte. + FstTransitions *trans = fstNodeTransitions(node); + uint64_t i = 0; + for (i = trans->range.start; i < trans->range.end; i++) { + FstTransition trn; + if (fstNodeGetTransitionAt(node, i, &trn) && trn.inp > b) { + break; + } + } + + StreamState s = {.node = node, + .trans = i, + .out = {.null = false, .out = out}, + .autState = autState}; + taosArrayPush(sws->stack, &s); + return true; + } + } + uint32_t sz = taosArrayGetSize(sws->stack); + if (sz != 0) { + StreamState *s = taosArrayGet(sws->stack, sz - 1); + if (inclusize) { + s->trans -= 1; + taosArrayPop(sws->inp); + } else { + FstNode *n = s->node; + uint64_t trans = s->trans; + FstTransition trn; + fstNodeGetTransitionAt(n, trans - 1, &trn); + StreamState s = {.node = fstGetNode(sws->fst, trn.addr), + .trans= 0, + .out = {.null = false, .out = out}, + .autState = autState}; + taosArrayPush(sws->stack, &s); + return true; + } + return false; + } +} + diff --git a/source/libs/index/src/index_fst_util.c b/source/libs/index/src/index_fst_util.c index 94bf650acd..952c710676 100644 --- a/source/libs/index/src/index_fst_util.c +++ b/source/libs/index/src/index_fst_util.c @@ -95,6 +95,7 @@ FstSlice fstSliceCreate(uint8_t *data, uint64_t dLen) { FstSlice slice = {.data = data, .dLen = dLen, .start = 0, .end = dLen - 1}; return slice; } +// just shallow copy FstSlice fstSliceCopy(FstSlice *slice, int32_t start, int32_t end) { FstSlice t; if (start >= slice->dLen || end >= slice->dLen || start > end) {