diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h
index e77540bf90..b514648bbd 100644
--- a/include/libs/wal/wal.h
+++ b/include/libs/wal/wal.h
@@ -39,6 +39,14 @@ typedef enum {
} EWalType;
typedef struct {
+ //union {
+ //uint32_t info;
+ //struct {
+ //uint32_t sver:3;
+ //uint32_t msgtype: 5;
+ //uint32_t reserved : 24;
+ //};
+ //};
int8_t sver;
uint8_t msgType;
int8_t reserved[2];
@@ -71,13 +79,17 @@ typedef struct {
#define WAL_FILESET_MAX 128
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
-#define WAL_CUR_POS_READ_ONLY 1
-#define WAL_CUR_FILE_READ_ONLY 2
+#define WAL_CUR_POS_WRITABLE 1
+#define WAL_CUR_FILE_WRITABLE 2
+#define WAL_CUR_FAILED 4
typedef struct SWal {
// cfg
int32_t vgId;
int32_t fsyncPeriod; // millisecond
+ int32_t fsyncSeq;
+ int32_t rollPeriod; // second
+ int64_t segSize;
EWalType level;
//reference
int64_t refId;
@@ -86,7 +98,7 @@ typedef struct SWal {
int64_t curIdxTfd;
//current version
int64_t curVersion;
- int64_t curOffset;
+ int64_t curLogOffset;
//current file version
int64_t curFileFirstVersion;
int64_t curFileLastVersion;
@@ -94,8 +106,10 @@ typedef struct SWal {
int64_t firstVersion;
int64_t snapshotVersion;
int64_t lastVersion;
- //fsync status
- int32_t fsyncSeq;
+ int64_t lastFileName;
+ //roll status
+ int64_t lastRollSeq;
+ int64_t lastFileWriteSize;
//ctl
int32_t curStatus;
pthread_mutex_t mutex;
@@ -119,12 +133,10 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *);
// write
-//int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen);
-//int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
+void walFsync(SWal *, bool force);
// apis for lifecycle management
-void walFsync(SWal *, bool force);
int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver);
diff --git a/include/util/tfile.h b/include/util/tfile.h
index ff62c9e341..3d0e2177ac 100644
--- a/include/util/tfile.h
+++ b/include/util/tfile.h
@@ -28,6 +28,7 @@ void tfCleanup();
// the same syntax as UNIX standard open/close/read/write
// but FD is int64_t and will never be reused
+int64_t tfOpenRead(const char *pathname);
int64_t tfOpenReadWrite(const char *pathname);
int64_t tfOpenCreateWrite(const char *pathname);
int64_t tfOpenCreateWriteAppend(const char *pathname);
diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h
index bcb7070755..eb288e0aa2 100644
--- a/source/libs/index/inc/index_fst.h
+++ b/source/libs/index/inc/index_fst.h
@@ -16,6 +16,9 @@
#ifndef __INDEX_FST_H__
#define __INDEX_FST_H__
+#ifdef __cplusplus
+extern "C" {
+#endif
#include "tarray.h"
#include "index_fst_util.h"
@@ -34,6 +37,7 @@ typedef struct FstRange {
} FstRange;
+// Kind of bound handed to fstStreamBuilderRange: GE/GT set the lower bound
+// (Included/Excluded), LE/LT set the upper bound (Included/Excluded).
+typedef enum {GE, GT, LE, LT} RangeType;
typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal} State;
typedef enum {Ordered, OutOfOrdered, DuplicateKey} OrderType;
@@ -85,16 +89,19 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node,
typedef struct FstBuilder {
FstCountingWriter *wrt; // The FST raw data is written directly to `wtr`.
FstUnFinishedNodes *unfinished; // The stack of unfinished nodes
- FstRegistry* registry; // A map of finished nodes.
- FstSlice last; // The last word added
+ FstRegistry* registry; // A map of finished nodes.
+ FstSlice last; // The last word added
CompiledAddr lastAddr; // The address of the last compiled node
uint64_t len; // num of keys added
} FstBuilder;
FstBuilder *fstBuilderCreate(void *w, FstType ty);
+
+
void fstBuilderDestroy(FstBuilder *b);
void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in);
+bool fstBuilderInsert(FstBuilder *b, FstSlice bs, Output in);
OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup);
void fstBuilderCompileFrom(FstBuilder *b, uint64_t istate);
CompiledAddr fstBuilderCompile(FstBuilder *b, FstBuilderNode *bn);
@@ -169,11 +176,6 @@ uint64_t fstStateFindInput(FstState *state, FstNode *node, uint8_t b, bool *null
-
-
-
-
-
#define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext)
#define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans)
#define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans)
@@ -272,18 +274,13 @@ FstNode* fstGetNode(Fst *fst, CompiledAddr);
FstNode* fstGetRoot(Fst *fst);
FstType fstGetType(Fst *fst);
CompiledAddr fstGetRootAddr(Fst *fst);
-
-Output fstEmptyFinalOutput(Fst *fst, bool *null);
-bool fstVerify(Fst *fst);
+Output fstEmptyFinalOutput(Fst *fst, bool *null);
+bool fstVerify(Fst *fst);
//refactor this function
bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
-
-
-
-
typedef struct StreamState {
FstNode *node;
uint64_t trans;
@@ -310,10 +307,30 @@ typedef struct StreamWithStateResult {
} StreamWithStateResult;
StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state);
+void swsResultDestroy(StreamWithStateResult *result);
typedef void* (*StreamCallback)(void *);
StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ;
void streamWithStateDestroy(StreamWithState *sws);
bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min);
StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback);
+
+// Holds the inputs of a bounded FST stream: the fst and automation pointers are
+// stored exactly as passed to fstStreamBuilderCreate; min/max are bounds that
+// fstStreamBuilderCreate allocates as Unbounded and fstStreamBuilderRange updates.
+typedef struct FstStreamBuilder {
+ Fst *fst;
+ Automation *aut;
+ // lower bound (set by GE / GT)
+ FstBoundWithData *min;
+ // upper bound (set by LE / LT)
+ FstBoundWithData *max;
+} FstStreamBuilder;
+
+// Allocate a builder whose min/max bounds start out Unbounded.
+// Returns NULL when allocation fails.
+FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut);
+
+// Release a builder obtained from fstStreamBuilderCreate.
+void fstStreamBuilderDestroy(FstStreamBuilder *b);
+
+// Set one bound of the range according to `type` (see RangeType).
+// Returns `b`, or NULL when `b` is NULL.
+// TODO: refactor, simplify this code with a macro
+FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type);
+
+
+#ifdef __cplusplus
+}
+#endif
+
#endif
diff --git a/source/libs/index/inc/index_fst_automation.h b/source/libs/index/inc/index_fst_automation.h
index c9df9c219e..480d3110c4 100644
--- a/source/libs/index/inc/index_fst_automation.h
+++ b/source/libs/index/inc/index_fst_automation.h
@@ -15,6 +15,10 @@
#ifndef __INDEX_FST_AUTAOMATION_H__
#define __INDEX_FST_AUTAOMATION_H__
+#ifdef __cplusplus
+extern "C" {
+#endif
+
typedef struct AutomationCtx AutomationCtx;
typedef struct StartWith {
@@ -42,6 +46,8 @@ typedef struct Automation {
void *data;
} Automation;
-
+#ifdef __cplusplus
+}
+#endif
#endif
diff --git a/source/libs/index/inc/index_fst_common.h b/source/libs/index/inc/index_fst_common.h
index b261f4090c..9c802faa33 100644
--- a/source/libs/index/inc/index_fst_common.h
+++ b/source/libs/index/inc/index_fst_common.h
@@ -1,7 +1,16 @@
#ifndef __INDEX_FST_COMM_H__
#define __INDEX_FST_COMM_H__
+
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Declared inside the extern "C" region so C++ translation units that include
+// this header refer to the tables with C linkage.
extern const uint8_t COMMON_INPUTS[];
extern char const COMMON_INPUTS_INV[];
+
+#ifdef __cplusplus
+}
+#endif
+
#endif
diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h
index 4650804034..9280461780 100644
--- a/source/libs/index/inc/index_fst_counting_writer.h
+++ b/source/libs/index/inc/index_fst_counting_writer.h
@@ -16,6 +16,38 @@
#ifndef __INDEX_FST_COUNTING_WRITER_H__
#define __INDEX_FST_COUNTING_WRITER_H__
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "tfile.h"
+
+
+// Size in bytes of the in-memory writer buffer; parenthesized so the macro is
+// safe inside larger expressions (e.g. `x / DefaultMem`).
+#define DefaultMem (1024*1024)
+
+static char tmpFile[] = "/tmp/index";
+typedef enum WriterType {TMemory, TFile} WriterType;
+
+// I/O backend used by FstCountingWriter: either a tfile descriptor (TFile) or a
+// heap buffer (TMemory), driven through the write/read/flush callbacks.
+typedef struct WriterCtx {
+  int (*write)(struct WriterCtx *ctx, uint8_t *buf, int len);
+  int (*read)(struct WriterCtx *ctx, uint8_t *buf, int len);
+  int (*flush)(struct WriterCtx *ctx);
+  WriterType type;   // selects the active union member
+  union {
+    int64_t fd;      // TFile: tfile descriptors are int64_t (see tfile.h), not int
+    void   *mem;     // TMemory: buffer allocated by writerCtxCreate
+  };
+  int32_t offset;    // advanced by every write/read
+  int32_t limit;     // writes past this offset are rejected
+} WriterCtx;
+
+static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len);
+static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len);
+static int writeCtxDoFlush(WriterCtx *ctx);
+
+WriterCtx* writerCtxCreate(WriterType type);
+void writerCtxDestroy(WriterCtx *w);
+
typedef uint32_t CheckSummer;
@@ -25,7 +57,7 @@ typedef struct FstCountingWriter {
CheckSummer summer;
} FstCountingWriter;
-uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen);
+int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen);
int fstCountingWriterFlush(FstCountingWriter *write);
@@ -44,6 +76,10 @@ uint8_t fstCountingWriterPackUint(FstCountingWriter *writer, uint64_t n);
#define FST_WRITER_INTER_WRITER(writer) (writer->wtr)
#define FST_WRITE_CHECK_SUMMER(writer) (writer->summer)
+#ifdef __cplusplus
+}
+#endif
+
#endif
diff --git a/source/libs/index/inc/index_fst_node.h b/source/libs/index/inc/index_fst_node.h
index 0645aa1158..87eb7cb746 100644
--- a/source/libs/index/inc/index_fst_node.h
+++ b/source/libs/index/inc/index_fst_node.h
@@ -16,6 +16,10 @@
#ifndef __INDEX_FST_NODE_H__
#define __INDEX_FST_NODE_H__
+#ifdef __cplusplus
+extern "C" {
+#endif
+
#include "index_fst_util.h"
#include "index_fst_counting_writer.h"
@@ -42,7 +46,12 @@ FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src);
void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src);
//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
+bool fstBuilderNodeEqual(FstBuilderNode *n1, FstBuilderNode *n2);
void fstBuilderNodeDestroy(FstBuilderNode *node);
+#ifdef __cplusplus
+}
+#endif
+
#endif
diff --git a/source/libs/index/inc/index_fst_registry.h b/source/libs/index/inc/index_fst_registry.h
index 2504d7ccff..1d89e57e52 100644
--- a/source/libs/index/inc/index_fst_registry.h
+++ b/source/libs/index/inc/index_fst_registry.h
@@ -15,6 +15,10 @@
#ifndef __FST_REGISTRY_H__
#define __FST_REGISTRY_H__
+#ifdef __cplusplus
+extern "C" {
+#endif
+
#include "index_fst_util.h"
#include "tarray.h"
#include "index_fst_node.h"
@@ -59,4 +63,8 @@ void fstRegistryDestroy(FstRegistry *registry);
FstRegistryEntry* fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNode);
void fstRegistryEntryDestroy(FstRegistryEntry *entry);
+#ifdef __cplusplus
+}
+#endif
+
#endif
diff --git a/source/libs/index/inc/index_fst_util.h b/source/libs/index/inc/index_fst_util.h
index ff0946063d..4af885816f 100644
--- a/source/libs/index/inc/index_fst_util.h
+++ b/source/libs/index/inc/index_fst_util.h
@@ -17,6 +17,10 @@
#ifndef __INDEX_FST_UTIL_H__
#define __INDEX_FST_UTIL_H__
+#ifdef __cplusplus
+extern "C" {
+#endif
+
#include "tarray.h"
#include "index_fst_common.h"
@@ -67,20 +71,30 @@ uint8_t packDeltaSize(CompiledAddr nodeAddr, CompiledAddr transAddr);
CompiledAddr unpackDelta(char *data, uint64_t len, uint64_t nodeAddr);
+// Byte buffer referenced by FstSlice. fstSliceCreate allocates `data` for `len`
+// bytes and starts `ref` at 1.
+// NOTE(review): `ref` looks like a reference count shared between slices — confirm
+// against fstSliceCopy / fstSliceDestroy.
+typedef struct FstString {
+ uint8_t *data;
+ uint32_t len;
+ int32_t ref;
+} FstString;
+// Window onto an FstString; FST_SLICE_LEN computes end - start + 1, i.e. both
+// `start` and `end` are inclusive indexes.
typedef struct FstSlice {
- uint8_t *data;
- uint64_t dLen;
- int32_t start;
- int32_t end;
+ FstString *str;
+ int32_t start;
+ int32_t end;
} FstSlice;
-FstSlice fstSliceCopy(FstSlice *slice, int32_t start, int32_t end);
-FstSlice fstSliceCreate(uint8_t *data, uint64_t dLen);
-bool fstSliceEmpty(FstSlice *slice);
-int fstSliceCompare(FstSlice *a, FstSlice *b);
+FstSlice fstSliceCreate(uint8_t *data, uint64_t len);
+FstSlice fstSliceCopy(FstSlice *s, int32_t start, int32_t end);
+FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end);
+bool fstSliceIsEmpty(FstSlice *s);
+int fstSliceCompare(FstSlice *s1, FstSlice *s2);
+void fstSliceDestroy(FstSlice *s);
+uint8_t *fstSliceData(FstSlice *s, int32_t *sz);
-#define FST_SLICE_LEN(s) ((s)->end - (s)->start + 1)
+// Number of bytes covered by slice pointer `s` (start and end are inclusive).
+// The argument is parenthesized so expressions such as FST_SLICE_LEN(&slice)
+// or FST_SLICE_LEN(p + 1) expand correctly.
+#define FST_SLICE_LEN(s) ((s)->end - (s)->start + 1)
+#ifdef __cplusplus
+}
+#endif
#endif
diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c
index 315253d907..b2d21cd0b0 100644
--- a/source/libs/index/src/index_fst.c
+++ b/source/libs/index/src/index_fst.c
@@ -52,7 +52,7 @@ void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes *nodes, bool isFinal) {
FstBuilderNode *node = malloc(sizeof(FstBuilderNode));
node->isFinal = isFinal;
node->finalOutput = 0;
- node->trans = NULL;
+ node->trans = taosArrayInit(16, sizeof(FstTransition));
FstBuilderNodeUnfinished un = {.node = node, .last = NULL};
taosArrayPush(nodes->stack, &un);
@@ -92,7 +92,7 @@ void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes *nodes, CompiledAddr add
}
void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output out) {
FstSlice *s = &bs;
- if (s->data == NULL || s->dLen == 0 || s->start > s->end) {
+ if (fstSliceIsEmpty(s)) {
return;
}
size_t sz = taosArrayGetSize(nodes->stack) - 1;
@@ -104,18 +104,20 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output
//FstLastTransition *trn = malloc(sizeof(FstLastTransition));
//trn->inp = s->data[s->start];
//trn->out = out;
- un->last = fstLastTransitionCreate(s->data[s->start], out);
+ int32_t len = 0;
+ uint8_t *data = fstSliceData(s, &len);
+ un->last = fstLastTransitionCreate(data[0], out);
- for (uint64_t i = s->start; i <= s->end; i++) {
+ for (uint64_t i = 0; i < len; i++) {
FstBuilderNode *n = malloc(sizeof(FstBuilderNode));
n->isFinal = false;
n->finalOutput = 0;
- n->trans = NULL;
+ n->trans = taosArrayInit(16, sizeof(FstTransition));
//FstLastTransition *trn = malloc(sizeof(FstLastTransition));
//trn->inp = s->data[i];
//trn->out = out;
- FstLastTransition *trn = fstLastTransitionCreate(s->data[i], out);
+ FstLastTransition *trn = fstLastTransitionCreate(data[i], out);
FstBuilderNodeUnfinished un = {.node = n, .last = trn};
taosArrayPush(nodes->stack, &un);
@@ -127,13 +129,13 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output
uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes *node, FstSlice bs) {
FstSlice *s = &bs;
- size_t lsz = (size_t)(s->end - s->start + 1); // data len
size_t ssz = taosArrayGetSize(node->stack); // stack size
-
uint64_t count = 0;
+ int32_t lsz; // data len
+ uint8_t *data = fstSliceData(s, &lsz);
for (size_t i = 0; i < ssz && i < lsz; i++) {
FstBuilderNodeUnfinished *un = taosArrayGet(node->stack, i);
- if (un->last->inp == s->data[s->start + i]) {
+ if (un->last->inp == data[i]) {
count++;
} else {
break;
@@ -153,7 +155,8 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node,
FstLastTransition *t = un->last;
uint64_t addPrefix = 0;
- if (t && t->inp == s->data[s->start + i]) {
+ uint8_t *data = fstSliceData(s, NULL);
+ if (t && t->inp == data[i]) {
uint64_t commPrefix = MIN(t->out, *out);
uint64_t tAddPrefix = t->out - commPrefix;
(*out) = (*out) - commPrefix;
@@ -164,7 +167,6 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node,
}
if (addPrefix != 0) {
fstBuilderNodeUnfinishedAddOutputPrefix(un, addPrefix);
-
}
}
return i;
@@ -176,7 +178,9 @@ FstState fstStateCreateFrom(FstSlice* slice, CompiledAddr addr) {
if (addr == EMPTY_ADDRESS) {
return fs;
}
- uint8_t v = slice->data[addr];
+
+ uint8_t *data = fstSliceData(slice, NULL);
+ uint8_t v = data[addr];
uint8_t t = (v & 0b11000000) >> 6;
if (t == 0b11) {
fs.state = OneTransNext;
@@ -376,7 +380,8 @@ uint8_t fstStateInput(FstState *s, FstNode *node) {
FstSlice *slice = &node->data;
bool null = false;
uint8_t inp = fstStateCommInput(s, &null);
- return null == false ? inp : slice->data[slice->start - 1];
+ uint8_t *data = fstSliceData(slice, NULL);
+ return null == false ? inp : data[-1];
}
uint8_t fstStateInputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
assert(s->state == AnyTrans);
@@ -388,7 +393,9 @@ uint8_t fstStateInputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
- fstStateTransIndexSize(s, node->version, node->nTrans)
- i
- 1; // the output size
- return slice->data[at];
+
+ uint8_t *data = fstSliceData(slice, NULL);
+ return data[at];
}
// trans_addr
@@ -406,7 +413,8 @@ CompiledAddr fstStateTransAddr(FstState *s, FstNode *node) {
- tSizes;
// refactor error logic
- return unpackDelta(slice->data + slice->start + i, tSizes, node->end);
+ uint8_t *data = fstSliceData(slice, NULL);
+ return unpackDelta(data +i, tSizes, node->end);
}
}
CompiledAddr fstStateTransAddrForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
@@ -421,7 +429,8 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState *s, FstNode *node, uint64_t i
- node->nTrans
- (i * tSizes)
- tSizes;
- return unpackDelta(slice->data + slice->start + at, tSizes, node->end);
+ uint8_t *data = fstSliceData(slice, NULL);
+ return unpackDelta(data + at, tSizes, node->end);
}
// sizes
@@ -434,7 +443,8 @@ PackSizes fstStateSizes(FstState *s, FstSlice *slice) {
i = FST_SLICE_LEN(slice) - 1 - fstStateNtransLen(s) - 1;
}
- return (PackSizes)(slice->data[slice->start + i]);
+ uint8_t *data = fstSliceData(slice, NULL);
+ return (PackSizes)(*(data +i));
}
// Output
Output fstStateOutput(FstState *s, FstNode *node) {
@@ -452,7 +462,8 @@ Output fstStateOutput(FstState *s, FstNode *node) {
- 1
- tSizes
- oSizes;
- return unpackUint64(slice->data + slice->start + i, oSizes);
+ uint8_t *data = fstSliceData(slice, NULL);
+ return unpackUint64(data + i, oSizes);
}
Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
@@ -469,7 +480,9 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
- fstStateTotalTransSize(s, node->version, node->sizes, node->nTrans)
- (i * oSizes)
- oSizes;
- return unpackUint64(slice->data + slice->start + at, oSizes);
+
+ uint8_t *data = fstSliceData(slice, NULL);
+ return unpackUint64(data + at, oSizes);
}
// anyTrans specify function
@@ -523,7 +536,10 @@ uint64_t fstStateNtrans(FstState *s, FstSlice *slice) {
if (null != true) {
return n;
}
- n = slice->data[slice->end - 1]; // data[data.len() - 2]
+ int32_t len;
+ uint8_t *data = fstSliceData(slice, &len);
+ n = data[len - 2];
+ //n = data[slice->end - 1]; // data[data.len() - 2]
return n == 1 ? 256: n; // // "1" is never a normal legal value here, because if there, // is only 1 transition, then it is encoded in the state byte
}
Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, PackSizes sizes, uint64_t nTrans) {
@@ -538,7 +554,8 @@ Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, Pack
- fstStateTotalTransSize(s, version, sizes, nTrans)
- (nTrans * oSizes)
- oSizes;
- return unpackUint64(slice->data + slice->start + at, (uint8_t)oSizes);
+ uint8_t *data = fstSliceData(slice, NULL);
+ return unpackUint64(data + at, (uint8_t)oSizes);
}
uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
@@ -549,7 +566,10 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
- fstStateNtransLen(s)
- 1 // pack size
- fstStateTransIndexSize(s, node->version, node->nTrans);
- uint64_t i = slice->data[slice->start + at + b];
+ int32_t dlen = 0;
+ uint8_t *data = fstSliceData(slice, &dlen);
+ uint64_t i = data[at + b];
+ //uint64_t i = slice->data[slice->start + at + b];
if (i >= node->nTrans) {
*null = true;
}
@@ -561,8 +581,13 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
- node->nTrans;
uint64_t end = start + node->nTrans;
uint64_t len = end - start;
+ int32_t dlen = 0;
+ uint8_t *data = fstSliceData(slice, &dlen);
for(int i = 0; i < len; i++) {
- uint8_t v = slice->data[slice->start + i];
+ //uint8_t v = slice->data[slice->start + i];
+ ////slice->data[slice->start + i];
+ uint8_t v = data[i];
+
if (v == b) {
return node->nTrans - i - 1; // bug
}
@@ -635,6 +660,7 @@ static const char *fstNodeState(FstNode *node) {
+// Destroy the node's data slice, then free the node itself.
+// NULL is accepted and ignored, as it was when the body was only free(node).
void fstNodeDestroy(FstNode *node) {
+  if (node == NULL) { return; }
+  fstSliceDestroy(&node->data);
free(node);
}
FstTransitions* fstNodeTransitions(FstNode *node) {
@@ -774,18 +800,18 @@ bool fstBuilderInsert(FstBuilder *b, FstSlice bs, Output in) {
void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) {
FstSlice *s = &bs;
- if (fstSliceEmpty(s)) {
+ if (fstSliceIsEmpty(s)) {
b->len = 1;
fstUnFinishedNodesSetRootOutput(b->unfinished, in);
return;
}
- Output out;
//if (in != 0) { //if let Some(in) = in
// prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out);
//} else {
// prefixLen = fstUnFinishedNodesFindCommPrefix(b->unfinished, bs);
// out = 0;
//}
+ Output out;
uint64_t prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out);
if (prefixLen == FST_SLICE_LEN(s)) {
@@ -798,12 +824,13 @@ void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) {
FstSlice sub = fstSliceCopy(s, prefixLen, s->end);
fstUnFinishedNodesAddSuffix(b->unfinished, sub, out);
+ fstSliceDestroy(&sub);
return;
}
OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup) {
FstSlice *input = &bs;
- if (fstSliceEmpty(&b->last)) {
+ if (fstSliceIsEmpty(&b->last)) {
// deep copy or not
b->last = fstSliceCopy(&bs, input->start, input->end);
} else {
@@ -829,7 +856,7 @@ void fstBuilderCompileFrom(FstBuilder *b, uint64_t istate) {
}
addr = fstBuilderCompile(b, n);
assert(addr != NONE_ADDRESS);
- fstBuilderNodeDestroy(n);
+ //fstBuilderNodeDestroy(n);
}
fstUnFinishedNodesTopLastFreeze(b->unfinished, addr);
return;
@@ -888,7 +915,7 @@ void fstBuilderFinish(FstBuilder *b) {
+// Return a fstSliceCopy of node->data taken from slice->end up to
+// FST_SLICE_LEN(slice) - 1.
+// NOTE(review): the removed line used slice->dLen - 1 (last index of the whole
+// buffer); FST_SLICE_LEN(slice) - 1 equals end - start, which only matches when
+// the slice spans the entire buffer — confirm this is intended.
FstSlice fstNodeAsSlice(FstNode *node) {
FstSlice *slice = &node->data;
- FstSlice s = fstSliceCopy(slice, slice->end, slice->dLen - 1);
+ FstSlice s = fstSliceCopy(slice, slice->end, FST_SLICE_LEN(slice) - 1);
return s;
}
@@ -929,12 +956,13 @@ void fstBuilderNodeUnfinishedAddOutputPrefix(FstBuilderNodeUnfinished *unNode, O
}
Fst* fstCreate(FstSlice *slice) {
- char *buf = slice->data;
- uint64_t skip = 0;
- uint64_t len = slice->dLen;
- if (len < 36) {
+ int32_t slen;
+ char *buf = fstSliceData(slice, &slen);
+ if (slen < 36) {
return NULL;
}
+ uint64_t len = slen;
+ uint64_t skip = 0;
uint64_t version;
taosDecodeFixedU64(buf, &version);
@@ -992,8 +1020,10 @@ void fstDestroy(Fst *fst) {
bool fstGet(Fst *fst, FstSlice *b, Output *out) {
FstNode *root = fstGetRoot(fst);
Output tOut = 0;
- for (uint32_t i = 0; i < b->dLen; i++) {
- uint8_t inp = b->data[i];
+ int32_t len;
+ uint8_t *data = fstSliceData(b, &len);
+ for (uint32_t i = 0; i < len; i++) {
+ uint8_t inp = data[i];
Output res = 0;
bool null = fstNodeFindInput(root, inp, &res);
if (null) { return false; }
@@ -1046,9 +1076,10 @@ Output fstEmptyFinalOutput(Fst *fst, bool *null) {
bool fstVerify(Fst *fst) {
uint32_t checkSum = fst->meta->checkSum;
- FstSlice *data = fst->data;
+ int32_t len;
+ uint8_t *data = fstSliceData(fst->data, &len);
TSCKSUM initSum = 0;
- if (!taosCheckChecksumWhole(data->data, data->dLen)) {
+ if (!taosCheckChecksumWhole(data, len)) {
return false;
}
return true;
@@ -1058,9 +1089,14 @@ bool fstVerify(Fst *fst) {
FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data) {
FstBoundWithData *b = calloc(1, sizeof(FstBoundWithData));
if (b == NULL) { return NULL; }
-
+
+ if (data != NULL) {
+ b->data = fstSliceCopy(data, data->start, data->end);
+ } else {
+ b->data = fstSliceCreate(NULL, 0);
+ }
b->type = type;
- b->data = fstSliceCopy(data, data->start, data->end);
+
return b;
}
@@ -1078,7 +1114,7 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData *bound) {
if (bound->type == Unbounded) {
return true;
} else {
- return fstSliceEmpty(&bound->data);
+ return fstSliceIsEmpty(&bound->data);
}
}
@@ -1145,8 +1181,10 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
Output out = 0;
void* autState = sws->aut->start();
- for (uint32_t i = 0; i < key->dLen; i++) {
- uint8_t b = key->data[i];
+ int32_t len;
+ uint8_t *data = fstSliceData(key, &len);
+ for (uint32_t i = 0; i < len; i++) {
+ uint8_t b = data[i];
uint64_t res = 0;
bool null = fstNodeFindInput(node, b, &res);
if (null == false) {
@@ -1262,12 +1300,16 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
taosArrayDestroyEx(sws->stack, streamStateDestroy);
sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState));
+ fstSliceDestroy(&slice);
return NULL;
}
if (FST_NODE_IS_FINAL(nextNode) && isMatch) {
- FstOutput fOutput = {.null = false, out = out + FST_NODE_FINAL_OUTPUT(nextNode)};
- return swsResultCreate(&slice, fOutput , tState);
+ FstOutput fOutput = {.null = false, .out = out + FST_NODE_FINAL_OUTPUT(nextNode)};
+ StreamWithStateResult *result = swsResultCreate(&slice, fOutput , tState);
+ fstSliceDestroy(&slice);
+ return result;
}
+ fstSliceDestroy(&slice);
}
return NULL;
@@ -1277,14 +1319,19 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta
StreamWithStateResult *result = calloc(1, sizeof(StreamWithStateResult));
if (result == NULL) { return NULL; }
- FstSlice slice = fstSliceCopy(data, 0, data->dLen - 1);
- result->data = slice;
+ result->data = fstSliceCopy(data, 0, FST_SLICE_LEN(data) - 1);
result->out = fOut;
result->state = state;
return result;
-
}
+// Tear down a StreamWithStateResult: destroy its data slice, then free the
+// struct. Passing NULL is a no-op.
+void swsResultDestroy(StreamWithStateResult *result) {
+  if (result != NULL) {
+    fstSliceDestroy(&result->data);
+    free(result);
+  }
+}
+
void streamStateDestroy(void *s) {
if (NULL == s) { return; }
StreamState *ss = (StreamState *)s;
@@ -1293,5 +1340,44 @@ void streamStateDestroy(void *s) {
//free(s->autoState);
}
+// Allocate a stream builder for `fst` / `aut` with both bounds Unbounded.
+// Returns NULL if the builder or either bound cannot be allocated; nothing is
+// leaked in that case.
+FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) {
+  FstStreamBuilder *b = calloc(1, sizeof(FstStreamBuilder));
+  if (NULL == b) { return NULL; }
+
+  b->fst = fst;
+  b->aut = aut;
+  b->min = fstBoundStateCreate(Unbounded, NULL);
+  b->max = fstBoundStateCreate(Unbounded, NULL);
+  // fstBoundStateCreate returns NULL when its calloc fails; later calls
+  // dereference b->min / b->max unconditionally, so fail early here.
+  if (b->min == NULL || b->max == NULL) {
+    if (b->min != NULL) {
+      fstSliceDestroy(&b->min->data);
+      free(b->min);
+    }
+    if (b->max != NULL) {
+      fstSliceDestroy(&b->max->data);
+      free(b->max);
+    }
+    free(b);
+    return NULL;
+  }
+  return b;
+}
+// Release a builder created by fstStreamBuilderCreate: destroy the slice held by
+// each bound, free the bound structs themselves (they are calloc'ed by
+// fstBoundStateCreate), then free the builder. NULL is a no-op.
+// The fst and aut pointers are not touched.
+void fstStreamBuilderDestroy(FstStreamBuilder *b) {
+  if (b == NULL) { return; }
+
+  if (b->min != NULL) {
+    fstSliceDestroy(&b->min->data);
+    free(b->min);
+  }
+  if (b->max != NULL) {
+    fstSliceDestroy(&b->max->data);
+    free(b->max);
+  }
+  free(b);
+}
+// Update one bound of the builder from `val`:
+//   GE -> min, Included     GT -> min, Excluded
+//   LE -> max, Included     LT -> max, Excluded
+// The bound's previous slice is destroyed and replaced by a deep copy of `val`.
+// Any other `type` leaves the builder untouched. Returns `b` (NULL if `b` is NULL).
+FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type) {
+  if (b == NULL) { return NULL; }
+
+  FstBoundWithData *target = NULL;
+  switch (type) {
+    case GE:
+      target = b->min;
+      target->type = Included;
+      break;
+    case GT:
+      target = b->min;
+      target->type = Excluded;
+      break;
+    case LE:
+      target = b->max;
+      target->type = Included;
+      break;
+    case LT:
+      target = b->max;
+      target->type = Excluded;
+      break;
+    default:
+      return b;
+  }
+
+  fstSliceDestroy(&(target->data));
+  target->data = fstSliceDeepCopy(val, 0, FST_SLICE_LEN(val) - 1);
+  return b;
+}
+
+
diff --git a/source/libs/index/src/index_fst_automation.c b/source/libs/index/src/index_fst_automation.c
index f2f48bbc8a..3d5efd30f3 100644
--- a/source/libs/index/src/index_fst_automation.c
+++ b/source/libs/index/src/index_fst_automation.c
@@ -12,3 +12,4 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+
diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c
index a0a2c380f1..1989a5a425 100644
--- a/source/libs/index/src/index_fst_counting_writer.c
+++ b/source/libs/index/src/index_fst_counting_writer.c
@@ -16,24 +16,88 @@
#include "index_fst_util.h"
#include "index_fst_counting_writer.h"
+// write callback: append `len` bytes from `buf` at the current offset.
+// Returns `len` on success; -1 when `len` is negative, the write would pass
+// `limit`, or the file backend did not write all bytes.
+static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) {
+  if (len < 0 || ctx->offset + len > ctx->limit) {
+    return -1;
+  }
+
+  if (ctx->type == TFile) {
+    // The write is done outside assert() so it still happens when NDEBUG is
+    // defined, and a short write is reported instead of aborting.
+    if (tfWrite(ctx->fd, buf, len) != len) {
+      return -1;
+    }
+  } else {
+    // Cast before offsetting: arithmetic on void* is not standard C.
+    memcpy((uint8_t *)ctx->mem + ctx->offset, buf, len);
+  }
+  ctx->offset += len;
+  return len;
+}
+
+// read callback: copy `len` bytes at the current offset into `buf`.
+// Returns 1 on success; -1 when `len` is negative, a memory read would pass
+// `limit`, or the file backend did not deliver all bytes. The offset only
+// advances on success.
+static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) {
+  if (len < 0) {
+    return -1;
+  }
+
+  if (ctx->type == TFile) {
+    if (tfRead(ctx->fd, buf, len) != len) {
+      return -1;
+    }
+  } else {
+    if (ctx->offset + len > ctx->limit) {
+      return -1;
+    }
+    memcpy(buf, (uint8_t *)ctx->mem + ctx->offset, len);
+  }
+  ctx->offset += len;
+
+  return 1;
+}
+
+// flush callback: currently a no-op for both backends; always returns 1.
+static int writeCtxDoFlush(WriterCtx *ctx) {
+  if (ctx->type == TFile) {
+    // TODO: tfFlush(ctx->fd);
+  } else {
+    // memory backend: nothing to flush
+  }
+  return 1;
+}
+
+// Allocate a writer context of the requested type.
+//   TFile   - opens `tmpFile` with tfOpenCreateWriteAppend
+//   TMemory - allocates a zeroed buffer of DefaultMem bytes
+// Returns NULL when the context, the buffer, or the file cannot be obtained.
+WriterCtx* writerCtxCreate(WriterType type) {
+  WriterCtx *ctx = calloc(1, sizeof(WriterCtx));
+  if (ctx == NULL) { return NULL; }
+
+  ctx->type = type;  // assignment; a bare `==` here would leave type at 0
+  if (ctx->type == TFile) {
+    ctx->fd = tfOpenCreateWriteAppend(tmpFile);
+    // NOTE(review): assumes the tf* open calls report failure with a negative
+    // value, like open(2) — confirm against tfile.c.
+    if (ctx->fd < 0) {
+      free(ctx);
+      return NULL;
+    }
+  } else if (ctx->type == TMemory) {
+    ctx->mem = calloc(1, DefaultMem * sizeof(uint8_t));
+    if (ctx->mem == NULL) {
+      free(ctx);
+      return NULL;
+    }
+  }
+  ctx->write = writeCtxDoWrite;
+  ctx->read = writeCtxDoRead;
+  ctx->flush = writeCtxDoFlush;
+
+  ctx->offset = 0;
+  ctx->limit = DefaultMem;
+
+  return ctx;
+}
+
+// Release a context from writerCtxCreate: free the buffer (TMemory) or close
+// the descriptor (otherwise), then free the context. NULL is a no-op.
+void writerCtxDestroy(WriterCtx *ctx) {
+  if (ctx == NULL) { return; }
+
+  if (ctx->type == TMemory) {
+    free(ctx->mem);
+  } else {
+    tfClose(ctx->fd);
+  }
+  free(ctx);
+}
+
+
+// Allocate a counting writer backed by a freshly created TFile WriterCtx.
+// Returns NULL when either allocation fails.
+// NOTE(review): the `wrt` argument is no longer used after this change — confirm
+// whether callers still expect their own writer to be installed.
FstCountingWriter *fstCountingWriterCreate(void *wrt) {
FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter));
if (cw == NULL) { return NULL; }
-
- cw->wrt = wrt;
+
+  cw->wrt = (void *)(writerCtxCreate(TFile));
+  if (cw->wrt == NULL) {
+    // Without a backend every later write/flush would dereference NULL.
+    free(cw);
+    return NULL;
+  }
return cw;
}
+// Destroy the writer's WriterCtx and free the writer. NULL is a no-op.
void fstCountingWriterDestroy(FstCountingWriter *cw) {
// free wrt object: close fd or free mem
+  if (cw == NULL) { return; }
+  writerCtxDestroy((WriterCtx *)(cw->wrt));
free(cw);
}
-uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) {
+// Write `bufLen` bytes through the writer's WriterCtx and add them to `count`.
+// Returns 0 when `write` is NULL, a negative value when the backend rejects
+// the write (count is left unchanged), otherwise `bufLen`.
+int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) {
if (write == NULL) { return 0; }
// update checksum
// write data to file/socket or mem
-
- write->count += bufLen;
+  WriterCtx *ctx = write->wrt;
+
+  int nWrite = ctx->write(ctx, buf, bufLen);
+  // A failed write must not be added to the byte count nor reported as success.
+  if (nWrite < 0) { return nWrite; }
+  write->count += nWrite;
return bufLen;
}
@@ -41,6 +105,8 @@ uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) {
return 0;
}
+// Invoke the flush callback of the writer's WriterCtx. Always returns 1; the
+// callback's own return value is ignored.
+// NOTE(review): unlike fstCountingWriterWrite there is no NULL check on `write`.
int fstCountingWriterFlush(FstCountingWriter *write) {
+ WriterCtx *ctx = write->wrt;
+ ctx->flush(ctx);
//write->wtr->flush
return 1;
}
diff --git a/source/libs/index/src/index_fst_node.c b/source/libs/index/src/index_fst_node.c
index b33b8e4428..af69f98f32 100644
--- a/source/libs/index/src/index_fst_node.c
+++ b/source/libs/index/src/index_fst_node.c
@@ -18,7 +18,7 @@ FstBuilderNode *fstBuilderNodeDefault() {
FstBuilderNode *bn = malloc(sizeof(FstBuilderNode));
bn->isFinal = false;
bn->finalOutput = 0;
- bn->trans = NULL;
+ bn->trans = taosArrayInit(16, sizeof(FstTransition));
return bn;
}
void fstBuilderNodeDestroy(FstBuilderNode *node) {
@@ -27,6 +27,25 @@ void fstBuilderNodeDestroy(FstBuilderNode *node) {
taosArrayDestroy(node->trans);
free(node);
}
+
+// Structural equality of two builder nodes: same pointer, or same isFinal,
+// finalOutput and an identical sequence of transitions (inp, out, addr).
+bool fstBuilderNodeEqual(FstBuilderNode *n1, FstBuilderNode *n2) {
+  if (n1 == n2) { return true; }
+  if (n1->isFinal != n2->isFinal) { return false; }
+  if (n1->finalOutput != n2->finalOutput) { return false; }
+
+  size_t nTrans = taosArrayGetSize(n1->trans);
+  if (nTrans != taosArrayGetSize(n2->trans)) { return false; }
+
+  for (size_t idx = 0; idx < nTrans; idx++) {
+    FstTransition *lhs = taosArrayGet(n1->trans, idx);
+    FstTransition *rhs = taosArrayGet(n2->trans, idx);
+    bool same = (lhs->inp == rhs->inp) && (lhs->out == rhs->out) && (lhs->addr == rhs->addr);
+    if (!same) { return false; }
+  }
+  return true;
+}
FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src) {
FstBuilderNode *node = malloc(sizeof(FstBuilderNode));
if (node == NULL) { return NULL; }
@@ -53,12 +72,17 @@ void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src) {
dst->isFinal = src->isFinal;
dst->finalOutput = src->finalOutput;
- // avoid mem leak
+ //release free avoid mem leak
taosArrayDestroy(dst->trans);
- dst->trans = src->trans;
- src->trans = NULL;
+ size_t sz = taosArrayGetSize(src->trans);
+ dst->trans = taosArrayInit(sz, sizeof(FstTransition));
+ for (size_t i = 0; i < sz; i++) {
+ FstTransition *trn = taosArrayGet(src->trans, i);
+ taosArrayPush(dst->trans, trn);
+ }
}
+
//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
//size_t sz = taosArrayGetSize(b->trans);
diff --git a/source/libs/index/src/index_fst_registry.c b/source/libs/index/src/index_fst_registry.c
index b25964e0e7..8fb0dbfcaa 100644
--- a/source/libs/index/src/index_fst_registry.c
+++ b/source/libs/index/src/index_fst_registry.c
@@ -112,7 +112,7 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo
if (end - start == 1) {
FstRegistryCell *cell = taosArrayGet(registry->table, start);
//cell->isNode &&
- if (cell->addr != NONE_ADDRESS && cell->node == bNode) {
+ if (cell->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell->node, bNode)) {
entry->state = FOUND;
entry->addr = cell->addr ;
return entry;
@@ -123,13 +123,13 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo
}
} else if (end - start == 2) {
FstRegistryCell *cell1 = taosArrayGet(registry->table, start);
- if (cell1->addr != NONE_ADDRESS && cell1->node == bNode) {
+ if (cell1->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell1->node, bNode)) {
entry->state = FOUND;
entry->addr = cell1->addr;
return entry;
}
FstRegistryCell *cell2 = taosArrayGet(registry->table, start + 1);
- if (cell2->addr != NONE_ADDRESS && cell2->node == bNode) {
+ if (cell2->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell2->node, bNode)) {
entry->state = FOUND;
entry->addr = cell2->addr;
// must swap here
@@ -147,7 +147,7 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo
uint32_t i = start;
for (; i < end; i++) {
FstRegistryCell *cell = (FstRegistryCell *)taosArrayGet(registry->table, i);
- if (cell->addr != NONE_ADDRESS && cell->node == bNode) {
+ if (cell->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell->node, bNode)) {
entry->state = FOUND;
entry->addr = cell->addr;
fstRegistryCellPromote(registry->table, i, start);
diff --git a/source/libs/index/src/index_fst_util.c b/source/libs/index/src/index_fst_util.c
index 952c710676..532b0b8ac3 100644
--- a/source/libs/index/src/index_fst_util.c
+++ b/source/libs/index/src/index_fst_util.c
@@ -91,42 +91,87 @@ CompiledAddr unpackDelta(char *data, uint64_t len, uint64_t nodeAddr) {
}
// fst slice func
-FstSlice fstSliceCreate(uint8_t *data, uint64_t dLen) {
- FstSlice slice = {.data = data, .dLen = dLen, .start = 0, .end = dLen - 1};
- return slice;
+//
+
+// Builds a slice that owns a private copy of `len` bytes taken from `data`.
+// The backing FstString starts with ref == 1 and is released through
+// fstSliceDestroy(). If an allocation fails, the returned slice has
+// str == NULL (fstSliceIsEmpty() reports true for it).
+FstSlice fstSliceCreate(uint8_t *data, uint64_t len) {
+  FstSlice s = {.str = NULL, .start = 0, .end = -1};
+
+  FstString *str = (FstString *)malloc(sizeof(FstString));
+  if (str == NULL) { return s; }
+
+  str->data = malloc(len * sizeof(uint8_t));
+  // malloc(0) may legitimately return NULL, so NULL is only a failure when len > 0
+  if (str->data == NULL && len > 0) {
+    free(str);
+    return s;
+  }
+  if (len > 0) { memcpy(str->data, data, len); }
+  str->ref = 1;
+  str->len = len;
+
+  s.str = str;
+  s.end = len - 1;
+  return s;
}
// just shallow copy
-FstSlice fstSliceCopy(FstSlice *slice, int32_t start, int32_t end) {
- FstSlice t;
- if (start >= slice->dLen || end >= slice->dLen || start > end) {
- t.data = NULL;
- return t;
- };
-
- t.data = slice->data;
- t.dLen = slice->dLen;
- t.start = start;
- t.end = end;
+FstSlice fstSliceCopy(FstSlice *s, int32_t start, int32_t end) {
+ FstString *str = s->str;
+ str->ref++;
+ //uint8_t *buf = fstSliceData(s, &alen);
+ //start = buf + start - (buf - s->start);
+ //end = buf + end - (buf - s->start);
+
+ FstSlice t = {.str = str, .start = start + s->start, .end = end + s->start};
return t;
}
-bool fstSliceEmpty(FstSlice *slice) {
- return slice->data == NULL || slice->dLen <= 0;
+// Returns a new slice that owns a private copy of bytes [start, end] of `s`.
+// `start` and `end` are offsets into the window returned by fstSliceData(s),
+// not into the underlying string. The copy gets its own FstString (ref == 1),
+// so it is independent of `s`. If an allocation fails, the returned slice has
+// str == NULL (fstSliceIsEmpty() reports true for it).
+FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end) {
+  FstSlice ans = {.str = NULL, .start = 0, .end = -1};
+
+  int32_t tlen = end - start + 1;
+  int32_t slen;
+  uint8_t *data = fstSliceData(s, &slen);
+  assert(tlen <= slen);
+
+  uint8_t *buf = malloc(sizeof(uint8_t) * tlen);
+  if (buf == NULL) { return ans; }
+  memcpy(buf, data + start, tlen);
+
+  FstString *str = malloc(sizeof(FstString));
+  if (str == NULL) {
+    // do not leak the byte buffer when the header allocation fails
+    free(buf);
+    return ans;
+  }
+  str->data = buf;
+  str->len = tlen;
+  str->ref = 1;
+
+  ans.str = str;
+  ans.start = 0;
+  ans.end = tlen - 1;
+  return ans;
+}
+// A slice counts as empty when it has no backing string, when the backing
+// string has length 0, or when either of its bounds is negative.
+bool fstSliceIsEmpty(FstSlice *s) {
+  if (s->str == NULL) { return true; }
+  if (s->str->len == 0) { return true; }
+  return s->start < 0 || s->end < 0;
+}
+
+// Returns a pointer to the first byte of the slice's window inside its backing
+// string. When `size` is non-NULL it receives the window length
+// (end - start + 1). The backing string is not checked for NULL.
+uint8_t *fstSliceData(FstSlice *s, int32_t *size) {
+  uint8_t *base = s->str->data;
+  if (size != NULL) { *size = s->end - s->start + 1; }
+  return base + s->start;
+}
+// Drops this handle's reference on the shared FstString and frees the string
+// (bytes and header) once the reference count reaches zero.
+// Safe to call on a NULL pointer, on a slice without a backing string, and
+// repeatedly on the same handle: the handle's `str` is cleared as soon as its
+// reference has been given up, so a second call is a no-op.
+void fstSliceDestroy(FstSlice *s) {
+  if (s == NULL || s->str == NULL) { return; }
+
+  FstString *str = s->str;
+  s->str = NULL;  // this handle no longer owns a reference
+  str->ref--;
+  if (str->ref <= 0) {
+    free(str->data);
+    free(str);
+  }
}
int fstSliceCompare(FstSlice *a, FstSlice *b) {
- int32_t aLen = (a->end - a->start + 1);
- int32_t bLen = (b->end - b->start + 1);
- int32_t mLen = (aLen < bLen ? aLen : bLen);
- for (int i = 0; i < mLen; i++) {
- uint8_t x = a->data[i + a->start];
- uint8_t y = b->data[i + b->start];
- if (x == y) { continue; }
+ int32_t alen, blen;
+ uint8_t *aBuf = fstSliceData(a, &alen);
+ uint8_t *bBuf = fstSliceData(b, &blen);
+
+ uint32_t i, j;
+ for (i = 0, j = 0; i < alen && j < blen; i++, j++) {
+ uint8_t x = aBuf[i];
+ uint8_t y = bBuf[j];
+ if (x == y) { continue;}
else if (x < y) { return -1; }
- else { return 1; }
- }
- if (aLen == bLen) { return 0; }
- else if (aLen < bLen) { return -1; }
- else { return 1; }
+ else { return 1; };
+ }
+ if (i < alen) { return 1; }
+ else if (j < blen) { return -1; }
+ else { return 0; }
}
diff --git a/source/libs/index/test/CMakeLists.txt b/source/libs/index/test/CMakeLists.txt
index f2a7442a5a..f84f874a23 100644
--- a/source/libs/index/test/CMakeLists.txt
+++ b/source/libs/index/test/CMakeLists.txt
@@ -1,7 +1,6 @@
add_executable(indexTest "")
target_sources(indexTest
PRIVATE
- "../src/index.c"
"indexTests.cpp"
)
target_include_directories ( indexTest
diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp
index 857060ce5e..9f3d1b962e 100644
--- a/source/libs/index/test/indexTests.cpp
+++ b/source/libs/index/test/indexTests.cpp
@@ -3,58 +3,84 @@
#include
#include "index.h"
#include "indexInt.h"
+#include "index_fst.h"
+#include "index_fst_util.h"
+#include "index_fst_counting_writer.h"
-TEST(IndexTest, index_create_test) {
- SIndexOpts *opts = indexOptsCreate();
- SIndex *index = indexOpen(opts, "./test");
- if (index == NULL) {
- std::cout << "index open failed" << std::endl;
- }
+//TEST(IndexTest, index_create_test) {
+// SIndexOpts *opts = indexOptsCreate();
+// SIndex *index = indexOpen(opts, "./test");
+// if (index == NULL) {
+// std::cout << "index open failed" << std::endl;
+// }
+//
+//
+// // write
+// for (int i = 0; i < 100000; i++) {
+// SIndexMultiTerm* terms = indexMultiTermCreate();
+// std::string val = "field";
+//
+// indexMultiTermAdd(terms, "tag1", strlen("tag1"), val.c_str(), val.size());
+//
+// val.append(std::to_string(i));
+// indexMultiTermAdd(terms, "tag2", strlen("tag2"), val.c_str(), val.size());
+//
+// val.insert(0, std::to_string(i));
+// indexMultiTermAdd(terms, "tag3", strlen("tag3"), val.c_str(), val.size());
+//
+// val.append("const");
+// indexMultiTermAdd(terms, "tag4", strlen("tag4"), val.c_str(), val.size());
+//
+//
+// indexPut(index, terms, i);
+// indexMultiTermDestroy(terms);
+// }
+//
+//
+// // query
+// SIndexMultiTermQuery *multiQuery = indexMultiTermQueryCreate(MUST);
+//
+// indexMultiTermQueryAdd(multiQuery, "tag1", strlen("tag1"), "field", strlen("field"), QUERY_PREFIX);
+// indexMultiTermQueryAdd(multiQuery, "tag3", strlen("tag3"), "0field0", strlen("0field0"), QUERY_TERM);
+//
+// SArray *result = (SArray *)taosArrayInit(10, sizeof(int));
+// indexSearch(index, multiQuery, result);
+//
+// std::cout << "taos'size : " << taosArrayGetSize(result) << std::endl;
+// for (int i = 0; i < taosArrayGetSize(result); i++) {
+// int *v = (int *)taosArrayGet(result, i);
+// std::cout << "value --->" << *v << std::endl;
+// }
+// // add more test case
+// indexMultiTermQueryDestroy(multiQuery);
+//
+// indexOptsDestroy(opts);
+// indexClose(index);
+// //
+//}
-
- // write
- for (int i = 0; i < 100000; i++) {
- SIndexMultiTerm* terms = indexMultiTermCreate();
- std::string val = "field";
+int main(int argc, char** argv) {
+ std::string str("abc");
+ FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size());
+ Output val = 10;
- indexMultiTermAdd(terms, "tag1", strlen("tag1"), val.c_str(), val.size());
-
- val.append(std::to_string(i));
- indexMultiTermAdd(terms, "tag2", strlen("tag2"), val.c_str(), val.size());
-
- val.insert(0, std::to_string(i));
- indexMultiTermAdd(terms, "tag3", strlen("tag3"), val.c_str(), val.size());
-
- val.append("const");
- indexMultiTermAdd(terms, "tag4", strlen("tag4"), val.c_str(), val.size());
-
-
- indexPut(index, terms, i);
- indexMultiTermDestroy(terms);
- }
-
-
- // query
- SIndexMultiTermQuery *multiQuery = indexMultiTermQueryCreate(MUST);
-
- indexMultiTermQueryAdd(multiQuery, "tag1", strlen("tag1"), "field", strlen("field"), QUERY_PREFIX);
- indexMultiTermQueryAdd(multiQuery, "tag3", strlen("tag3"), "0field0", strlen("0field0"), QUERY_TERM);
-
- SArray *result = (SArray *)taosArrayInit(10, sizeof(int));
- indexSearch(index, multiQuery, result);
-
- std::cout << "taos'size : " << taosArrayGetSize(result) << std::endl;
- for (int i = 0; i < taosArrayGetSize(result); i++) {
- int *v = (int *)taosArrayGet(result, i);
- std::cout << "value --->" << *v << std::endl;
- }
- // add more test case
- indexMultiTermQueryDestroy(multiQuery);
-
- indexOptsDestroy(opts);
- indexClose(index);
- //
+ std::string str1("bcd");
+ FstSlice key1 = fstSliceCreate((uint8_t *)str1.c_str(), str1.size());
+ Output val2 = 10;
+ FstBuilder *b = fstBuilderCreate(NULL, 1);
+ fstBuilderInsert(b, key, val);
+ fstBuilderInsert(b, key1, val2);
+ fstBuilderFinish(b);
+ fstBuilderDestroy(b);
+ fstSliceDestroy(&key);
+ return 1;
}
+
+//TEST(IndexFstBuilder, IndexFstInput) {
+//
+//}
+
+
diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h
index 42ede49c6b..285d7e2576 100644
--- a/source/libs/wal/inc/walInt.h
+++ b/source/libs/wal/inc/walInt.h
@@ -17,14 +17,16 @@
#define _TD_WAL_INT_H_
#include "wal.h"
+#include "compare.h"
#ifdef __cplusplus
extern "C" {
#endif
-int walRotate(SWal* pWal);
int walGetFile(SWal* pWal, int32_t version);
+int64_t walGetSeq();
+
#ifdef __cplusplus
}
#endif
diff --git a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walIndex.c
index 2569af841f..1aa64b34b5 100644
--- a/source/libs/wal/src/walIndex.c
+++ b/source/libs/wal/src/walIndex.c
@@ -20,9 +20,38 @@
#include "tfile.h"
#include "walInt.h"
-int walSeekVerImpl(SWal *pWal, int64_t ver) {
- //close old file
+// Positions the currently open index and log files at version `ver`:
+// reads the {version, log offset} index entry for `ver`, seeks the log file to
+// that offset and updates curLogOffset / curVersion on the WAL object.
+// Returns the result of the final tfLseek() on the log file.
+static int walSeekFilePos(SWal* pWal, int64_t ver) {
int code = 0;
+
+  int64_t idxTfd = pWal->curIdxTfd;
+  int64_t logTfd = pWal->curLogTfd;
+
+  //seek position: index entries are fixed-size, one per version in this file
+  int64_t offset = (ver - pWal->curFileFirstVersion) * WAL_IDX_ENTRY_SIZE;
+  code = tfLseek(idxTfd, offset, SEEK_SET);
+  if(code != 0) {
+    //TODO: handle index seek result (NOTE(review): confirm tfLseek return convention)
+  }
+  // zero-initialised so the ASSERT below never inspects indeterminate memory
+  int64_t readBuf[2] = {0};
+  code = tfRead(idxTfd, readBuf, sizeof(readBuf));
+  if(code != 0) {
+    //TODO: handle index read result (NOTE(review): confirm tfRead return convention)
+  }
+  //TODO:deserialize
+  ASSERT(readBuf[0] == ver);
+  // readBuf[1] is the offset recorded by walWriteIndex(), counted from the
+  // start of the log file, so the seek must be absolute (SEEK_SET), not
+  // relative to wherever the log descriptor currently points.
+  code = tfLseek(logTfd, readBuf[1], SEEK_SET);
+  if (code != 0) {
+    //TODO: handle log seek result
+  }
+  pWal->curLogOffset = readBuf[1];
+  pWal->curVersion = ver;
+  return code;
+}
+
+static int walChangeFile(SWal *pWal, int64_t ver) {
+ int code = 0;
+ int64_t idxTfd, logTfd;
+ char fnameStr[WAL_FILE_LEN];
code = tfClose(pWal->curLogTfd);
if(code != 0) {
//TODO
@@ -32,29 +61,36 @@ int walSeekVerImpl(SWal *pWal, int64_t ver) {
//TODO
}
//bsearch in fileSet
- int fName = 0;//TODO
- //open the right file
- char fNameStr[WAL_FILE_LEN];
- sprintf(fNameStr, "%d."WAL_INDEX_SUFFIX, fName);
- bool closed = 1; //TODO:read only
- int64_t idxTfd = tfOpenReadWrite(fNameStr);
- sprintf(fNameStr, "%d."WAL_LOG_SUFFIX, fName);
- int64_t logTfd = tfOpenReadWrite(fNameStr);
- //seek position
- int64_t offset = (ver - fName) * WAL_IDX_ENTRY_SIZE;
- tfLseek(idxTfd, offset, SEEK_SET);
- //set cur version, cur file version and cur status
- pWal->curFileFirstVersion = fName;
- pWal->curFileLastVersion = 1;//TODO
+ int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE);
+ ASSERT(pRet != NULL);
+ int64_t fname = *pRet;
+ if(fname < pWal->lastFileName) {
+ pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE;
+ pWal->curFileLastVersion = pRet[1]-1;
+ sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname);
+ idxTfd = tfOpenRead(fnameStr);
+ sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname);
+ logTfd = tfOpenRead(fnameStr);
+ } else {
+ pWal->curStatus |= WAL_CUR_FILE_WRITABLE;
+ pWal->curFileLastVersion = -1;
+ sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname);
+ idxTfd = tfOpenReadWrite(fnameStr);
+ sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname);
+ logTfd = tfOpenReadWrite(fnameStr);
+ }
+
+ pWal->curFileFirstVersion = fname;
pWal->curLogTfd = logTfd;
pWal->curIdxTfd = idxTfd;
- pWal->curVersion = ver;
- pWal->curOffset = offset;
- pWal->curStatus = 0;//TODO
return code;
}
int walSeekVer(SWal *pWal, int64_t ver) {
+ if((!(pWal->curStatus & WAL_CUR_FAILED))
+ && ver == pWal->curVersion) {
+ return 0;
+ }
if(ver > pWal->lastVersion) {
//TODO: some records are skipped
return -1;
@@ -64,54 +100,13 @@ int walSeekVer(SWal *pWal, int64_t ver) {
return -1;
}
if(ver < pWal->snapshotVersion) {
- //TODO: seek snapshotted log
+ //TODO: seek snapshotted log, invalid in some cases
}
- if(ver >= pWal->curFileFirstVersion
- && ((pWal->curFileLastVersion == -1 && ver <= pWal->lastVersion) || (ver <= pWal->curFileLastVersion))) {
-
- }
- if(ver < pWal->curFileFirstVersion || (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) {
- int index = 0;
- index = 1;
- //back up to avoid inconsistency
- int64_t curVersion = pWal->curVersion;
- int64_t curOffset = pWal->curOffset;
- int64_t curFileFirstVersion = pWal->curFileFirstVersion;
- int64_t curFileLastVersion = pWal->curFileLastVersion;
- if(walSeekVerImpl(pWal, ver) < 0) {
- //TODO: errno
- pWal->curVersion = curVersion;
- pWal->curOffset = curOffset;
- pWal->curFileFirstVersion = curFileFirstVersion;
- pWal->curFileLastVersion = curFileLastVersion;
- return -1;
- }
+ if(ver < pWal->curFileFirstVersion ||
+ (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) {
+ walChangeFile(pWal, ver);
}
+ walSeekFilePos(pWal, ver);
return 0;
}
-
-int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
- int code = 0;
- //get index file
- if(!tfValid(pWal->curIdxTfd)) {
- code = TAOS_SYSTEM_ERROR(errno);
- wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
- }
- if(pWal->curVersion != ver) {
- if(walSeekVer(pWal, ver) != 0) {
- //TODO: some records are skipped
- return -1;
- }
- }
- //check file checksum
- //append index
- return 0;
-}
-
-int walRotateIndex(SWal *pWal) {
- //check file checksum
- //create new file
- //switch file
- return 0;
-}
diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c
index d60cdfe118..bc2e687069 100644
--- a/source/libs/wal/src/walMgmt.c
+++ b/source/libs/wal/src/walMgmt.c
@@ -26,57 +26,44 @@ int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
-static pthread_mutex_t walInitLock = PTHREAD_MUTEX_INITIALIZER;
-static int8_t walInited = 0;
-
typedef struct {
int32_t refSetId;
- int32_t seq;
+ uint32_t seq;
int8_t stop;
+ int8_t inited;
pthread_t thread;
- pthread_mutex_t mutex;
} SWalMgmt;
-static SWalMgmt tsWal = {0};
+static SWalMgmt tsWal = {0, .seq = 1};
static int32_t walCreateThread();
static void walStopThread();
static int32_t walInitObj(SWal *pWal);
static void walFreeObj(void *pWal);
-int32_t walInit() {
- //TODO: change to atomic
- pthread_mutex_lock(&walInitLock);
- if(walInited) {
- pthread_mutex_unlock(&walInitLock);
- return 0;
- } else {
- walInited = 1;
- pthread_mutex_unlock(&walInitLock);
- }
+// Atomically reads the module-wide sequence counter tsWal.seq (advanced by
+// walUpdateSeq()) and returns it widened to 64 bits.
+int64_t walGetSeq() {
+  int64_t seq = (int64_t)atomic_load_32(&tsWal.seq);
+  return seq;
+}
+
+int32_t walInit() {
+ int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
+ if(old == 1) return 0;
- int32_t code = 0;
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
- code = pthread_mutex_init(&tsWal.mutex, NULL);
- if (code != 0) {
- wError("failed to init wal mutex since %s", tstrerror(code));
- return code;
- }
-
- code = walCreateThread();
+ int code = walCreateThread();
if (code != 0) {
wError("failed to init wal module since %s", tstrerror(code));
+ atomic_store_8(&tsWal.inited, 0);
return code;
}
wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
- return code;
+ return 0;
}
void walCleanUp() {
walStopThread();
taosCloseRef(tsWal.refSetId);
- pthread_mutex_destroy(&tsWal.mutex);
wInfo("wal module is cleaned up");
}
@@ -92,7 +79,7 @@ static int walLoadFileset(SWal *pWal) {
char *name = ent->d_name;
name[WAL_NOSUFFIX_LEN] = 0;
//validate file name by regex matching
- if(1 /* regex match */) {
+ if(1 /* TODO:regex match */) {
int64_t fnameInt64 = atoll(name);
taosArrayPush(pWal->fileSet, &fnameInt64);
}
@@ -133,6 +120,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
walFreeObj(pWal);
return NULL;
}
+ walLoadFileset(pWal);
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
@@ -164,6 +152,9 @@ void walClose(SWal *pWal) {
pthread_mutex_lock(&pWal->mutex);
tfClose(pWal->curLogTfd);
+ tfClose(pWal->curIdxTfd);
+ taosArrayDestroy(pWal->fileSet);
+ pWal->fileSet = NULL;
pthread_mutex_unlock(&pWal->mutex);
taosRemoveRef(tsWal.refSetId, pWal->refId);
}
@@ -188,6 +179,9 @@ static void walFreeObj(void *wal) {
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
tfClose(pWal->curLogTfd);
+ tfClose(pWal->curIdxTfd);
+ taosArrayDestroy(pWal->fileSet);
+ pWal->fileSet = NULL;
pthread_mutex_destroy(&pWal->mutex);
tfree(pWal);
}
@@ -197,7 +191,7 @@ static bool walNeedFsync(SWal *pWal) {
return false;
}
- if (tsWal.seq % pWal->fsyncSeq == 0) {
+ if (atomic_load_32(&tsWal.seq) % pWal->fsyncSeq == 0) {
return true;
}
@@ -206,16 +200,14 @@ static bool walNeedFsync(SWal *pWal) {
static void walUpdateSeq() {
taosMsleep(WAL_REFRESH_MS);
- if (++tsWal.seq <= 0) {
- tsWal.seq = 1;
- }
+ atomic_add_fetch_32(&tsWal.seq, 1);
}
static void walFsyncAll() {
SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
while (pWal) {
if (walNeedFsync(pWal)) {
- wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
+ wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq));
int32_t code = tfFsync(pWal->curLogTfd);
if (code != 0) {
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code));
@@ -226,16 +218,12 @@ static void walFsyncAll() {
}
static void *walThreadFunc(void *param) {
- int stop = 0;
setThreadName("wal");
while (1) {
walUpdateSeq();
walFsyncAll();
- pthread_mutex_lock(&tsWal.mutex);
- stop = tsWal.stop;
- pthread_mutex_unlock(&tsWal.mutex);
- if (stop) break;
+ if (atomic_load_8(&tsWal.stop)) break;
}
return NULL;
@@ -258,9 +246,7 @@ static int32_t walCreateThread() {
}
static void walStopThread() {
- pthread_mutex_lock(&tsWal.mutex);
- tsWal.stop = 1;
- pthread_mutex_unlock(&tsWal.mutex);
+ atomic_store_8(&tsWal.stop, 1);
if (taosCheckPthreadValid(tsWal.thread)) {
pthread_join(tsWal.thread, NULL);
diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c
index 7563ec02c7..69c83a9912 100644
--- a/source/libs/wal/src/walWrite.c
+++ b/source/libs/wal/src/walWrite.c
@@ -26,10 +26,24 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
}
int32_t walRollback(SWal *pWal, int64_t ver) {
+ //TODO: ftruncate -- truncation is not implemented yet; this is a no-op that always returns 0
return 0;
}
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
+ pWal->snapshotVersion = ver; // record the snapshot version on the WAL object
+
+ //mark files safe to delete (not implemented yet: the branch below contains no statements)
+ int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE); // presumably the fileSet entry with the largest name <= ver -- TODO confirm TD_LE semantics
+ if(pRet != pWal->fileSet->pData) { // presumably "the match is not the first element of fileSet" -- TODO confirm pData is the element storage
+ //delete files until less than retention size
+
+ //find first file that exceeds retention time
+
+ }
+
+ //delete files living longer than retention limit
+ //remove file from fileset
return 0; // always reports success
}
@@ -124,13 +138,102 @@ void walRemoveAllOldFiles(void *handle) {
}
#endif
+// Closes the current index/log file pair and starts a new pair named after the
+// next version to be written (lastVersion + 1). The new name is appended to
+// fileSet, the cursor fields are reset to the beginning of the new file, and
+// the roll bookkeeping (lastFileName, lastFileWriteSize, lastRollSeq) is
+// refreshed. Returns the tfClose() code if either close fails, otherwise 0.
+static int walRoll(SWal *pWal) {
+  int code = 0;
+  code = tfClose(pWal->curIdxTfd);
+  if(code != 0) {
+    return code;
+  }
+  code = tfClose(pWal->curLogTfd);
+  if(code != 0) {
+    return code;
+  }
+  int64_t idxTfd, logTfd;
+  //create new file
+  int64_t newFileFirstVersion = pWal->lastVersion + 1;
+  char fnameStr[WAL_FILE_LEN];
+  snprintf(fnameStr, sizeof(fnameStr), "%"PRId64"."WAL_INDEX_SUFFIX, newFileFirstVersion);
+  idxTfd = tfOpenCreateWrite(fnameStr);
+  snprintf(fnameStr, sizeof(fnameStr), "%"PRId64"."WAL_LOG_SUFFIX, newFileFirstVersion);
+  logTfd = tfOpenCreateWrite(fnameStr);
+
+  taosArrayPush(pWal->fileSet, &newFileFirstVersion);
+
+  //switch file
+  pWal->curIdxTfd = idxTfd;
+  pWal->curLogTfd = logTfd;
+  //change status
+  pWal->curFileLastVersion = -1;
+  pWal->curFileFirstVersion = newFileFirstVersion;
+  pWal->curVersion = newFileFirstVersion;
+  pWal->curLogOffset = 0;
+  // set BOTH flags for the freshly created file; a bitwise AND of the two
+  // distinct flag bits (1 and 2) would evaluate to 0 and clear the status
+  pWal->curStatus = WAL_CUR_FILE_WRITABLE | WAL_CUR_POS_WRITABLE;
+
+  pWal->lastFileName = newFileFirstVersion;
+  pWal->lastFileWriteSize = 0;
+  pWal->lastRollSeq = walGetSeq();
+  return 0;
+}
+
+// Makes the newest entry of fileSet the current file: opens its index and log
+// files read-write, stores the handles on the WAL object and resets the cursor
+// fields to the start of that file. Always returns 0.
+// NOTE(review): the previous curIdxTfd/curLogTfd are overwritten without being
+// closed in this function -- confirm the caller is responsible for that.
+int walChangeFileToLast(SWal *pWal) {
+  int64_t* pRet = taosArrayGetLast(pWal->fileSet);
+  ASSERT(pRet != NULL);
+  int64_t lastName = *pRet;
+
+  char path[WAL_FILE_LEN];
+  sprintf(path, "%"PRId64"."WAL_INDEX_SUFFIX, lastName);
+  int64_t idxHandle = tfOpenReadWrite(path);
+  sprintf(path, "%"PRId64"."WAL_LOG_SUFFIX, lastName);
+  int64_t logHandle = tfOpenReadWrite(path);
+
+  // publish the new handles
+  pWal->curIdxTfd = idxHandle;
+  pWal->curLogTfd = logHandle;
+
+  // reset the cursor to the beginning of the file
+  pWal->curFileFirstVersion = lastName;
+  pWal->curFileLastVersion = -1;
+  pWal->curVersion = lastName;
+  pWal->curLogOffset = 0;
+  pWal->curStatus = WAL_CUR_FILE_WRITABLE;
+  return 0;
+}
+
+// Appends one index entry {ver, offset} to the current index file.
+// Returns 0 on success. If the index handle is invalid or fewer bytes than one
+// full entry were written, the failure is logged and a TAOS_SYSTEM_ERROR code
+// is returned instead of silently reporting success.
+int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
+  //get index file
+  if(!tfValid(pWal->curIdxTfd)) {
+    int code = TAOS_SYSTEM_ERROR(errno);
+    wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
+    // nothing can be written through an invalid handle
+    return code;
+  }
+  int64_t writeBuf[2] = { ver, offset };
+  int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf));
+  if(size != (int)sizeof(writeBuf)) {
+    int code = TAOS_SYSTEM_ERROR(errno);
+    wError("vgId:%d, file:%"PRId64".idx, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
+    return code;
+  }
+  return 0;
+}
+
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) {
if (pWal == NULL) return -1;
// no wal
- if (!tfValid(pWal->curLogTfd)) return 0;
if (pWal->level == TAOS_WAL_NOLOG) return 0;
- if (index > pWal->lastVersion + 1) return -1;
+
+ if (index == pWal->lastVersion + 1) {
+ int64_t passed = walGetSeq() - pWal->lastRollSeq;
+ if(passed > pWal->rollPeriod) {
+ walRoll(pWal);
+ } else if(pWal->lastFileWriteSize > pWal->segSize) {
+ walRoll(pWal);
+ } else {
+ walChangeFileToLast(pWal);
+ }
+ } else {
+ //reject skip log or rewrite log
+ //must truncate explicitly first
+ return -1;
+ }
+ if (!tfValid(pWal->curLogTfd)) return 0;
pWal->head.version = index;
int32_t code = 0;
@@ -155,8 +258,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
}
- //TODO:write idx
+ walWriteIndex(pWal, index, pWal->curLogOffset);
+ pWal->curLogOffset += sizeof(SWalHead) + bodyLen;
+ //set status
+ pWal->lastVersion = index;
+
pthread_mutex_unlock(&pWal->mutex);
return code;
diff --git a/source/util/src/tfile.c b/source/util/src/tfile.c
index 5b61186b12..5d4789aae6 100644
--- a/source/util/src/tfile.c
+++ b/source/util/src/tfile.c
@@ -53,6 +53,11 @@ static int64_t tfOpenImp(int32_t fd) {
return rid;
}
+// Opens `pathname` through taosOpenFileRead() and hands the descriptor to
+// tfOpenImp(), whose result is returned. `flags` is accepted but not used.
+int64_t tfOpenRead(const char *pathname, int32_t flags) {
+  return tfOpenImp(taosOpenFileRead(pathname));
+}
+
int64_t tfOpenReadWrite(const char *pathname, int32_t flags) {
int32_t fd = taosOpenFileReadWrite(pathname);
return tfOpenImp(fd);