From fbb9d515bf6cce41254c728bacbe225735ee7626 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 30 Nov 2021 14:51:24 +0800 Subject: [PATCH] update fst core struct --- source/libs/index/inc/index_fst.h | 13 ++- source/libs/index/inc/index_fst_automation.h | 6 +- source/libs/index/src/index_fst.c | 101 +++++++++++++++++-- 3 files changed, 108 insertions(+), 12 deletions(-) diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 35858bb789..bcb7070755 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -291,6 +291,8 @@ typedef struct StreamState { void *autState; } StreamState; +void streamStateDestroy(void *s); + typedef struct StreamWithState { Fst *fst; Automation *aut; @@ -300,9 +302,18 @@ typedef struct StreamWithState { FstBoundWithData *endAt; } StreamWithState ; +typedef struct StreamWithStateResult { + FstSlice data; + FstOutput out; + void *state; + +} StreamWithStateResult; + +StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state); + typedef void* (*StreamCallback)(void *); StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ; void streamWithStateDestroy(StreamWithState *sws); bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min); -void *streamWithStateNextWith(StreamWithState *sws, StreamCallback callback); +StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback); #endif diff --git a/source/libs/index/inc/index_fst_automation.h b/source/libs/index/inc/index_fst_automation.h index 63243a0189..c9df9c219e 100644 --- a/source/libs/index/inc/index_fst_automation.h +++ b/source/libs/index/inc/index_fst_automation.h @@ -34,11 +34,11 @@ typedef struct AutomationCtx { typedef struct Automation { void* (*start)() ; - bool (*isMatch)(); + bool (*isMatch)(void *); bool (*canMatch)(void *data); bool (*willAlwaysMatch)(void *state); - void* (*accpet)(void *state, uint8_t byte); - void* (*accpetEof)(void *state); + void* (*accept)(void *state, uint8_t byte); + void* (*acceptEof)(void *state); void *data; } Automation; diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 6c777e704e..315253d907 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1014,16 +1014,15 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) { } FstNode *fstGetRoot(Fst *fst) { - CompiledAddr root = fstGetRootAddr(fst); - return fstGetNode(fst, root); -} -FstNode* fstGetNode(Fst *fst, CompiledAddr addr) { if (fst->root != NULL) { return fst->root; } - fst->root = fstNodeCreate(fst->meta->version, addr, fst->data); - + CompiledAddr rAddr = fstGetRootAddr(fst); + fst->root = fstGetNode(fst, rAddr); return fst->root; +} +FstNode* fstGetNode(Fst *fst, CompiledAddr addr) { + return fstNodeCreate(fst->meta->version, addr, fst->data); } FstType fstGetType(Fst *fst) { @@ -1113,7 +1112,7 @@ void streamWithStateDestroy(StreamWithState *sws) { if (sws == NULL) { return; } taosArrayDestroy(sws->inp); - taosArrayDestroy(sws->stack); + taosArrayDestroyEx(sws->stack, streamStateDestroy); free(sws); } @@ -1154,7 +1153,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { FstTransition trn; fstNodeGetTransitionAt(node, res, &trn); void *preState = autState; - autState = sws->aut->accpet(preState, b); + autState = sws->aut->accept(preState, b); taosArrayPush(sws->inp, &b); StreamState s = {.node = node, .trans = res + 1, @@ -1209,4 +1208,90 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { } } +StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallback callback) { + FstOutput output = sws->emptyOutput; + if (output.null == false) { + FstSlice emptySlice = fstSliceCreate(NULL, 0); + if (fstBoundWithDataExceededBy(sws->endAt, &emptySlice)) { + taosArrayDestroyEx(sws->stack, streamStateDestroy); + sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState)); + return NULL; + } + void* start = sws->aut->start(); + if (sws->aut->isMatch(start)) { + FstSlice s = fstSliceCreate(NULL, 0); + return swsResultCreate(&s, output, callback(start)); + } + } + while (taosArrayGetSize(sws->stack) > 0) { + StreamState *p = (StreamState *)taosArrayPop(sws->stack); + if (p->trans >= FST_NODE_LEN(p->node) || !sws->aut->canMatch(p->autState)) { + if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { + taosArrayPop(sws->inp); + } + streamStateDestroy(p); + continue; + } + FstTransition trn; + fstNodeGetTransitionAt(p->node, p->trans, &trn); + Output out = p->out.out + trn.out; + void* nextState = sws->aut->accept(p->autState, trn.inp); + void* tState = callback(nextState); + bool isMatch = sws->aut->isMatch(nextState); + FstNode *nextNode = fstGetNode(sws->fst, trn.addr); + taosArrayPush(sws->inp, &(trn.inp)); + + if (FST_NODE_IS_FINAL(nextNode)) { + void *eofState = sws->aut->acceptEof(nextState); + if (eofState != NULL) { + isMatch = sws->aut->isMatch(eofState); + } + } + StreamState s1 = { .node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState}; + taosArrayPush(sws->stack, &s1); + + StreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState}; + taosArrayPush(sws->stack, &s2); + + uint8_t *buf = (uint8_t *)malloc(taosArrayGetSize(sws->inp) * sizeof(uint8_t)); + for (uint32_t i = 0; i < taosArrayGetSize(sws->inp); i++) { + uint8_t *t = (uint8_t *)taosArrayGet(sws->inp, i); + buf[i] = *t; + } + FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp)); + if (fstBoundWithDataExceededBy(sws->endAt, &slice)) { + taosArrayDestroyEx(sws->stack, streamStateDestroy); + sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState)); + return NULL; + } + if (FST_NODE_IS_FINAL(nextNode) && isMatch) { + FstOutput fOutput = {.null = false, out = out + FST_NODE_FINAL_OUTPUT(nextNode)}; + return swsResultCreate(&slice, fOutput , tState); + } + } + return NULL; + +} + +StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state) { + StreamWithStateResult *result = calloc(1, sizeof(StreamWithStateResult)); + if (result == NULL) { return NULL; } + + FstSlice slice = fstSliceCopy(data, 0, data->dLen - 1); + result->data = slice; + result->out = fOut; + result->state = state; + + return result; + +} +void streamStateDestroy(void *s) { + if (NULL == s) { return; } + StreamState *ss = (StreamState *)s; + + fstNodeDestroy(ss->node); + //free(s->autoState); +} + +