From d1445018324332a59fdf4e2564b858610bc7356a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 30 Nov 2021 15:42:13 +0800 Subject: [PATCH 01/10] update fst core struct --- source/libs/index/src/index_fst.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 315253d907..1d6e26e612 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1265,7 +1265,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb return NULL; } if (FST_NODE_IS_FINAL(nextNode) && isMatch) { - FstOutput fOutput = {.null = false, out = out + FST_NODE_FINAL_OUTPUT(nextNode)}; + FstOutput fOutput = {.null = false, .out = out + FST_NODE_FINAL_OUTPUT(nextNode)}; return swsResultCreate(&slice, fOutput , tState); } } From d46ca756bf327841be6e7ab3f9c7f639d32cd985 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 1 Dec 2021 00:24:12 +0800 Subject: [PATCH 02/10] refactor builder struct --- source/libs/index/inc/index_fst.h | 26 ++-- source/libs/index/inc/index_fst_util.h | 25 ++-- source/libs/index/src/index_fst.c | 141 ++++++++++++++----- source/libs/index/src/index_fst_automation.c | 1 + source/libs/index/src/index_fst_util.c | 102 ++++++++++---- 5 files changed, 216 insertions(+), 79 deletions(-) diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index bcb7070755..fe669c8567 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -34,6 +34,7 @@ typedef struct FstRange { } FstRange; +typedef enum {GE, GT, LE, LT} RangeType; typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal} State; typedef enum {Ordered, OutOfOrdered, DuplicateKey} OrderType; @@ -169,11 +170,6 @@ uint64_t fstStateFindInput(FstState *state, FstNode *node, uint8_t b, bool *null - - - - - #define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext) #define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans) #define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans) @@ -273,6 +269,8 @@ FstNode* fstGetRoot(Fst *fst); FstType fstGetType(Fst *fst); CompiledAddr fstGetRootAddr(Fst *fst); + + Output fstEmptyFinalOutput(Fst *fst, bool *null); bool fstVerify(Fst *fst); @@ -280,10 +278,6 @@ bool fstVerify(Fst *fst); //refactor this function bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr); - - - - typedef struct StreamState { FstNode *node; uint64_t trans; @@ -316,4 +310,18 @@ StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoun void streamWithStateDestroy(StreamWithState *sws); bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min); StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback); + +typedef struct FstStreamBuilder { + Fst *fst; + Automation *aut; + FstBoundWithData *min; + FstBoundWithData *max; +} FstStreamBuilder; + +FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut); +// set up bound range +// refator, simple code by marco + +FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type); + #endif diff --git a/source/libs/index/inc/index_fst_util.h b/source/libs/index/inc/index_fst_util.h index ff0946063d..a3cef493eb 100644 --- a/source/libs/index/inc/index_fst_util.h +++ b/source/libs/index/inc/index_fst_util.h @@ -67,20 +67,27 @@ uint8_t packDeltaSize(CompiledAddr nodeAddr, CompiledAddr transAddr); CompiledAddr unpackDelta(char *data, uint64_t len, uint64_t nodeAddr); +typedef struct FstString { + uint8_t *data; + uint32_t len; + int32_t ref; +} FstString; typedef struct FstSlice { - uint8_t *data; - uint64_t dLen; - int32_t start; - int32_t end; + FstString *str; + int32_t start; + int32_t end; } FstSlice; -FstSlice fstSliceCopy(FstSlice *slice, int32_t start, int32_t end); -FstSlice fstSliceCreate(uint8_t *data, uint64_t dLen); -bool fstSliceEmpty(FstSlice *slice); -int fstSliceCompare(FstSlice *a, FstSlice *b); +FstSlice fstSliceCreate(uint8_t *data, uint64_t len); +FstSlice fstSliceCopy(FstSlice *s, int32_t start, int32_t end); +FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end); +bool fstSliceEmpty(FstSlice *s); +int fstSliceCompare(FstSlice *s1, FstSlice *s2); +void fstSliceDestroy(FstSlice *s); +uint8_t *fstSliceData(FstSlice *s, int32_t *sz); -#define FST_SLICE_LEN(s) ((s)->end - (s)->start + 1) +#define FST_SLICE_LEN(s) (s->end - s->start + 1) #endif diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 1d6e26e612..f7ed124f43 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -92,7 +92,7 @@ void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes *nodes, CompiledAddr add } void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output out) { FstSlice *s = &bs; - if (s->data == NULL || s->dLen == 0 || s->start > s->end) { + if (fstSliceEmpty(s)) { return; } size_t sz = taosArrayGetSize(nodes->stack) - 1; @@ -104,9 +104,11 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output //FstLastTransition *trn = malloc(sizeof(FstLastTransition)); //trn->inp = s->data[s->start]; //trn->out = out; - un->last = fstLastTransitionCreate(s->data[s->start], out); + int32_t len = 0; + uint8_t *data = fstSliceData(s, &len); + un->last = fstLastTransitionCreate(data[0], out); - for (uint64_t i = s->start; i <= s->end; i++) { + for (uint64_t i = 0; i < len; i++) { FstBuilderNode *n = malloc(sizeof(FstBuilderNode)); n->isFinal = false; n->finalOutput = 0; @@ -115,7 +117,7 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output //FstLastTransition *trn = malloc(sizeof(FstLastTransition)); //trn->inp = s->data[i]; //trn->out = out; - FstLastTransition *trn = fstLastTransitionCreate(s->data[i], out); + FstLastTransition *trn = fstLastTransitionCreate(data[i], out); FstBuilderNodeUnfinished un = {.node = n, .last = trn}; taosArrayPush(nodes->stack, &un); @@ -133,7 +135,8 @@ uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes *node, FstSlice bs) uint64_t count = 0; for (size_t i = 0; i < ssz && i < lsz; i++) { FstBuilderNodeUnfinished *un = taosArrayGet(node->stack, i); - if (un->last->inp == s->data[s->start + i]) { + uint8_t *data = fstSliceData(s, NULL); + if (un->last->inp == data[i]) { count++; } else { break; @@ -153,7 +156,8 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node, FstLastTransition *t = un->last; uint64_t addPrefix = 0; - if (t && t->inp == s->data[s->start + i]) { + uint8_t *data = fstSliceData(s, NULL); + if (t && t->inp == data[i]) { uint64_t commPrefix = MIN(t->out, *out); uint64_t tAddPrefix = t->out - commPrefix; (*out) = (*out) - commPrefix; @@ -176,7 +180,9 @@ FstState fstStateCreateFrom(FstSlice* slice, CompiledAddr addr) { if (addr == EMPTY_ADDRESS) { return fs; } - uint8_t v = slice->data[addr]; + + uint8_t *data = fstSliceData(slice, NULL); + uint8_t v = data[addr]; uint8_t t = (v & 0b11000000) >> 6; if (t == 0b11) { fs.state = OneTransNext; @@ -376,7 +382,8 @@ uint8_t fstStateInput(FstState *s, FstNode *node) { FstSlice *slice = &node->data; bool null = false; uint8_t inp = fstStateCommInput(s, &null); - return null == false ? inp : slice->data[slice->start - 1]; + uint8_t *data = fstSliceData(slice, NULL); + return null == false ? inp : data[-1]; } uint8_t fstStateInputForAnyTrans(FstState *s, FstNode *node, uint64_t i) { assert(s->state == AnyTrans); @@ -388,7 +395,9 @@ uint8_t fstStateInputForAnyTrans(FstState *s, FstNode *node, uint64_t i) { - fstStateTransIndexSize(s, node->version, node->nTrans) - i - 1; // the output size - return slice->data[at]; + + uint8_t *data = fstSliceData(slice, NULL); + return data[at]; } // trans_addr @@ -406,7 +415,8 @@ CompiledAddr fstStateTransAddr(FstState *s, FstNode *node) { - tSizes; // refactor error logic - return unpackDelta(slice->data + slice->start + i, tSizes, node->end); + uint8_t *data = fstSliceData(slice, NULL); + return unpackDelta(data +i, tSizes, node->end); } } CompiledAddr fstStateTransAddrForAnyTrans(FstState *s, FstNode *node, uint64_t i) { @@ -421,7 +431,8 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState *s, FstNode *node, uint64_t i - node->nTrans - (i * tSizes) - tSizes; - return unpackDelta(slice->data + slice->start + at, tSizes, node->end); + uint8_t *data = fstSliceData(slice, NULL); + return unpackDelta(data + at, tSizes, node->end); } // sizes @@ -434,7 +445,8 @@ PackSizes fstStateSizes(FstState *s, FstSlice *slice) { i = FST_SLICE_LEN(slice) - 1 - fstStateNtransLen(s) - 1; } - return (PackSizes)(slice->data[slice->start + i]); + uint8_t *data = fstSliceData(slice, NULL); + return (PackSizes)(*(data +i)); } // Output Output fstStateOutput(FstState *s, FstNode *node) { @@ -452,7 +464,8 @@ Output fstStateOutput(FstState *s, FstNode *node) { - 1 - tSizes - oSizes; - return unpackUint64(slice->data + slice->start + i, oSizes); + uint8_t *data = fstSliceData(slice, NULL); + return unpackUint64(data + i, oSizes); } Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) { @@ -469,7 +482,9 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) { - fstStateTotalTransSize(s, node->version, node->sizes, node->nTrans) - (i * oSizes) - oSizes; - return unpackUint64(slice->data + slice->start + at, oSizes); + + uint8_t *data = fstSliceData(slice, NULL); + return unpackUint64(data + at, oSizes); } // anyTrans specify function @@ -523,7 +538,10 @@ uint64_t fstStateNtrans(FstState *s, FstSlice *slice) { if (null != true) { return n; } - n = slice->data[slice->end - 1]; // data[data.len() - 2] + int32_t len; + uint8_t *data = fstSliceData(slice, &len); + n = data[len - 2]; + //n = data[slice->end - 1]; // data[data.len() - 2] return n == 1 ? 256: n; // // "1" is never a normal legal value here, because if there, // is only 1 transition, then it is encoded in the state byte } Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, PackSizes sizes, uint64_t nTrans) { @@ -538,7 +556,8 @@ Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, Pack - fstStateTotalTransSize(s, version, sizes, nTrans) - (nTrans * oSizes) - oSizes; - return unpackUint64(slice->data + slice->start + at, (uint8_t)oSizes); + uint8_t *data = fstSliceData(slice, NULL); + return unpackUint64(data + at, (uint8_t)oSizes); } uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) { @@ -549,7 +568,10 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) { - fstStateNtransLen(s) - 1 // pack size - fstStateTransIndexSize(s, node->version, node->nTrans); - uint64_t i = slice->data[slice->start + at + b]; + int32_t dlen = 0; + uint8_t *data = fstSliceData(slice, &dlen); + uint64_t i = data[at + b]; + //uint64_t i = slice->data[slice->start + at + b]; if (i >= node->nTrans) { *null = true; } @@ -561,8 +583,13 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) { - node->nTrans; uint64_t end = start + node->nTrans; uint64_t len = end - start; + int32_t dlen = 0; + uint8_t *data = fstSliceData(slice, &dlen); for(int i = 0; i < len; i++) { - uint8_t v = slice->data[slice->start + i]; + //uint8_t v = slice->data[slice->start + i]; + ////slice->data[slice->start + i]; + uint8_t v = data[i]; + if (v == b) { return node->nTrans - i - 1; // bug } @@ -888,7 +915,7 @@ void fstBuilderFinish(FstBuilder *b) { FstSlice fstNodeAsSlice(FstNode *node) { FstSlice *slice = &node->data; - FstSlice s = fstSliceCopy(slice, slice->end, slice->dLen - 1); + FstSlice s = fstSliceCopy(slice, slice->end, FST_SLICE_LEN(slice) - 1); return s; } @@ -929,12 +956,13 @@ void fstBuilderNodeUnfinishedAddOutputPrefix(FstBuilderNodeUnfinished *unNode, O } Fst* fstCreate(FstSlice *slice) { - char *buf = slice->data; - uint64_t skip = 0; - uint64_t len = slice->dLen; - if (len < 36) { + int32_t slen; + char *buf = fstSliceData(slice, &slen); + if (slen < 36) { return NULL; } + uint64_t len = slen; + uint64_t skip = 0; uint64_t version; taosDecodeFixedU64(buf, &version); @@ -992,8 +1020,10 @@ void fstDestroy(Fst *fst) { bool fstGet(Fst *fst, FstSlice *b, Output *out) { FstNode *root = fstGetRoot(fst); Output tOut = 0; - for (uint32_t i = 0; i < b->dLen; i++) { - uint8_t inp = b->data[i]; + int32_t len; + uint8_t *data = fstSliceData(b, &len); + for (uint32_t i = 0; i < len; i++) { + uint8_t inp = data[i]; Output res = 0; bool null = fstNodeFindInput(root, inp, &res); if (null) { return false; } @@ -1046,9 +1076,10 @@ Output fstEmptyFinalOutput(Fst *fst, bool *null) { bool fstVerify(Fst *fst) { uint32_t checkSum = fst->meta->checkSum; - FstSlice *data = fst->data; + int32_t len; + uint8_t *data = fstSliceData(fst->data, &len); TSCKSUM initSum = 0; - if (!taosCheckChecksumWhole(data->data, data->dLen)) { + if (!taosCheckChecksumWhole(data, len)) { return false; } return true; @@ -1058,9 +1089,14 @@ bool fstVerify(Fst *fst) { FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data) { FstBoundWithData *b = calloc(1, sizeof(FstBoundWithData)); if (b == NULL) { return NULL; } - + + if (data != NULL) { + b->data = fstSliceCopy(data, data->start, data->end); + } else { + b->data = fstSliceCreate(NULL, 0); + } b->type = type; - b->data = fstSliceCopy(data, data->start, data->end); + return b; } @@ -1145,8 +1181,10 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { Output out = 0; void* autState = sws->aut->start(); - for (uint32_t i = 0; i < key->dLen; i++) { - uint8_t b = key->data[i]; + int32_t len; + uint8_t *data = fstSliceData(key, &len); + for (uint32_t i = 0; i < len; i++) { + uint8_t b = data[i]; uint64_t res = 0; bool null = fstNodeFindInput(node, b, &res); if (null == false) { @@ -1277,8 +1315,8 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta StreamWithStateResult *result = calloc(1, sizeof(StreamWithStateResult)); if (result == NULL) { return NULL; } - FstSlice slice = fstSliceCopy(data, 0, data->dLen - 1); - result->data = slice; + FstSlice s = fstSliceCopy(data, 0, FST_SLICE_LEN(data) - 1); + result->data = s; result->out = fOut; result->state = state; @@ -1293,5 +1331,42 @@ void streamStateDestroy(void *s) { //free(s->autoState); } +FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) { + FstStreamBuilder *b = calloc(1, sizeof(FstStreamBuilder)); + if (NULL == b) { return NULL; } + + b->fst = fst; + b->aut = aut; + b->min = fstBoundStateCreate(Unbounded, NULL); + b->max = fstBoundStateCreate(Unbounded, NULL); + return b; +} +void fstStreamBuilderDestroy(FstStreamBuilder *b) { + free(b); +} +FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type) { + if (b == NULL) { return NULL; } + + if (type == GE) { + b->min->type = Included; + fstSliceDestroy(&(b->min->data)); + b->min->data = fstSliceDeepCopy(val, 0, FST_SLICE_LEN(val) - 1); + } else if (type == GT) { + b->min->type = Excluded; + fstSliceDestroy(&(b->min->data)); + b->min->data = fstSliceDeepCopy(val, 0, FST_SLICE_LEN(val) - 1); + } else if (type == LE) { + b->max->type = Included; + fstSliceDestroy(&(b->max->data)); + b->max->data = fstSliceDeepCopy(val, 0, FST_SLICE_LEN(val) - 1); + } else if (type == LT) { + b->max->type = Excluded; + fstSliceDestroy(&(b->max->data)); + b->max->data = fstSliceDeepCopy(val, 0, FST_SLICE_LEN(val) - 1); + } + return b; +} + + diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c index f2f48bbc8a..3d5efd30f3 100644 --- a/source/libs/index/src/index_fst_automation.c +++ b/source/libs/index/src/index_fst_automation.c @@ -12,3 +12,4 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + diff --git a/source/libs/index/src/index_fst_util.c b/source/libs/index/src/index_fst_util.c index 952c710676..92949d1ff2 100644 --- a/source/libs/index/src/index_fst_util.c +++ b/source/libs/index/src/index_fst_util.c @@ -91,42 +91,88 @@ CompiledAddr unpackDelta(char *data, uint64_t len, uint64_t nodeAddr) { } // fst slice func -FstSlice fstSliceCreate(uint8_t *data, uint64_t dLen) { - FstSlice slice = {.data = data, .dLen = dLen, .start = 0, .end = dLen - 1}; - return slice; +// + +FstSlice fstSliceCreate(uint8_t *data, uint64_t len) { + FstString *str = (FstString *)malloc(sizeof(FstString)); + str->ref = 1; + str->len = len; + str->data = malloc(len * sizeof(uint8_t)); + memcpy(str->data, data, len); + + FstSlice s = {.str = str, .start = 0, .end = len - 1}; + return s; } // just shallow copy -FstSlice fstSliceCopy(FstSlice *slice, int32_t start, int32_t end) { - FstSlice t; - if (start >= slice->dLen || end >= slice->dLen || start > end) { - t.data = NULL; - return t; - }; - - t.data = slice->data; - t.dLen = slice->dLen; - t.start = start; - t.end = end; +FstSlice fstSliceCopy(FstSlice *s, int32_t start, int32_t end) { + FstString *str = s->str; + str->ref++; + int32_t alen; + //uint8_t *buf = fstSliceData(s, &alen); + //start = buf + start - (buf - s->start); + //end = buf + end - (buf - s->start); + + FstSlice t = {.str = str, .start = start + s->start, .end = end + s->start}; return t; } -bool fstSliceEmpty(FstSlice *slice) { - return slice->data == NULL || slice->dLen <= 0; +FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end) { + + int32_t alen, tlen = end - start + 1; + uint8_t *data = fstSliceData(s, &alen); + assert(tlen <= alen); + + uint8_t *buf = malloc(sizeof(uint8_t) * tlen); + memcpy(buf, data, tlen); + + FstString *str = malloc(sizeof(FstString)); + str->data = buf; + str->len = tlen; + str->ref = 1; + + FstSlice ans; + ans.str = str; + ans.start = 0; + ans.end = tlen - 1; + return ans; +} +bool fstSliceEmpty(FstSlice *s) { + return s->str == NULL || s->start < 0 || s->end < 0; +} + +uint8_t *fstSliceData(FstSlice *s, int32_t *size) { + FstString *str = s->str; + if (size != NULL) { + *size = s->end - s->start + 1; + } + return str->data + s->start; +} +void fstSliceDestroy(FstSlice *s) { + FstString *str = s->str; + str->ref--; + if (str->ref <= 0) { + free(str->data); + free(str); + s->str = NULL; + } } int fstSliceCompare(FstSlice *a, FstSlice *b) { - int32_t aLen = (a->end - a->start + 1); - int32_t bLen = (b->end - b->start + 1); - int32_t mLen = (aLen < bLen ? aLen : bLen); - for (int i = 0; i < mLen; i++) { - uint8_t x = a->data[i + a->start]; - uint8_t y = b->data[i + b->start]; - if (x == y) { continue; } + int32_t alen, blen; + uint8_t *aBuf = fstSliceData(a, &alen); + uint8_t *bBuf = fstSliceData(b, &blen); + + + uint32_t i, j; + for (i = 0, j = 0; i < alen && j < blen; i++, j++) { + uint8_t x = aBuf[i]; + uint8_t y = bBuf[j]; + if (x == y) { continue;} else if (x < y) { return -1; } - else { return 1; } - } - if (aLen == bLen) { return 0; } - else if (aLen < bLen) { return -1; } - else { return 1; } + else { return 1; }; + } + if (i < alen) { return 1; } + else if (j < blen) { return -1; } + else { return 0; } } From f6908c8d1429ec741b9c014c288f6d413036a5dc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 1 Dec 2021 08:46:22 +0800 Subject: [PATCH 03/10] refactor builder struct --- source/libs/index/src/index_fst.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index f7ed124f43..e0ffbdbe9f 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1315,7 +1315,7 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta StreamWithStateResult *result = calloc(1, sizeof(StreamWithStateResult)); if (result == NULL) { return NULL; } - FstSlice s = fstSliceCopy(data, 0, FST_SLICE_LEN(data) - 1); + result->data = fstSliceCopy(data, 0, FST_SLICE_LEN(data) - 1); result->data = s; result->out = fOut; result->state = state; @@ -1342,6 +1342,8 @@ FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) { return b; } void fstStreamBuilderDestroy(FstStreamBuilder *b) { + fstSliceDestroy(&b->min); + fstSliceDestroy(&b->max); free(b); } FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type) { From 7ee1cf62ca436f6dac136177532b637c2afed09e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 1 Dec 2021 12:00:20 +0800 Subject: [PATCH 04/10] refactor builder struct --- source/libs/index/src/index_fst.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index e0ffbdbe9f..dd43890209 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -129,13 +129,12 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes *node, FstSlice bs) { FstSlice *s = &bs; - size_t lsz = (size_t)(s->end - s->start + 1); // data len size_t ssz = taosArrayGetSize(node->stack); // stack size - uint64_t count = 0; + int32_t lsz; // data len + uint8_t *data = fstSliceData(s, &lsz); for (size_t i = 0; i < ssz && i < lsz; i++) { FstBuilderNodeUnfinished *un = taosArrayGet(node->stack, i); - uint8_t *data = fstSliceData(s, NULL); if (un->last->inp == data[i]) { count++; } else { @@ -662,6 +661,7 @@ static const char *fstNodeState(FstNode *node) { void fstNodeDestroy(FstNode *node) { + fstSliceDestroy(&node->data); free(node); } FstTransitions* fstNodeTransitions(FstNode *node) { @@ -825,6 +825,7 @@ void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) { FstSlice sub = fstSliceCopy(s, prefixLen, s->end); fstUnFinishedNodesAddSuffix(b->unfinished, sub, out); + fstSliceDestroy(&sub); return; } @@ -1316,7 +1317,6 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta if (result == NULL) { return NULL; } result->data = fstSliceCopy(data, 0, FST_SLICE_LEN(data) - 1); - result->data = s; result->out = fOut; result->state = state; @@ -1342,8 +1342,8 @@ FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) { return b; } void fstStreamBuilderDestroy(FstStreamBuilder *b) { - fstSliceDestroy(&b->min); - fstSliceDestroy(&b->max); + fstSliceDestroy(&b->min->data); + fstSliceDestroy(&b->max->data); free(b); } FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type) { From 716c0045f8aa32ddff8e8fd8cb7d6d8c9726010a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 1 Dec 2021 17:29:59 +0800 Subject: [PATCH 05/10] refactor builder struct --- source/libs/index/inc/index_fst_util.h | 4 ++-- source/libs/index/src/index_fst.c | 8 ++++---- source/libs/index/src/index_fst_util.c | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/libs/index/inc/index_fst_util.h b/source/libs/index/inc/index_fst_util.h index a3cef493eb..d87e5bd57e 100644 --- a/source/libs/index/inc/index_fst_util.h +++ b/source/libs/index/inc/index_fst_util.h @@ -82,8 +82,8 @@ typedef struct FstSlice { FstSlice fstSliceCreate(uint8_t *data, uint64_t len); FstSlice fstSliceCopy(FstSlice *s, int32_t start, int32_t end); FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end); -bool fstSliceEmpty(FstSlice *s); -int fstSliceCompare(FstSlice *s1, FstSlice *s2); +bool fstSliceIsEmpty(FstSlice *s); +int fstSliceCompare(FstSlice *s1, FstSlice *s2); void fstSliceDestroy(FstSlice *s); uint8_t *fstSliceData(FstSlice *s, int32_t *sz); diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index dd43890209..ca1c7910ed 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -92,7 +92,7 @@ void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes *nodes, CompiledAddr add } void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output out) { FstSlice *s = &bs; - if (fstSliceEmpty(s)) { + if (fstSliceIsEmpty(s)) { return; } size_t sz = taosArrayGetSize(nodes->stack) - 1; @@ -801,7 +801,7 @@ bool fstBuilderInsert(FstBuilder *b, FstSlice bs, Output in) { void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) { FstSlice *s = &bs; - if (fstSliceEmpty(s)) { + if (fstSliceIsEmpty(s)) { b->len = 1; fstUnFinishedNodesSetRootOutput(b->unfinished, in); return; @@ -831,7 +831,7 @@ void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) { OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup) { FstSlice *input = &bs; - if (fstSliceEmpty(&b->last)) { + if (fstSliceIsEmpty(&b->last)) { // deep copy or not b->last = fstSliceCopy(&bs, input->start, input->end); } else { @@ -1115,7 +1115,7 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData *bound) { if (bound->type == Unbounded) { return true; } else { - return fstSliceEmpty(&bound->data); + return fstSliceIsEmpty(&bound->data); } } diff --git a/source/libs/index/src/index_fst_util.c b/source/libs/index/src/index_fst_util.c index 92949d1ff2..1c7a1a9e9d 100644 --- a/source/libs/index/src/index_fst_util.c +++ b/source/libs/index/src/index_fst_util.c @@ -122,7 +122,7 @@ FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end) { assert(tlen <= alen); uint8_t *buf = malloc(sizeof(uint8_t) * tlen); - memcpy(buf, data, tlen); + memcpy(buf, data + start, tlen); FstString *str = malloc(sizeof(FstString)); str->data = buf; @@ -135,7 +135,7 @@ FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end) { ans.end = tlen - 1; return ans; } -bool fstSliceEmpty(FstSlice *s) { +bool fstSliceIsEmpty(FstSlice *s) { return s->str == NULL || s->start < 0 || s->end < 0; } From e3ccb28c097e4ba845c7293c134dd38a7adb8a57 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 2 Dec 2021 10:32:58 +0800 Subject: [PATCH 06/10] update fst write --- source/libs/index/inc/index_fst.h | 8 +++----- source/libs/index/src/index_fst.c | 14 ++++++++++++-- source/libs/index/src/index_fst_util.c | 11 +++++------ 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index fe669c8567..cd5073187c 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -268,11 +268,8 @@ FstNode* fstGetNode(Fst *fst, CompiledAddr); FstNode* fstGetRoot(Fst *fst); FstType fstGetType(Fst *fst); CompiledAddr fstGetRootAddr(Fst *fst); - - - -Output fstEmptyFinalOutput(Fst *fst, bool *null); -bool fstVerify(Fst *fst); +Output fstEmptyFinalOutput(Fst *fst, bool *null); +bool fstVerify(Fst *fst); //refactor this function @@ -304,6 +301,7 @@ typedef struct StreamWithStateResult { } StreamWithStateResult; StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state); +void swsResultDestroy(StreamWithStateResult *result); typedef void* (*StreamCallback)(void *); StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ; diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index ca1c7910ed..68d2011b8f 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -1301,12 +1301,16 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb if (fstBoundWithDataExceededBy(sws->endAt, &slice)) { taosArrayDestroyEx(sws->stack, streamStateDestroy); sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState)); + fstSliceDestroy(&slice); return NULL; } if (FST_NODE_IS_FINAL(nextNode) && isMatch) { FstOutput fOutput = {.null = false, .out = out + FST_NODE_FINAL_OUTPUT(nextNode)}; - return swsResultCreate(&slice, fOutput , tState); + StreamWithStateResult *result = swsResultCreate(&slice, fOutput , tState); + fstSliceDestroy(&slice); + return result; } + fstSliceDestroy(&slice); } return NULL; @@ -1321,8 +1325,14 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta result->state = state; return result; - } +void swsResultDestroy(StreamWithStateResult *result) { + if (NULL == result) { return; } + + fstSliceDestroy(&result->data); + free(result); +} + void streamStateDestroy(void *s) { if (NULL == s) { return; } StreamState *ss = (StreamState *)s; diff --git a/source/libs/index/src/index_fst_util.c b/source/libs/index/src/index_fst_util.c index 1c7a1a9e9d..532b0b8ac3 100644 --- a/source/libs/index/src/index_fst_util.c +++ b/source/libs/index/src/index_fst_util.c @@ -107,7 +107,6 @@ FstSlice fstSliceCreate(uint8_t *data, uint64_t len) { FstSlice fstSliceCopy(FstSlice *s, int32_t start, int32_t end) { FstString *str = s->str; str->ref++; - int32_t alen; //uint8_t *buf = fstSliceData(s, &alen); //start = buf + start - (buf - s->start); //end = buf + end - (buf - s->start); @@ -117,9 +116,10 @@ FstSlice fstSliceCopy(FstSlice *s, int32_t start, int32_t end) { } FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end) { - int32_t alen, tlen = end - start + 1; - uint8_t *data = fstSliceData(s, &alen); - assert(tlen <= alen); + int32_t tlen = end - start + 1; + int32_t slen; + uint8_t *data = fstSliceData(s, &slen); + assert(tlen <= slen); uint8_t *buf = malloc(sizeof(uint8_t) * tlen); memcpy(buf, data + start, tlen); @@ -136,7 +136,7 @@ FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end) { return ans; } bool fstSliceIsEmpty(FstSlice *s) { - return s->str == NULL || s->start < 0 || s->end < 0; + return s->str == NULL || s->str->len == 0 || s->start < 0 || s->end < 0; } uint8_t *fstSliceData(FstSlice *s, int32_t *size) { @@ -161,7 +161,6 @@ int fstSliceCompare(FstSlice *a, FstSlice *b) { uint8_t *aBuf = fstSliceData(a, &alen); uint8_t *bBuf = fstSliceData(b, &blen); - uint32_t i, j; for (i = 0, j = 0; i < alen && j < blen; i++, j++) { uint8_t x = aBuf[i]; From 4bab45c039be102094218eaccb5293d125ef06f7 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 2 Dec 2021 15:48:30 +0800 Subject: [PATCH 07/10] refactor builder struct --- source/libs/index/inc/index_fst.h | 6 +- .../index/inc/index_fst_counting_writer.h | 30 +++++++- .../index/src/index_fst_counting_writer.c | 76 +++++++++++++++++-- 3 files changed, 104 insertions(+), 8 deletions(-) diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index cd5073187c..96838d5843 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -86,14 +86,16 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node, typedef struct FstBuilder { FstCountingWriter *wrt; // The FST raw data is written directly to `wtr`. FstUnFinishedNodes *unfinished; // The stack of unfinished nodes - FstRegistry* registry; // A map of finished nodes. - FstSlice last; // The last word added + FstRegistry* registry; // A map of finished nodes. + FstSlice last; // The last word added CompiledAddr lastAddr; // The address of the last compiled node uint64_t len; // num of keys added } FstBuilder; FstBuilder *fstBuilderCreate(void *w, FstType ty); + + void fstBuilderDestroy(FstBuilder *b); void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in); OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup); diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h index 4650804034..42d77cacb0 100644 --- a/source/libs/index/inc/index_fst_counting_writer.h +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -16,6 +16,34 @@ #ifndef __INDEX_FST_COUNTING_WRITER_H__ #define __INDEX_FST_COUNTING_WRITER_H__ +#include "tfile.h" + + +#define DefaultMem 1024*1024 + +static char tmpFile[] = "/tmp/index"; +typedef enum WriterType {TMemory, TFile} WriterType; + +typedef struct WriterCtx { + int (*write)(struct WriterCtx *ctx, uint8_t *buf, int len); + int (*read)(struct WriterCtx *ctx, uint8_t *buf, int len); + int (*flush)(struct WriterCtx *ctx); + WriterType type; + union { + int fd; + void *mem; + }; + int32_t offset; + int32_t limit; +} WriterCtx; + +static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len); +static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len); +static int writeCtxDoFlush(WriterCtx *ctx); + +WriterCtx* writerCtxCreate(WriterType type); +void writerCtxDestroy(WriterCtx *w); + typedef uint32_t CheckSummer; @@ -25,7 +53,7 @@ typedef struct FstCountingWriter { CheckSummer summer; } FstCountingWriter; -uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen); +int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen); int fstCountingWriterFlush(FstCountingWriter *write); diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index a0a2c380f1..f04f6eddad 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -16,24 +16,88 @@ #include "index_fst_util.h" #include "index_fst_counting_writer.h" +static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) { + if (ctx->offset + len > ctx->limit) { + return -1; + } + + if (ctx->type == TFile) { + assert(len != tfWrite(ctx->fd, buf, len)); + } else { + memcpy(ctx->mem + ctx->offset, buf, len); + } + ctx->offset += len; + return len; +} +static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) { + if (ctx->type == TFile) { + tfRead(ctx->fd, buf, len); + } else { + memcpy(buf, ctx->mem + ctx->offset, len); + } + ctx->offset += len; + + return 1; +} +static int writeCtxDoFlush(WriterCtx *ctx) { + if (ctx->type == TFile) { + //tfFlush(ctx->fd); + } else { + // do nothing + } + return 1; +} + +WriterCtx* writerCtxCreate(WriterType type) { + WriterCtx *ctx = calloc(1, sizeof(WriterCtx)); + if (ctx == NULL) { return NULL; } + + ctx->type == type; + if (ctx->type == TFile) { + ctx->fd = tfOpenCreateWriteAppend(tmpFile); + } else if (ctx->type == TMemory) { + ctx->mem = calloc(1, DefaultMem * sizeof(uint8_t)); + } + ctx->write = writeCtxDoWrite; + ctx->read = writeCtxDoRead; + ctx->flush = writeCtxDoFlush; + + ctx->offset = 0; + ctx->limit = DefaultMem; + + return ctx; +} +void writerCtxDestroy(WriterCtx *ctx) { + if (ctx->type == TMemory) { + free(ctx->mem); + } else { + tfClose(ctx->fd); + } + free(ctx); +} + + FstCountingWriter *fstCountingWriterCreate(void *wrt) { FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter)); if (cw == NULL) { return NULL; } - - cw->wrt = wrt; + + cw->wrt = (void *)(writerCtxCreate(TMemory)); return cw; } void fstCountingWriterDestroy(FstCountingWriter *cw) { // free wrt object: close fd or free mem + writerCtxDestroy((WriterCtx *)(cw->wrt)); free(cw); } -uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) { +int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) { if (write == NULL) { return 0; } // update checksum // write data to file/socket or mem - - write->count += bufLen; + WriterCtx *ctx = write->wrt; + + int nWrite = ctx->write(ctx, buf, bufLen); + write->count += nWrite; return bufLen; } @@ -41,6 +105,8 @@ uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) { return 0; } int fstCountingWriterFlush(FstCountingWriter *write) { + WriterCtx *ctx = write->wrt; + ctx->flush(ctx); //write->wtr->flush return 1; } From 5f0fd023cb157e7b1cd65edafe106acc3ebfcca0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 3 Dec 2021 10:45:32 +0800 Subject: [PATCH 08/10] add test case --- source/libs/index/inc/index_fst.h | 9 ++ source/libs/index/inc/index_fst_automation.h | 8 +- source/libs/index/inc/index_fst_common.h | 9 ++ .../index/inc/index_fst_counting_writer.h | 8 ++ source/libs/index/inc/index_fst_node.h | 8 ++ source/libs/index/inc/index_fst_registry.h | 8 ++ source/libs/index/inc/index_fst_util.h | 7 ++ source/libs/index/src/index_fst.c | 8 +- .../index/src/index_fst_counting_writer.c | 2 +- source/libs/index/test/CMakeLists.txt | 1 - source/libs/index/test/indexTests.cpp | 119 ++++++++++-------- 11 files changed, 131 insertions(+), 56 deletions(-) diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index 96838d5843..eb288e0aa2 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -16,6 +16,9 @@ #ifndef __INDEX_FST_H__ #define __INDEX_FST_H__ +#ifdef __cplusplus +extern "C" { +#endif #include "tarray.h" #include "index_fst_util.h" @@ -98,6 +101,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty); void fstBuilderDestroy(FstBuilder *b); void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in); +bool fstBuilderInsert(FstBuilder *b, FstSlice bs, Output in); OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup); void fstBuilderCompileFrom(FstBuilder *b, uint64_t istate); CompiledAddr fstBuilderCompile(FstBuilder *b, FstBuilderNode *bn); @@ -324,4 +328,9 @@ FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut); FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type); + +#ifdef __cplusplus +} +#endif + #endif diff --git a/source/libs/index/inc/index_fst_automation.h b/source/libs/index/inc/index_fst_automation.h index c9df9c219e..480d3110c4 100644 --- a/source/libs/index/inc/index_fst_automation.h +++ b/source/libs/index/inc/index_fst_automation.h @@ -15,6 +15,10 @@ #ifndef __INDEX_FST_AUTAOMATION_H__ #define __INDEX_FST_AUTAOMATION_H__ +#ifdef __cplusplus +extern "C" { +#endif + typedef struct AutomationCtx AutomationCtx; typedef struct StartWith { @@ -42,6 +46,8 @@ typedef struct Automation { void *data; } Automation; - +#ifdef __cplusplus +} +#endif #endif diff --git a/source/libs/index/inc/index_fst_common.h b/source/libs/index/inc/index_fst_common.h index b261f4090c..9c802faa33 100644 --- a/source/libs/index/inc/index_fst_common.h +++ b/source/libs/index/inc/index_fst_common.h @@ -1,7 +1,16 @@ #ifndef __INDEX_FST_COMM_H__ #define __INDEX_FST_COMM_H__ + extern const uint8_t COMMON_INPUTS[]; extern char const COMMON_INPUTS_INV[]; +#ifdef __cplusplus +extern "C" { +#endif + +#ifdef __cplusplus +} +#endif + #endif diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h index 42d77cacb0..9280461780 100644 --- a/source/libs/index/inc/index_fst_counting_writer.h +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -16,6 +16,10 @@ #ifndef __INDEX_FST_COUNTING_WRITER_H__ #define __INDEX_FST_COUNTING_WRITER_H__ +#ifdef __cplusplus +extern "C" { +#endif + #include "tfile.h" @@ -72,6 +76,10 @@ uint8_t fstCountingWriterPackUint(FstCountingWriter *writer, uint64_t n); #define FST_WRITER_INTER_WRITER(writer) (writer->wtr) #define FST_WRITE_CHECK_SUMMER(writer) (writer->summer) +#ifdef __cplusplus +} +#endif + #endif diff --git a/source/libs/index/inc/index_fst_node.h b/source/libs/index/inc/index_fst_node.h index 0645aa1158..d1b45a16ab 100644 --- a/source/libs/index/inc/index_fst_node.h +++ b/source/libs/index/inc/index_fst_node.h @@ -16,6 +16,10 @@ #ifndef __INDEX_FST_NODE_H__ #define __INDEX_FST_NODE_H__ +#ifdef __cplusplus +extern "C" { +#endif + #include "index_fst_util.h" #include "index_fst_counting_writer.h" @@ -45,4 +49,8 @@ void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src); void fstBuilderNodeDestroy(FstBuilderNode *node); +#ifdef __cplusplus +} +#endif + #endif diff --git a/source/libs/index/inc/index_fst_registry.h b/source/libs/index/inc/index_fst_registry.h index 2504d7ccff..1d89e57e52 100644 --- a/source/libs/index/inc/index_fst_registry.h +++ b/source/libs/index/inc/index_fst_registry.h @@ -15,6 +15,10 @@ #ifndef __FST_REGISTRY_H__ #define __FST_REGISTRY_H__ +#ifdef __cplusplus +extern "C" { +#endif + #include "index_fst_util.h" #include "tarray.h" #include "index_fst_node.h" @@ -59,4 +63,8 @@ void fstRegistryDestroy(FstRegistry *registry); FstRegistryEntry* fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNode); void fstRegistryEntryDestroy(FstRegistryEntry *entry); +#ifdef __cplusplus +} +#endif + #endif diff --git a/source/libs/index/inc/index_fst_util.h b/source/libs/index/inc/index_fst_util.h index d87e5bd57e..4af885816f 100644 --- a/source/libs/index/inc/index_fst_util.h +++ b/source/libs/index/inc/index_fst_util.h @@ -17,6 +17,10 @@ #ifndef __INDEX_FST_UTIL_H__ #define __INDEX_FST_UTIL_H__ +#ifdef __cplusplus +extern "C" { +#endif + #include "tarray.h" #include "index_fst_common.h" @@ -89,5 +93,8 @@ uint8_t *fstSliceData(FstSlice *s, int32_t *sz); #define FST_SLICE_LEN(s) (s->end - s->start + 1) +#ifdef __cplusplus +} +#endif #endif diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 68d2011b8f..9e5c1f5d78 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -52,7 +52,7 @@ void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes *nodes, bool isFinal) { FstBuilderNode *node = malloc(sizeof(FstBuilderNode)); node->isFinal = isFinal; node->finalOutput = 0; - node->trans = NULL; + node->trans = taosArrayInit(16, sizeof(FstTransition)); FstBuilderNodeUnfinished un = {.node = node, .last = NULL}; taosArrayPush(nodes->stack, &un); @@ -112,7 +112,7 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output FstBuilderNode *n = malloc(sizeof(FstBuilderNode)); n->isFinal = false; n->finalOutput = 0; - n->trans = NULL; + n->trans = taosArrayInit(16, sizeof(FstTransition)); //FstLastTransition *trn = malloc(sizeof(FstLastTransition)); //trn->inp = s->data[i]; @@ -806,13 +806,13 @@ void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) { fstUnFinishedNodesSetRootOutput(b->unfinished, in); return; } - Output out; //if (in != 0) { //if let Some(in) = in // prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out); //} else { // prefixLen = fstUnFinishedNodesFindCommPrefix(b->unfinished, bs); // out = 0; //} + Output out; uint64_t prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out); if (prefixLen == FST_SLICE_LEN(s)) { @@ -857,7 +857,7 @@ void fstBuilderCompileFrom(FstBuilder *b, uint64_t istate) { } addr = fstBuilderCompile(b, n); assert(addr != NONE_ADDRESS); - fstBuilderNodeDestroy(n); + //fstBuilderNodeDestroy(n); } fstUnFinishedNodesTopLastFreeze(b->unfinished, addr); return; diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index f04f6eddad..1989a5a425 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -81,7 +81,7 @@ FstCountingWriter *fstCountingWriterCreate(void *wrt) { FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter)); if (cw == NULL) { return NULL; } - cw->wrt = (void *)(writerCtxCreate(TMemory)); + cw->wrt = (void *)(writerCtxCreate(TFile)); return cw; } void fstCountingWriterDestroy(FstCountingWriter *cw) { diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt index f2a7442a5a..f84f874a23 100644 --- a/source/libs/index/test/CMakeLists.txt +++ b/source/libs/index/test/CMakeLists.txt @@ -1,7 +1,6 @@ add_executable(indexTest "") target_sources(indexTest PRIVATE - "../src/index.c" "indexTests.cpp" ) target_include_directories ( indexTest diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index 857060ce5e..b94a241489 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -3,58 +3,79 @@ #include #include "index.h" #include "indexInt.h" +#include "index_fst.h" +#include "index_fst_util.h" +#include "index_fst_counting_writer.h" -TEST(IndexTest, index_create_test) { - SIndexOpts *opts = indexOptsCreate(); - SIndex *index = indexOpen(opts, "./test"); - if (index == NULL) { - std::cout << "index open failed" << std::endl; - } +//TEST(IndexTest, index_create_test) { +// SIndexOpts *opts = indexOptsCreate(); +// SIndex *index = indexOpen(opts, "./test"); +// if (index == NULL) { +// std::cout << "index open failed" << std::endl; +// } +// +// +// // write +// for (int i = 0; i < 100000; i++) { +// SIndexMultiTerm* terms = indexMultiTermCreate(); +// std::string val = "field"; +// +// indexMultiTermAdd(terms, "tag1", strlen("tag1"), val.c_str(), val.size()); +// +// val.append(std::to_string(i)); +// indexMultiTermAdd(terms, "tag2", strlen("tag2"), val.c_str(), val.size()); +// +// val.insert(0, std::to_string(i)); +// indexMultiTermAdd(terms, "tag3", strlen("tag3"), val.c_str(), val.size()); +// +// val.append("const"); +// indexMultiTermAdd(terms, "tag4", strlen("tag4"), val.c_str(), val.size()); +// +// +// indexPut(index, terms, i); +// indexMultiTermDestroy(terms); +// } +// +// +// // query +// SIndexMultiTermQuery *multiQuery = indexMultiTermQueryCreate(MUST); +// +// indexMultiTermQueryAdd(multiQuery, "tag1", strlen("tag1"), "field", strlen("field"), QUERY_PREFIX); +// indexMultiTermQueryAdd(multiQuery, "tag3", strlen("tag3"), "0field0", strlen("0field0"), QUERY_TERM); +// +// SArray *result = (SArray *)taosArrayInit(10, sizeof(int)); +// indexSearch(index, multiQuery, result); +// +// std::cout << "taos'size : " << taosArrayGetSize(result) << std::endl; +// for (int i = 0; i < taosArrayGetSize(result); i++) { +// int *v = (int *)taosArrayGet(result, i); +// std::cout << "value --->" << *v << std::endl; +// } +// // add more test case +// indexMultiTermQueryDestroy(multiQuery); +// +// indexOptsDestroy(opts); +// indexClose(index); +// // +//} - - // write - for (int i = 0; i < 100000; i++) { - SIndexMultiTerm* terms = indexMultiTermCreate(); - std::string val = "field"; - - indexMultiTermAdd(terms, "tag1", strlen("tag1"), val.c_str(), val.size()); - - val.append(std::to_string(i)); - indexMultiTermAdd(terms, "tag2", strlen("tag2"), val.c_str(), val.size()); - - val.insert(0, std::to_string(i)); - indexMultiTermAdd(terms, "tag3", strlen("tag3"), val.c_str(), val.size()); - - val.append("const"); - indexMultiTermAdd(terms, "tag4", strlen("tag4"), val.c_str(), val.size()); - - - indexPut(index, terms, i); - indexMultiTermDestroy(terms); - } - - - // query - SIndexMultiTermQuery *multiQuery = indexMultiTermQueryCreate(MUST); - - indexMultiTermQueryAdd(multiQuery, "tag1", strlen("tag1"), "field", strlen("field"), QUERY_PREFIX); - indexMultiTermQueryAdd(multiQuery, "tag3", strlen("tag3"), "0field0", strlen("0field0"), QUERY_TERM); - - SArray *result = (SArray *)taosArrayInit(10, sizeof(int)); - indexSearch(index, multiQuery, result); - - std::cout << "taos'size : " << taosArrayGetSize(result) << std::endl; - for (int i = 0; i < taosArrayGetSize(result); i++) { - int *v = (int *)taosArrayGet(result, i); - std::cout << "value --->" << *v << std::endl; - } - // add more test case - indexMultiTermQueryDestroy(multiQuery); - - indexOptsDestroy(opts); - indexClose(index); - // +int main(int argc, char** argv) { + std::string str("Hello world"); + FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); + Output val = 10; + FstBuilder *b = fstBuilderCreate(NULL, 1); + fstBuilderInsert(b, key, val); + fstBuilderFinish(b); + fstBuilderDestroy(b); + fstSliceDestroy(&key); + return 1; } + +//TEST(IndexFstBuilder, IndexFstInput) { +// +//} + + From 0f682ca20a2d162101f0bacb4f4e89632ae7d03a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 3 Dec 2021 14:16:32 +0800 Subject: [PATCH 09/10] refactor builder struct --- source/libs/index/inc/index_fst_node.h | 1 + source/libs/index/src/index_fst.c | 1 - source/libs/index/src/index_fst_node.c | 32 +++++++++++++++++++--- source/libs/index/src/index_fst_registry.c | 8 +++--- source/libs/index/test/indexTests.cpp | 7 ++++- 5 files changed, 39 insertions(+), 10 deletions(-) diff --git a/source/libs/index/inc/index_fst_node.h b/source/libs/index/inc/index_fst_node.h index d1b45a16ab..87eb7cb746 100644 --- a/source/libs/index/inc/index_fst_node.h +++ b/source/libs/index/inc/index_fst_node.h @@ -46,6 +46,7 @@ FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src); void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src); //bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr); +bool fstBuilderNodeEqual(FstBuilderNode *n1, FstBuilderNode *n2); void fstBuilderNodeDestroy(FstBuilderNode *node); diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 9e5c1f5d78..b2d21cd0b0 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -167,7 +167,6 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node, } if (addPrefix != 0) { fstBuilderNodeUnfinishedAddOutputPrefix(un, addPrefix); - } } return i; diff --git a/source/libs/index/src/index_fst_node.c b/source/libs/index/src/index_fst_node.c index b33b8e4428..af69f98f32 100644 --- a/source/libs/index/src/index_fst_node.c +++ b/source/libs/index/src/index_fst_node.c @@ -18,7 +18,7 @@ FstBuilderNode *fstBuilderNodeDefault() { FstBuilderNode *bn = malloc(sizeof(FstBuilderNode)); bn->isFinal = false; bn->finalOutput = 0; - bn->trans = NULL; + bn->trans = taosArrayInit(16, sizeof(FstTransition)); return bn; } void fstBuilderNodeDestroy(FstBuilderNode *node) { @@ -27,6 +27,25 @@ void fstBuilderNodeDestroy(FstBuilderNode *node) { taosArrayDestroy(node->trans); free(node); } + +bool fstBuilderNodeEqual(FstBuilderNode *n1, FstBuilderNode *n2) { + if (n1 == n2) { return true; } + + if (n1->isFinal != n2->isFinal || + n1->finalOutput != n2->finalOutput || + taosArrayGetSize(n1->trans) != taosArrayGetSize(n2->trans)) { + return false; + } + size_t sz = taosArrayGetSize(n1->trans); + for (size_t i = 0; i < sz; i++) { + FstTransition *t1 = taosArrayGet(n1->trans, i); + FstTransition *t2 = taosArrayGet(n2->trans, i); + if (t1->inp != t2->inp || t1->out != t2->out || t1->addr != t2->addr) { + return false; + } + } + return true; +} FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src) { FstBuilderNode *node = malloc(sizeof(FstBuilderNode)); if (node == NULL) { return NULL; } @@ -53,12 +72,17 @@ void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src) { dst->isFinal = src->isFinal; dst->finalOutput = src->finalOutput; - // avoid mem leak + //release free avoid mem leak taosArrayDestroy(dst->trans); - dst->trans = src->trans; - src->trans = NULL; + size_t sz = taosArrayGetSize(src->trans); + dst->trans = taosArrayInit(sz, sizeof(FstTransition)); + for (size_t i = 0; i < sz; i++) { + FstTransition *trn = taosArrayGet(src->trans, i); + taosArrayPush(dst->trans, trn); + } } + //bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) { //size_t sz = taosArrayGetSize(b->trans); diff --git a/source/libs/index/src/index_fst_registry.c b/source/libs/index/src/index_fst_registry.c index b25964e0e7..8fb0dbfcaa 100644 --- a/source/libs/index/src/index_fst_registry.c +++ b/source/libs/index/src/index_fst_registry.c @@ -112,7 +112,7 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo if (end - start == 1) { FstRegistryCell *cell = taosArrayGet(registry->table, start); //cell->isNode && - if (cell->addr != NONE_ADDRESS && cell->node == bNode) { + if (cell->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell->node, bNode)) { entry->state = FOUND; entry->addr = cell->addr ; return entry; @@ -123,13 +123,13 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo } } else if (end - start == 2) { FstRegistryCell *cell1 = taosArrayGet(registry->table, start); - if (cell1->addr != NONE_ADDRESS && cell1->node == bNode) { + if (cell1->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell1->node, bNode)) { entry->state = FOUND; entry->addr = cell1->addr; return entry; } FstRegistryCell *cell2 = taosArrayGet(registry->table, start + 1); - if (cell2->addr != NONE_ADDRESS && cell2->node == bNode) { + if (cell2->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell2->node, bNode)) { entry->state = FOUND; entry->addr = cell2->addr; // must swap here @@ -147,7 +147,7 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo uint32_t i = start; for (; i < end; i++) { FstRegistryCell *cell = (FstRegistryCell *)taosArrayGet(registry->table, i); - if (cell->addr != NONE_ADDRESS && cell->node == bNode) { + if (cell->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell->node, bNode)) { entry->state = FOUND; entry->addr = cell->addr; fstRegistryCellPromote(registry->table, i, start); diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index b94a241489..9f3d1b962e 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -63,11 +63,16 @@ //} int main(int argc, char** argv) { - std::string str("Hello world"); + std::string str("abc"); FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); Output val = 10; + + std::string str1("bcd"); + FstSlice key1 = fstSliceCreate((uint8_t *)str1.c_str(), str1.size()); + Output val2 = 10; FstBuilder *b = fstBuilderCreate(NULL, 1); fstBuilderInsert(b, key, val); + fstBuilderInsert(b, key1, val2); fstBuilderFinish(b); fstBuilderDestroy(b); fstSliceDestroy(&key); From 6de9692b7d759d85fadcd2001cbcae748129ee58 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 3 Dec 2021 14:28:54 +0800 Subject: [PATCH 10/10] refactor wal --- include/libs/wal/wal.h | 28 +++++--- include/util/tfile.h | 1 + source/libs/wal/inc/walInt.h | 4 +- source/libs/wal/src/walIndex.c | 125 ++++++++++++++++----------------- source/libs/wal/src/walMgmt.c | 66 +++++++---------- source/libs/wal/src/walWrite.c | 113 ++++++++++++++++++++++++++++- source/util/src/tfile.c | 5 ++ 7 files changed, 225 insertions(+), 117 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index e77540bf90..b514648bbd 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -39,6 +39,14 @@ typedef enum { } EWalType; typedef struct { + //union { + //uint32_t info; + //struct { + //uint32_t sver:3; + //uint32_t msgtype: 5; + //uint32_t reserved : 24; + //}; + //}; int8_t sver; uint8_t msgType; int8_t reserved[2]; @@ -71,13 +79,17 @@ typedef struct { #define WAL_FILESET_MAX 128 #define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2) -#define WAL_CUR_POS_READ_ONLY 1 -#define WAL_CUR_FILE_READ_ONLY 2 +#define WAL_CUR_POS_WRITABLE 1 +#define WAL_CUR_FILE_WRITABLE 2 +#define WAL_CUR_FAILED 4 typedef struct SWal { // cfg int32_t vgId; int32_t fsyncPeriod; // millisecond + int32_t fsyncSeq; + int32_t rollPeriod; // second + int64_t segSize; EWalType level; //reference int64_t refId; @@ -86,7 +98,7 @@ typedef struct SWal { int64_t curIdxTfd; //current version int64_t curVersion; - int64_t curOffset; + int64_t curLogOffset; //current file version int64_t curFileFirstVersion; int64_t curFileLastVersion; @@ -94,8 +106,10 @@ typedef struct SWal { int64_t firstVersion; int64_t snapshotVersion; int64_t lastVersion; - //fsync status - int32_t fsyncSeq; + int64_t lastFileName; + //roll status + int64_t lastRollSeq; + int64_t lastFileWriteSize; //ctl int32_t curStatus; pthread_mutex_t mutex; @@ -119,12 +133,10 @@ int32_t walAlter(SWal *, SWalCfg *pCfg); void walClose(SWal *); // write -//int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen); int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen); -//int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize); +void walFsync(SWal *, bool force); // apis for lifecycle management -void walFsync(SWal *, bool force); int32_t walCommit(SWal *, int64_t ver); // truncate after int32_t walRollback(SWal *, int64_t ver); diff --git a/include/util/tfile.h b/include/util/tfile.h index ff62c9e341..3d0e2177ac 100644 --- a/include/util/tfile.h +++ b/include/util/tfile.h @@ -28,6 +28,7 @@ void tfCleanup(); // the same syntax as UNIX standard open/close/read/write // but FD is int64_t and will never be reused +int64_t tfOpenRead(const char *pathname); int64_t tfOpenReadWrite(const char *pathname); int64_t tfOpenCreateWrite(const char *pathname); int64_t tfOpenCreateWriteAppend(const char *pathname); diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 42ede49c6b..285d7e2576 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -17,14 +17,16 @@ #define _TD_WAL_INT_H_ #include "wal.h" +#include "compare.h" #ifdef __cplusplus extern "C" { #endif -int walRotate(SWal* pWal); int walGetFile(SWal* pWal, int32_t version); +int64_t walGetSeq(); + #ifdef __cplusplus } #endif diff --git a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walIndex.c index 2569af841f..1aa64b34b5 100644 --- a/source/libs/wal/src/walIndex.c +++ b/source/libs/wal/src/walIndex.c @@ -20,9 +20,38 @@ #include "tfile.h" #include "walInt.h" -int walSeekVerImpl(SWal *pWal, int64_t ver) { - //close old file +static int walSeekFilePos(SWal* pWal, int64_t ver) { int code = 0; + + int64_t idxTfd = pWal->curIdxTfd; + int64_t logTfd = pWal->curLogTfd; + + //seek position + int64_t offset = (ver - pWal->curFileFirstVersion) * WAL_IDX_ENTRY_SIZE; + code = tfLseek(idxTfd, offset, SEEK_SET); + if(code != 0) { + + } + int64_t readBuf[2]; + code = tfRead(idxTfd, readBuf, sizeof(readBuf)); + if(code != 0) { + + } + //TODO:deserialize + ASSERT(readBuf[0] == ver); + code = tfLseek(logTfd, readBuf[1], SEEK_CUR); + if (code != 0) { + + } + pWal->curLogOffset = readBuf[1]; + pWal->curVersion = ver; + return code; +} + +static int walChangeFile(SWal *pWal, int64_t ver) { + int code = 0; + int64_t idxTfd, logTfd; + char fnameStr[WAL_FILE_LEN]; code = tfClose(pWal->curLogTfd); if(code != 0) { //TODO @@ -32,29 +61,36 @@ int walSeekVerImpl(SWal *pWal, int64_t ver) { //TODO } //bsearch in fileSet - int fName = 0;//TODO - //open the right file - char fNameStr[WAL_FILE_LEN]; - sprintf(fNameStr, "%d."WAL_INDEX_SUFFIX, fName); - bool closed = 1; //TODO:read only - int64_t idxTfd = tfOpenReadWrite(fNameStr); - sprintf(fNameStr, "%d."WAL_LOG_SUFFIX, fName); - int64_t logTfd = tfOpenReadWrite(fNameStr); - //seek position - int64_t offset = (ver - fName) * WAL_IDX_ENTRY_SIZE; - tfLseek(idxTfd, offset, SEEK_SET); - //set cur version, cur file version and cur status - pWal->curFileFirstVersion = fName; - pWal->curFileLastVersion = 1;//TODO + int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE); + ASSERT(pRet != NULL); + int64_t fname = *pRet; + if(fname < pWal->lastFileName) { + pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE; + pWal->curFileLastVersion = pRet[1]-1; + sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + idxTfd = tfOpenRead(fnameStr); + sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + logTfd = tfOpenRead(fnameStr); + } else { + pWal->curStatus |= WAL_CUR_FILE_WRITABLE; + pWal->curFileLastVersion = -1; + sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + idxTfd = tfOpenReadWrite(fnameStr); + sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + logTfd = tfOpenReadWrite(fnameStr); + } + + pWal->curFileFirstVersion = fname; pWal->curLogTfd = logTfd; pWal->curIdxTfd = idxTfd; - pWal->curVersion = ver; - pWal->curOffset = offset; - pWal->curStatus = 0;//TODO return code; } int walSeekVer(SWal *pWal, int64_t ver) { + if((!(pWal->curStatus & WAL_CUR_FAILED)) + && ver == pWal->curVersion) { + return 0; + } if(ver > pWal->lastVersion) { //TODO: some records are skipped return -1; @@ -64,54 +100,13 @@ int walSeekVer(SWal *pWal, int64_t ver) { return -1; } if(ver < pWal->snapshotVersion) { - //TODO: seek snapshotted log + //TODO: seek snapshotted log, invalid in some cases } - if(ver >= pWal->curFileFirstVersion - && ((pWal->curFileLastVersion == -1 && ver <= pWal->lastVersion) || (ver <= pWal->curFileLastVersion))) { - - } - if(ver < pWal->curFileFirstVersion || (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) { - int index = 0; - index = 1; - //back up to avoid inconsistency - int64_t curVersion = pWal->curVersion; - int64_t curOffset = pWal->curOffset; - int64_t curFileFirstVersion = pWal->curFileFirstVersion; - int64_t curFileLastVersion = pWal->curFileLastVersion; - if(walSeekVerImpl(pWal, ver) < 0) { - //TODO: errno - pWal->curVersion = curVersion; - pWal->curOffset = curOffset; - pWal->curFileFirstVersion = curFileFirstVersion; - pWal->curFileLastVersion = curFileLastVersion; - return -1; - } + if(ver < pWal->curFileFirstVersion || + (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) { + walChangeFile(pWal, ver); } + walSeekFilePos(pWal, ver); return 0; } - -int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { - int code = 0; - //get index file - if(!tfValid(pWal->curIdxTfd)) { - code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); - } - if(pWal->curVersion != ver) { - if(walSeekVer(pWal, ver) != 0) { - //TODO: some records are skipped - return -1; - } - } - //check file checksum - //append index - return 0; -} - -int walRotateIndex(SWal *pWal) { - //check file checksum - //create new file - //switch file - return 0; -} diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index d60cdfe118..bc2e687069 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -26,57 +26,44 @@ int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId); int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId); int32_t walGetNewFile(SWal *pWal, int64_t *newFileId); -static pthread_mutex_t walInitLock = PTHREAD_MUTEX_INITIALIZER; -static int8_t walInited = 0; - typedef struct { int32_t refSetId; - int32_t seq; + uint32_t seq; int8_t stop; + int8_t inited; pthread_t thread; - pthread_mutex_t mutex; } SWalMgmt; -static SWalMgmt tsWal = {0}; +static SWalMgmt tsWal = {0, .seq = 1}; static int32_t walCreateThread(); static void walStopThread(); static int32_t walInitObj(SWal *pWal); static void walFreeObj(void *pWal); -int32_t walInit() { - //TODO: change to atomic - pthread_mutex_lock(&walInitLock); - if(walInited) { - pthread_mutex_unlock(&walInitLock); - return 0; - } else { - walInited = 1; - pthread_mutex_unlock(&walInitLock); - } +int64_t walGetSeq() { + return (int64_t)atomic_load_32(&tsWal.seq); +} + +int32_t walInit() { + int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1); + if(old == 1) return 0; - int32_t code = 0; tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); - code = pthread_mutex_init(&tsWal.mutex, NULL); - if (code != 0) { - wError("failed to init wal mutex since %s", tstrerror(code)); - return code; - } - - code = walCreateThread(); + int code = walCreateThread(); if (code != 0) { wError("failed to init wal module since %s", tstrerror(code)); + atomic_store_8(&tsWal.inited, 0); return code; } wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId); - return code; + return 0; } void walCleanUp() { walStopThread(); taosCloseRef(tsWal.refSetId); - pthread_mutex_destroy(&tsWal.mutex); wInfo("wal module is cleaned up"); } @@ -92,7 +79,7 @@ static int walLoadFileset(SWal *pWal) { char *name = ent->d_name; name[WAL_NOSUFFIX_LEN] = 0; //validate file name by regex matching - if(1 /* regex match */) { + if(1 /* TODO:regex match */) { int64_t fnameInt64 = atoll(name); taosArrayPush(pWal->fileSet, &fnameInt64); } @@ -133,6 +120,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { walFreeObj(pWal); return NULL; } + walLoadFileset(pWal); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); @@ -164,6 +152,9 @@ void walClose(SWal *pWal) { pthread_mutex_lock(&pWal->mutex); tfClose(pWal->curLogTfd); + tfClose(pWal->curIdxTfd); + taosArrayDestroy(pWal->fileSet); + pWal->fileSet = NULL; pthread_mutex_unlock(&pWal->mutex); taosRemoveRef(tsWal.refSetId, pWal->refId); } @@ -188,6 +179,9 @@ static void walFreeObj(void *wal) { wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); tfClose(pWal->curLogTfd); + tfClose(pWal->curIdxTfd); + taosArrayDestroy(pWal->fileSet); + pWal->fileSet = NULL; pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } @@ -197,7 +191,7 @@ static bool walNeedFsync(SWal *pWal) { return false; } - if (tsWal.seq % pWal->fsyncSeq == 0) { + if (atomic_load_32(&tsWal.seq) % pWal->fsyncSeq == 0) { return true; } @@ -206,16 +200,14 @@ static bool walNeedFsync(SWal *pWal) { static void walUpdateSeq() { taosMsleep(WAL_REFRESH_MS); - if (++tsWal.seq <= 0) { - tsWal.seq = 1; - } + atomic_add_fetch_32(&tsWal.seq, 1); } static void walFsyncAll() { SWal *pWal = taosIterateRef(tsWal.refSetId, 0); while (pWal) { if (walNeedFsync(pWal)) { - wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); + wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); int32_t code = tfFsync(pWal->curLogTfd); if (code != 0) { wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code)); @@ -226,16 +218,12 @@ static void walFsyncAll() { } static void *walThreadFunc(void *param) { - int stop = 0; setThreadName("wal"); while (1) { walUpdateSeq(); walFsyncAll(); - pthread_mutex_lock(&tsWal.mutex); - stop = tsWal.stop; - pthread_mutex_unlock(&tsWal.mutex); - if (stop) break; + if (atomic_load_8(&tsWal.stop)) break; } return NULL; @@ -258,9 +246,7 @@ static int32_t walCreateThread() { } static void walStopThread() { - pthread_mutex_lock(&tsWal.mutex); - tsWal.stop = 1; - pthread_mutex_unlock(&tsWal.mutex); + atomic_store_8(&tsWal.stop, 1); if (taosCheckPthreadValid(tsWal.thread)) { pthread_join(tsWal.thread, NULL); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 7563ec02c7..69c83a9912 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -26,10 +26,24 @@ int32_t walCommit(SWal *pWal, int64_t ver) { } int32_t walRollback(SWal *pWal, int64_t ver) { + //TODO: ftruncate return 0; } int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { + pWal->snapshotVersion = ver; + + //mark files safe to delete + int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE); + if(pRet != pWal->fileSet->pData) { + //delete files until less than retention size + + //find first file that exceeds retention time + + } + + //delete files living longer than retention limit + //remove file from fileset return 0; } @@ -124,13 +138,102 @@ void walRemoveAllOldFiles(void *handle) { } #endif +static int walRoll(SWal *pWal) { + int code = 0; + code = tfClose(pWal->curIdxTfd); + if(code != 0) { + return code; + } + code = tfClose(pWal->curLogTfd); + if(code != 0) { + return code; + } + int64_t idxTfd, logTfd; + //create new file + int64_t newFileFirstVersion = pWal->lastVersion + 1; + char fnameStr[WAL_FILE_LEN]; + sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, newFileFirstVersion); + idxTfd = tfOpenCreateWrite(fnameStr); + sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, newFileFirstVersion); + logTfd = tfOpenCreateWrite(fnameStr); + + taosArrayPush(pWal->fileSet, &newFileFirstVersion); + + //switch file + pWal->curIdxTfd = idxTfd; + pWal->curLogTfd = logTfd; + //change status + pWal->curFileLastVersion = -1; + pWal->curFileFirstVersion = newFileFirstVersion; + pWal->curVersion = newFileFirstVersion; + pWal->curLogOffset = 0; + pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE; + + pWal->lastFileName = newFileFirstVersion; + pWal->lastFileWriteSize = 0; + pWal->lastRollSeq = walGetSeq(); + return 0; +} + +int walChangeFileToLast(SWal *pWal) { + int64_t idxTfd, logTfd; + int64_t* pRet = taosArrayGetLast(pWal->fileSet); + ASSERT(pRet != NULL); + int64_t fname = *pRet; + + char fnameStr[WAL_FILE_LEN]; + sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + idxTfd = tfOpenReadWrite(fnameStr); + sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + logTfd = tfOpenReadWrite(fnameStr); + //switch file + pWal->curIdxTfd = idxTfd; + pWal->curLogTfd = logTfd; + //change status + pWal->curFileLastVersion = -1; + pWal->curFileFirstVersion = fname; + pWal->curVersion = fname; + pWal->curLogOffset = 0; + pWal->curStatus = WAL_CUR_FILE_WRITABLE; + return 0; +} + +int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { + int code = 0; + //get index file + if(!tfValid(pWal->curIdxTfd)) { + code = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + } + int64_t writeBuf[2] = { ver, offset }; + int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf)); + if(size != sizeof(writeBuf)) { + //TODO: + } + return 0; +} + int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) { if (pWal == NULL) return -1; // no wal - if (!tfValid(pWal->curLogTfd)) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0; - if (index > pWal->lastVersion + 1) return -1; + + if (index == pWal->lastVersion + 1) { + int64_t passed = walGetSeq() - pWal->lastRollSeq; + if(passed > pWal->rollPeriod) { + walRoll(pWal); + } else if(pWal->lastFileWriteSize > pWal->segSize) { + walRoll(pWal); + } else { + walChangeFileToLast(pWal); + } + } else { + //reject skip log or rewrite log + //must truncate explicitly first + return -1; + } + if (!tfValid(pWal->curLogTfd)) return 0; pWal->head.version = index; int32_t code = 0; @@ -155,8 +258,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); } - //TODO:write idx + walWriteIndex(pWal, index, pWal->curLogOffset); + pWal->curLogOffset += sizeof(SWalHead) + bodyLen; + //set status + pWal->lastVersion = index; + pthread_mutex_unlock(&pWal->mutex); return code; diff --git a/source/util/src/tfile.c b/source/util/src/tfile.c index 5b61186b12..5d4789aae6 100644 --- a/source/util/src/tfile.c +++ b/source/util/src/tfile.c @@ -53,6 +53,11 @@ static int64_t tfOpenImp(int32_t fd) { return rid; } +int64_t tfOpenRead(const char *pathname, int32_t flags) { + int32_t fd = taosOpenFileRead(pathname); + return tfOpenImp(fd); +} + int64_t tfOpenReadWrite(const char *pathname, int32_t flags) { int32_t fd = taosOpenFileReadWrite(pathname); return tfOpenImp(fd);