From 32ac2d02c87d7f5ba46db2204fa4f78289b199ae Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 Nov 2021 10:54:49 +0800 Subject: [PATCH 1/6] update fst core struct --- source/libs/index/inc/index_fst.h | 4 +++ source/libs/index/inc/index_fst_node.h | 2 +- source/libs/index/src/index_fst.c | 5 +++ source/libs/index/src/index_fst_node.c | 43 +++++++++++++------------- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 37feb79ac8..01be2f8b2b 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -255,4 +255,8 @@ FstType fstGetType(Fst *fst); CompiledAddr fstGetRootAddr(Fst *fst); Output fstEmptyFinalOutput(Fst *fst, bool *null); bool fstVerify(Fst *fst); + + +//refactor this function +bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr); #endif diff --git a/source/libs/index/inc/index_fst_node.h b/source/libs/index/inc/index_fst_node.h index ddd7e1f450..0645aa1158 100644 --- a/source/libs/index/inc/index_fst_node.h +++ b/source/libs/index/inc/index_fst_node.h @@ -41,7 +41,7 @@ FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src); void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src); -bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr); +//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr); void fstBuilderNodeDestroy(FstBuilderNode *node); diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 6c1ea8cfeb..f9cb68baa9 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -733,6 +733,11 @@ bool fstNodeCompile(FstNode *node, void *w, CompiledAddr lastAddr, CompiledAddr return true; } +bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) { + return fstNodeCompile(NULL, wrt, lastAddr, startAddr, b); +} + + FstBuilder *fstBuilderCreate(void *w, FstType ty) { FstBuilder *b = malloc(sizeof(FstBuilder)); diff --git a/source/libs/index/src/index_fst_node.c b/source/libs/index/src/index_fst_node.c index 5452f9cb89..b33b8e4428 100644 --- a/source/libs/index/src/index_fst_node.c +++ b/source/libs/index/src/index_fst_node.c @@ -59,26 +59,27 @@ void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src) { src->trans = NULL; } -bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) { - size_t sz = taosArrayGetSize(b->trans); - assert(sz < 256); - if (FST_BUILDER_NODE_IS_FINAL(b) - && FST_BUILDER_NODE_TRANS_ISEMPTY(b) - && FST_BUILDER_NODE_FINALOUTPUT_ISZERO(b)) { - return true; - } else if (sz != 1 || b->isFinal) { - // AnyTrans->Compile(w, addr, node); - } else { - FstTransition *tran = taosArrayGet(b->trans, 0); - if (tran->addr == lastAddr && tran->out == 0) { - //OneTransNext::compile(w, lastAddr, tran->inp); - return true; - } else { - //OneTrans::Compile(w, lastAddr, *tran); - return true; - } - } - return true; -} +//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) { + + //size_t sz = taosArrayGetSize(b->trans); + //assert(sz < 256); + //if (FST_BUILDER_NODE_IS_FINAL(b) + // && FST_BUILDER_NODE_TRANS_ISEMPTY(b) + // && FST_BUILDER_NODE_FINALOUTPUT_ISZERO(b)) { + // return true; + //} else if (sz != 1 || b->isFinal) { + // // AnyTrans->Compile(w, addr, node); + //} else { + // FstTransition *tran = taosArrayGet(b->trans, 0); + // if (tran->addr == lastAddr && tran->out == 0) { + // //OneTransNext::compile(w, lastAddr, tran->inp); + // return true; + // } else { + // //OneTrans::Compile(w, lastAddr, *tran); + // return true; + // } + //} + //return true; +//} From a09dcbdd85fd343f49963d561bc3c805bd2479d4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 Nov 2021 10:59:51 +0800 Subject: [PATCH 2/6] update fst core struct --- source/libs/index/src/index_fst.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index f9cb68baa9..1ee364d4ad 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1026,10 +1026,10 @@ bool fstVerify(Fst *fst) { uint32_t checkSum = fst->meta->checkSum; FstSlice *data = fst->data; TSCKSUM initSum = 0; - if (taosCheckChecksumWhole(data->data, data->dLen)) { + if (!taosCheckChecksumWhole(data->data, data->dLen)) { return false; } - + return true; } From 2082e86476bbe7778e289013a7684619f7ad4e05 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 Nov 2021 14:01:25 +0800 Subject: [PATCH 3/6] update fst core struct --- source/libs/index/inc/index_fst.h | 2 ++ source/libs/index/src/index_fst.c | 27 +++++++++++++++++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 01be2f8b2b..8f76c2d5cf 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -251,8 +251,10 @@ void fstDestroy(Fst *fst); bool fstGet(Fst *fst, FstSlice *b, Output *out); FstNode* fstGetNode(Fst *fst, CompiledAddr); +FstNode* fstGetRoot(Fst *fst); FstType fstGetType(Fst *fst); CompiledAddr fstGetRootAddr(Fst *fst); + Output fstEmptyFinalOutput(Fst *fst, bool *null); bool fstVerify(Fst *fst); diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 1ee364d4ad..94be05d0c1 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -990,15 +990,39 @@ void fstDestroy(Fst *fst) { } bool fstGet(Fst *fst, FstSlice *b, Output *out) { - + FstNode *root = fstGetRoot(fst); + Output tOut = 0; + for (uint32_t i = 0; i < b->dLen; i++) { + uint8_t inp = b->data[i]; + Output res = 0; + bool null = fstNodeFindInput(root, inp, &res); + if (null) { return false; } + + FstTransition trn; + fstNodeGetTransitionAt(root, res, &trn); + tOut += trn.out; + root = fstGetNode(fst, trn.addr); + } + if (!FST_NODE_IS_FINAL(root)) { + return false; + } else { + tOut = tOut + FST_NODE_FINAL_OUTPUT(root); + } + *out = tOut; + return false; } +FstNode *fstGetRoot(Fst *fst) { + CompiledAddr root = fstGetRootAddr(fst); + return fstGetNode(fst, root); +} FstNode* fstGetNode(Fst *fst, CompiledAddr addr) { if (fst->root != NULL) { return fst->root; } fst->root = fstNodeCreate(fst->meta->version, addr, fst->data); + return fst->root; } @@ -1021,7 +1045,6 @@ Output fstEmptyFinalOutput(Fst *fst, bool *null) { return res; } - bool fstVerify(Fst *fst) { uint32_t checkSum = fst->meta->checkSum; FstSlice *data = fst->data; From 23c9ea680b3097cf78a3753ad2104716f680bc62 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 Nov 2021 23:37:21 +0800 Subject: [PATCH 4/6] update fst core struct --- source/libs/index/inc/index_fst.h | 46 +++++- source/libs/index/inc/index_fst_automation.h | 21 ++- source/libs/index/src/index_fst.c | 154 +++++++++++++++++++ source/libs/index/src/index_fst_util.c | 1 + 4 files changed, 213 insertions(+), 9 deletions(-) diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 8f76c2d5cf..35858bb789 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -21,6 +21,7 @@ #include "index_fst_util.h" #include "index_fst_registry.h" #include "index_fst_counting_writer.h" +#include "index_fst_automation.h" typedef struct FstNode FstNode; @@ -34,11 +35,28 @@ typedef struct FstRange { typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal} State; -typedef enum { Included, Excluded, Unbounded} FstBound; typedef enum {Ordered, OutOfOrdered, DuplicateKey} OrderType; +typedef enum { Included, Excluded, Unbounded} FstBound; +typedef struct FstBoundWithData { + FstSlice data; + FstBound type; +} FstBoundWithData; + +FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data); +bool fstBoundWithDataExceededBy(FstBoundWithData *bound, FstSlice *slice); +bool fstBoundWithDataIsEmpty(FstBoundWithData *bound); +bool fstBoundWithDataIsIncluded(FstBoundWithData *bound); + + +typedef struct FstOutput { + bool null; + Output out; +} FstOutput; + + /* * @@ -261,4 +279,30 @@ bool fstVerify(Fst *fst); //refactor this function bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr); + + + + + +typedef struct StreamState { + FstNode *node; + uint64_t trans; + FstOutput out; + void *autState; +} StreamState; + +typedef struct StreamWithState { + Fst *fst; + Automation *aut; + SArray *inp; + FstOutput emptyOutput; + SArray *stack; // + FstBoundWithData *endAt; +} StreamWithState ; + +typedef void* (*StreamCallback)(void *); +StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ; +void streamWithStateDestroy(StreamWithState *sws); +bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min); +void *streamWithStateNextWith(StreamWithState *sws, StreamCallback callback); #endif diff --git a/source/libs/index/inc/index_fst_automation.h b/source/libs/index/inc/index_fst_automation.h index 7ad9a500cc..63243a0189 100644 --- a/source/libs/index/inc/index_fst_automation.h +++ b/source/libs/index/inc/index_fst_automation.h @@ -15,7 +15,7 @@ #ifndef __INDEX_FST_AUTAOMATION_H__ #define __INDEX_FST_AUTAOMATION_H__ -struct AutomationCtx; +typedef struct AutomationCtx AutomationCtx; typedef struct StartWith { AutomationCtx *autoSelf; @@ -23,20 +23,25 @@ typedef struct StartWith { typedef struct Complement { AutomationCtx *autoSelf; + } Complement; // automation typedef struct AutomationCtx { +// automation interface void *data; } AutomationCtx; -// automation interface -void (*start)(AutomationCtx *ctx); -bool (*isMatch)(AutomationCtx *ctx); -bool (*canMatch)(AutomationCtx *ctx, void *data); -bool (*willAlwaysMatch)(AutomationCtx *ctx, void *state); -void* (*accpet)(AutomationCtx *ctx, void *state, uint8_t byte); -void* (*accpetEof)(AutomationCtx *ctx, *state); +typedef struct Automation { + void* (*start)() ; + bool (*isMatch)(); + bool (*canMatch)(void *data); + bool (*willAlwaysMatch)(void *state); + void* (*accpet)(void *state, uint8_t byte); + void* (*accpetEof)(void *state); + void *data; +} Automation; + #endif diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 94be05d0c1..6c777e704e 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1055,4 +1055,158 @@ bool fstVerify(Fst *fst) { return true; } +// data bound function +FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data) { + FstBoundWithData *b = calloc(1, sizeof(FstBoundWithData)); + if (b == NULL) { return NULL; } + + b->type = type; + b->data = fstSliceCopy(data, data->start, data->end); + return b; +} + +bool fstBoundWithDataExceededBy(FstBoundWithData *bound, FstSlice *slice) { + int comp = fstSliceCompare(slice, &bound->data); + if (bound->type == Included) { + return comp > 0 ? true : false; + } else if (bound->type == Excluded) { + return comp >= 0 ? true : false; + } else { + return true; + } +} +bool fstBoundWithDataIsEmpty(FstBoundWithData *bound) { + if (bound->type == Unbounded) { + return true; + } else { + return fstSliceEmpty(&bound->data); + } +} + + +bool fstBoundWithDataIsIncluded(FstBoundWithData *bound) { + return bound->type == Included ? true : false; +} + +void fstBoundDestroy(FstBoundWithData *bound) { + free(bound); +} + +StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) { + StreamWithState *sws = calloc(1, sizeof(StreamWithState)); + if (sws == NULL) { return NULL; } + + sws->fst = fst; + sws->aut = automation; + sws->inp = (SArray *)taosArrayInit(256, sizeof(uint8_t)); + + sws->emptyOutput.null = false; + sws->emptyOutput.out = 0; + + sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState)); + sws->endAt = max; + streamWithStateSeekMin(sws, min); + + return sws; +} +void streamWithStateDestroy(StreamWithState *sws) { + if (sws == NULL) { return; } + + taosArrayDestroy(sws->inp); + taosArrayDestroy(sws->stack); + + free(sws); +} + +bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { + if (fstBoundWithDataIsEmpty(min)) { + if (fstBoundWithDataIsIncluded(min)) { + sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); + } + StreamState s = {.node = fstGetRoot(sws->fst), + .trans = 0, + .out = {.null = false, .out = 0}, + .autState = sws->aut->start()}; // auto.start callback + taosArrayPush(sws->stack, &s); + return true; + } + FstSlice *key = NULL; + bool inclusize = false;; + + if (min->type == Included) { + key = &min->data; + inclusize = true; + } else if (min->type == Excluded) { + key = &min->data; + } else { + return false; + } + + FstNode *node = fstGetRoot(sws->fst); + Output out = 0; + void* autState = sws->aut->start(); + + for (uint32_t i = 0; i < key->dLen; i++) { + uint8_t b = key->data[i]; + uint64_t res = 0; + bool null = fstNodeFindInput(node, b, &res); + if (null == false) { + FstTransition trn; + fstNodeGetTransitionAt(node, res, &trn); + void *preState = autState; + autState = sws->aut->accpet(preState, b); + taosArrayPush(sws->inp, &b); + StreamState s = {.node = node, + .trans = res + 1, + .out = {.null = false, .out = out}, + .autState = preState}; + taosArrayPush(sws->stack, &s); + out += trn.out; + node = fstGetNode(sws->fst, trn.addr); + } else { + + // This is a little tricky. We're in this case if the + // given bound is not a prefix of any key in the FST. + // Since this is a minimum bound, we need to find the + // first transition in this node that proceeds the current + // input byte. + FstTransitions *trans = fstNodeTransitions(node); + uint64_t i = 0; + for (i = trans->range.start; i < trans->range.end; i++) { + FstTransition trn; + if (fstNodeGetTransitionAt(node, i, &trn) && trn.inp > b) { + break; + } + } + + StreamState s = {.node = node, + .trans = i, + .out = {.null = false, .out = out}, + .autState = autState}; + taosArrayPush(sws->stack, &s); + return true; + } + } + uint32_t sz = taosArrayGetSize(sws->stack); + if (sz != 0) { + StreamState *s = taosArrayGet(sws->stack, sz - 1); + if (inclusize) { + s->trans -= 1; + taosArrayPop(sws->inp); + } else { + FstNode *n = s->node; + uint64_t trans = s->trans; + FstTransition trn; + fstNodeGetTransitionAt(n, trans - 1, &trn); + StreamState s = {.node = fstGetNode(sws->fst, trn.addr), + .trans= 0, + .out = {.null = false, .out = out}, + .autState = autState}; + taosArrayPush(sws->stack, &s); + return true; + } + return false; + } +} + diff --git a/source/libs/index/src/index_fst_util.c b/source/libs/index/src/index_fst_util.c index 94bf650acd..952c710676 100644 --- a/source/libs/index/src/index_fst_util.c +++ b/source/libs/index/src/index_fst_util.c @@ -95,6 +95,7 @@ FstSlice fstSliceCreate(uint8_t *data, uint64_t dLen) { FstSlice slice = {.data = data, .dLen = dLen, .start = 0, .end = dLen - 1}; return slice; } +// just shallow copy FstSlice fstSliceCopy(FstSlice *slice, int32_t start, int32_t end) { FstSlice t; if (start >= slice->dLen || end >= slice->dLen || start > end) { From fbb9d515bf6cce41254c728bacbe225735ee7626 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 30 Nov 2021 14:51:24 +0800 Subject: [PATCH 5/6] update fst core struct --- source/libs/index/inc/index_fst.h | 13 ++- source/libs/index/inc/index_fst_automation.h | 6 +- source/libs/index/src/index_fst.c | 101 +++++++++++++++++-- 3 files changed, 108 insertions(+), 12 deletions(-) diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 35858bb789..bcb7070755 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -291,6 +291,8 @@ typedef struct StreamState { void *autState; } StreamState; +void streamStateDestroy(void *s); + typedef struct StreamWithState { Fst *fst; Automation *aut; @@ -300,9 +302,18 @@ typedef struct StreamWithState { FstBoundWithData *endAt; } StreamWithState ; +typedef struct StreamWithStateResult { + FstSlice data; + FstOutput out; + void *state; + +} StreamWithStateResult; + +StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state); + typedef void* (*StreamCallback)(void *); StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ; void streamWithStateDestroy(StreamWithState *sws); bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min); -void *streamWithStateNextWith(StreamWithState *sws, StreamCallback callback); +StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback); #endif diff --git a/source/libs/index/inc/index_fst_automation.h b/source/libs/index/inc/index_fst_automation.h index 63243a0189..c9df9c219e 100644 --- a/source/libs/index/inc/index_fst_automation.h +++ b/source/libs/index/inc/index_fst_automation.h @@ -34,11 +34,11 @@ typedef struct AutomationCtx { typedef struct Automation { void* (*start)() ; - bool (*isMatch)(); + bool (*isMatch)(void *); bool (*canMatch)(void *data); bool (*willAlwaysMatch)(void *state); - void* (*accpet)(void *state, uint8_t byte); - void* (*accpetEof)(void *state); + void* (*accept)(void *state, uint8_t byte); + void* (*acceptEof)(void *state); void *data; } Automation; diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 6c777e704e..315253d907 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1014,16 +1014,15 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) { } FstNode *fstGetRoot(Fst *fst) { - CompiledAddr root = fstGetRootAddr(fst); - return fstGetNode(fst, root); -} -FstNode* fstGetNode(Fst *fst, CompiledAddr addr) { if (fst->root != NULL) { return fst->root; } - fst->root = fstNodeCreate(fst->meta->version, addr, fst->data); - + CompiledAddr rAddr = fstGetRootAddr(fst); + fst->root = fstGetNode(fst, rAddr); return fst->root; +} +FstNode* fstGetNode(Fst *fst, CompiledAddr addr) { + return fstNodeCreate(fst->meta->version, addr, fst->data); } FstType fstGetType(Fst *fst) { @@ -1113,7 +1112,7 @@ void streamWithStateDestroy(StreamWithState *sws) { if (sws == NULL) { return; } taosArrayDestroy(sws->inp); - taosArrayDestroy(sws->stack); + taosArrayDestroyEx(sws->stack, streamStateDestroy); free(sws); } @@ -1154,7 +1153,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { FstTransition trn; fstNodeGetTransitionAt(node, res, &trn); void *preState = autState; - autState = sws->aut->accpet(preState, b); + autState = sws->aut->accept(preState, b); taosArrayPush(sws->inp, &b); StreamState s = {.node = node, .trans = res + 1, @@ -1209,4 +1208,90 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { } } +StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallback callback) { + FstOutput output = sws->emptyOutput; + if (output.null == false) { + FstSlice emptySlice = fstSliceCreate(NULL, 0); + if (fstBoundWithDataExceededBy(sws->endAt, &emptySlice)) { + taosArrayDestroyEx(sws->stack, streamStateDestroy); + sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState)); + return NULL; + } + void* start = sws->aut->start(); + if (sws->aut->isMatch(start)) { + FstSlice s = fstSliceCreate(NULL, 0); + return swsResultCreate(&s, output, callback(start)); + } + } + while (taosArrayGetSize(sws->stack) > 0) { + StreamState *p = (StreamState *)taosArrayPop(sws->stack); + if (p->trans >= FST_NODE_LEN(p->node) || !sws->aut->canMatch(p->autState)) { + if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { + taosArrayPop(sws->inp); + } + streamStateDestroy(p); + continue; + } + FstTransition trn; + fstNodeGetTransitionAt(p->node, p->trans, &trn); + Output out = p->out.out + trn.out; + void* nextState = sws->aut->accept(p->autState, trn.inp); + void* tState = callback(nextState); + bool isMatch = sws->aut->isMatch(nextState); + FstNode *nextNode = fstGetNode(sws->fst, trn.addr); + taosArrayPush(sws->inp, &(trn.inp)); + + if (FST_NODE_IS_FINAL(nextNode)) { + void *eofState = sws->aut->acceptEof(nextState); + if (eofState != NULL) { + isMatch = sws->aut->isMatch(eofState); + } + } + StreamState s1 = { .node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState}; + taosArrayPush(sws->stack, &s1); + + StreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState}; + taosArrayPush(sws->stack, &s2); + + uint8_t *buf = (uint8_t *)malloc(taosArrayGetSize(sws->inp) * sizeof(uint8_t)); + for (uint32_t i = 0; i < taosArrayGetSize(sws->inp); i++) { + uint8_t *t = (uint8_t *)taosArrayGet(sws->inp, i); + buf[i] = *t; + } + FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp)); + if (fstBoundWithDataExceededBy(sws->endAt, &slice)) { + taosArrayDestroyEx(sws->stack, streamStateDestroy); + sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState)); + return NULL; + } + if (FST_NODE_IS_FINAL(nextNode) && isMatch) { + FstOutput fOutput = {.null = false, out = out + FST_NODE_FINAL_OUTPUT(nextNode)}; + return swsResultCreate(&slice, fOutput , tState); + } + } + return NULL; + +} + +StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state) { + StreamWithStateResult *result = calloc(1, sizeof(StreamWithStateResult)); + if (result == NULL) { return NULL; } + + FstSlice slice = fstSliceCopy(data, 0, data->dLen - 1); + result->data = slice; + result->out = fOut; + result->state = state; + + return result; + +} +void streamStateDestroy(void *s) { + if (NULL == s) { return; } + StreamState *ss = (StreamState *)s; + + fstNodeDestroy(ss->node); + //free(s->autoState); +} + + From 2e859aa138252eb5fb1d8262a268fe12782ca57a Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 30 Nov 2021 14:36:24 +0800 Subject: [PATCH 6/6] refactor wal --- include/libs/wal/wal.h | 31 +++++---- source/libs/wal/src/walIndex.c | 48 ++++++++++--- source/libs/wal/src/walMgmt.c | 53 +++++++++++++- source/libs/wal/src/{wal.c => walRead.c} | 24 +++---- source/libs/wal/src/walWrite.c | 88 +++++++++--------------- 5 files changed, 152 insertions(+), 92 deletions(-) rename source/libs/wal/src/{wal.c => walRead.c} (65%) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 4d5b4d977a..e77540bf90 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -18,6 +18,7 @@ #include "os.h" #include "tdef.h" #include "tlog.h" +#include "tarray.h" #ifdef __cplusplus extern "C" { #endif @@ -39,12 +40,14 @@ typedef enum { typedef struct { int8_t sver; - int8_t reserved[3]; + uint8_t msgType; + int8_t reserved[2]; int32_t len; int64_t version; uint32_t signature; - uint32_t cksum; - char cont[]; + uint32_t cksumHead; + uint32_t cksumBody; + //char cont[]; } SWalHead; typedef struct { @@ -54,16 +57,20 @@ typedef struct { } SWalCfg; #define WAL_PREFIX "wal" +#define WAL_PREFIX_LEN 3 +#define WAL_NOSUFFIX_LEN 20 +#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1) #define WAL_LOG_SUFFIX "log" #define WAL_INDEX_SUFFIX "idx" -#define WAL_PREFIX_LEN 3 #define WAL_REFRESH_MS 1000 #define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) #define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL)) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_FILE_LEN (WAL_PATH_LEN + 32) //#define WAL_FILE_NUM 1 // 3 +#define WAL_FILESET_MAX 128 +#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2) #define WAL_CUR_POS_READ_ONLY 1 #define WAL_CUR_FILE_READ_ONLY 2 @@ -94,6 +101,10 @@ typedef struct SWal { pthread_mutex_t mutex; //path char path[WAL_PATH_LEN]; + //file set + SArray* fileSet; + //reusable write head + SWalHead head; } SWal; // WAL HANDLE typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); @@ -104,21 +115,20 @@ void walCleanUp(); // handle open and ctl SWal *walOpen(const char *path, SWalCfg *pCfg); -void walStop(SWal *pWal); int32_t walAlter(SWal *, SWalCfg *pCfg); void walClose(SWal *); // write //int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen); -int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen); -int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize); +int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen); +//int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize); // apis for lifecycle management void walFsync(SWal *, bool force); int32_t walCommit(SWal *, int64_t ver); // truncate after int32_t walRollback(SWal *, int64_t ver); -// notify that previous log can be pruned safely +// notify that previous logs can be pruned safely int32_t walTakeSnapshot(SWal *, int64_t ver); //int32_t walDataCorrupted(SWal*); @@ -131,11 +141,6 @@ int64_t walGetFirstVer(SWal *); int64_t walGetSnapshotVer(SWal *); int64_t walGetLastVer(SWal *); -//internal -int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); -int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); -int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); - #ifdef __cplusplus } #endif diff --git a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walIndex.c index e1fa8c72dd..2569af841f 100644 --- a/source/libs/wal/src/walIndex.c +++ b/source/libs/wal/src/walIndex.c @@ -20,16 +20,42 @@ #include "tfile.h" #include "walInt.h" -int walSetCurVerImpl(SWal *pWal, int64_t ver) { +int walSeekVerImpl(SWal *pWal, int64_t ver) { //close old file - //iterate all files - //open right file + int code = 0; + code = tfClose(pWal->curLogTfd); + if(code != 0) { + //TODO + } + code = tfClose(pWal->curIdxTfd); + if(code != 0) { + //TODO + } + //bsearch in fileSet + int fName = 0;//TODO + //open the right file + char fNameStr[WAL_FILE_LEN]; + sprintf(fNameStr, "%d."WAL_INDEX_SUFFIX, fName); + bool closed = 1; //TODO:read only + int64_t idxTfd = tfOpenReadWrite(fNameStr); + sprintf(fNameStr, "%d."WAL_LOG_SUFFIX, fName); + int64_t logTfd = tfOpenReadWrite(fNameStr); + //seek position + int64_t offset = (ver - fName) * WAL_IDX_ENTRY_SIZE; + tfLseek(idxTfd, offset, SEEK_SET); //set cur version, cur file version and cur status - return 0; + pWal->curFileFirstVersion = fName; + pWal->curFileLastVersion = 1;//TODO + pWal->curLogTfd = logTfd; + pWal->curIdxTfd = idxTfd; + pWal->curVersion = ver; + pWal->curOffset = offset; + pWal->curStatus = 0;//TODO + return code; } -int walSetCurVer(SWal *pWal, int64_t ver) { - if(ver > pWal->lastVersion + 1) { +int walSeekVer(SWal *pWal, int64_t ver) { + if(ver > pWal->lastVersion) { //TODO: some records are skipped return -1; } @@ -40,13 +66,19 @@ int walSetCurVer(SWal *pWal, int64_t ver) { if(ver < pWal->snapshotVersion) { //TODO: seek snapshotted log } + if(ver >= pWal->curFileFirstVersion + && ((pWal->curFileLastVersion == -1 && ver <= pWal->lastVersion) || (ver <= pWal->curFileLastVersion))) { + + } if(ver < pWal->curFileFirstVersion || (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) { + int index = 0; + index = 1; //back up to avoid inconsistency int64_t curVersion = pWal->curVersion; int64_t curOffset = pWal->curOffset; int64_t curFileFirstVersion = pWal->curFileFirstVersion; int64_t curFileLastVersion = pWal->curFileLastVersion; - if(walSetCurVerImpl(pWal, ver) < 0) { + if(walSeekVerImpl(pWal, ver) < 0) { //TODO: errno pWal->curVersion = curVersion; pWal->curOffset = curOffset; @@ -67,7 +99,7 @@ int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); } if(pWal->curVersion != ver) { - if(walSetCurVer(pWal, ver) != 0) { + if(walSeekVer(pWal, ver) != 0) { //TODO: some records are skipped return -1; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 4168c21a6e..7557be7cef 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -18,8 +18,17 @@ #include "taoserror.h" #include "tref.h" #include "tfile.h" +#include "compare.h" #include "walInt.h" +//internal +int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); +int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); +int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); + +static pthread_mutex_t walInitLock = PTHREAD_MUTEX_INITIALIZER; +static int8_t walInited = 0; + typedef struct { int32_t refSetId; int32_t seq; @@ -35,11 +44,21 @@ static int32_t walInitObj(SWal *pWal); static void walFreeObj(void *pWal); int32_t walInit() { + //TODO: change to atomic + pthread_mutex_lock(&walInitLock); + if(walInited) { + pthread_mutex_unlock(&walInitLock); + return 0; + } else { + walInited = 1; + pthread_mutex_unlock(&walInitLock); + } + int32_t code = 0; tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); code = pthread_mutex_init(&tsWal.mutex, NULL); - if (code) { + if (code != 0) { wError("failed to init wal mutex since %s", tstrerror(code)); return code; } @@ -61,6 +80,27 @@ void walCleanUp() { wInfo("wal module is cleaned up"); } +static int walLoadFileset(SWal *pWal) { + DIR *dir = opendir(pWal->path); + if (dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent* ent; + while ((ent = readdir(dir)) != NULL) { + char *name = ent->d_name; + name[WAL_NOSUFFIX_LEN] = 0; + //validate file name by regex matching + if(1 /* regex match */) { + int64_t fnameInt64 = atoll(name); + taosArrayPush(pWal->fileSet, &fnameInt64); + } + } + taosArraySort(pWal->fileSet, compareInt64Val); + return 0; +} + SWal *walOpen(const char *path, SWalCfg *pCfg) { SWal *pWal = malloc(sizeof(SWal)); if (pWal == NULL) { @@ -70,9 +110,13 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->vgId = pCfg->vgId; pWal->curLogTfd = -1; - /*pWal->curFileId = -1;*/ + pWal->curIdxTfd = -1; pWal->level = pCfg->walLevel; pWal->fsyncPeriod = pCfg->fsyncPeriod; + + memset(&pWal->head, 0, sizeof(SWalHead)); + pWal->head.sver = 0; + tstrncpy(pWal->path, path, sizeof(pWal->path)); pthread_mutex_init(&pWal->mutex, NULL); @@ -129,6 +173,11 @@ static int32_t walInitObj(SWal *pWal) { wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } + pWal->fileSet = taosArrayInit(0, sizeof(int64_t)); + if(pWal->fileSet == NULL) { + wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno)); + return TAOS_SYSTEM_ERROR(errno); + } wDebug("vgId:%d, object is initialized", pWal->vgId); return 0; diff --git a/source/libs/wal/src/wal.c b/source/libs/wal/src/walRead.c similarity index 65% rename from source/libs/wal/src/wal.c rename to source/libs/wal/src/walRead.c index 59f9c48814..b475183b7b 100644 --- a/source/libs/wal/src/wal.c +++ b/source/libs/wal/src/walRead.c @@ -14,18 +14,13 @@ */ #include "wal.h" +#include "tchecksum.h" -int32_t walCommit(SWal *pWal, int64_t ver) { - return 0; +static int walValidateChecksum(SWalHead *pHead, void* body, int64_t bodyLen) { + return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead) && + taosCheckChecksum(body, bodyLen, pHead->cksumBody); } -int32_t walRollback(SWal *pWal, int64_t ver) { - return 0; -} - -int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { - return 0; -} int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { return 0; @@ -36,13 +31,16 @@ int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t r } int64_t walGetFirstVer(SWal *pWal) { - return 0; + if (pWal == NULL) return 0; + return pWal->firstVersion; } -int64_t walGetSnapshotVer(SWal *pWal) { - return 0; +int64_t walGetSnaphostVer(SWal *pWal) { + if (pWal == NULL) return 0; + return pWal->snapshotVersion; } int64_t walGetLastVer(SWal *pWal) { - return 0; + if (pWal == NULL) return 0; + return pWal->lastVersion; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index a8123f9c25..7563ec02c7 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -21,6 +21,18 @@ #include "tfile.h" #include "walInt.h" +int32_t walCommit(SWal *pWal, int64_t ver) { + return 0; +} + +int32_t walRollback(SWal *pWal, int64_t ver) { + return 0; +} + +int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { + return 0; +} + #if 0 static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId); @@ -112,60 +124,40 @@ void walRemoveAllOldFiles(void *handle) { } #endif -static void walUpdateChecksum(SWalHead *pHead) { - pHead->sver = 2; - pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(SWalHead) + pHead->len); -} - -static int walValidateChecksum(SWalHead *pHead) { - if (pHead->sver == 0) { // for compatible with wal before sver 1 - return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead)); - } else if (pHead->sver >= 1) { - uint32_t cksum = pHead->cksum; - pHead->cksum = 0; - return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum); - } - - return 0; -} - -int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) { +int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) { if (pWal == NULL) return -1; - SWalHead *pHead = malloc(sizeof(SWalHead) + bodyLen); - if(pHead == NULL) { - return -1; - } - pHead->version = index; - int32_t code = 0; - // no wal if (!tfValid(pWal->curLogTfd)) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0; - if (pHead->version <= pWal->curVersion) return 0; + if (index > pWal->lastVersion + 1) return -1; - pHead->signature = WAL_SIGNATURE; - pHead->len = bodyLen; - memcpy(pHead->cont, body, bodyLen); + pWal->head.version = index; + int32_t code = 0; - walUpdateChecksum(pHead); + pWal->head.signature = WAL_SIGNATURE; + pWal->head.len = bodyLen; + pWal->head.msgType = msgType; - int32_t contLen = pHead->len + sizeof(SWalHead); + pWal->head.cksumHead = taosCalcChecksum(0, (const uint8_t*)&pWal->head, sizeof(SWalHead)- sizeof(uint32_t)*2); + pWal->head.cksumBody = taosCalcChecksum(0, (const uint8_t*)&body, bodyLen); pthread_mutex_lock(&pWal->mutex); - if (tfWrite(pWal->curLogTfd, pHead, contLen) != contLen) { + if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { + //ftruncate code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); - } else { - /*wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,*/ - /*pWal->curFileId, pWal->logTfd, pHead->version, pWal->curVersion, pHead->len);*/ - pWal->curVersion = pHead->version; } - pthread_mutex_unlock(&pWal->mutex); + if (tfWrite(pWal->curLogTfd, &body, bodyLen) != bodyLen) { + //ftruncate + code = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + } + //TODO:write idx - ASSERT(contLen == pHead->len + sizeof(SWalHead)); + pthread_mutex_unlock(&pWal->mutex); return code; } @@ -254,6 +246,7 @@ static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) { tfFsync(tfd); } +#if 0 static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { int64_t pos = *offset; while (1) { @@ -387,21 +380,4 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch wDebug("vgId:%d, file:%s, it is closed after restore", pWal->vgId, name); return code; } - -uint64_t walGetVersion(SWal *pWal) { - if (pWal == NULL) return 0; - - return pWal->curVersion; -} - -// Wal version in slave (dnode1) must be reset. -// Because after the data file is recovered from peer (dnode2), the new file version in dnode1 may become smaller than origin. -// Some new wal record cannot be written to the wal file in dnode1 for wal version not reset, then fversion and the record in wal file may inconsistent, -// At this time, if dnode2 down, dnode1 switched to master. After dnode2 start and restore data from dnode1, data loss will occur - -void walResetVersion(SWal *pWal, uint64_t newVer) { - if (pWal == NULL) return; - wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->curVersion, newVer); - - pWal->curVersion = newVer; -} +#endif