Merge remote-tracking branch 'origin/3.0' into feature/dnode3
This commit is contained in:
commit
89270a08bf
|
@ -39,6 +39,14 @@ typedef enum {
|
||||||
} EWalType;
|
} EWalType;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
//union {
|
||||||
|
//uint32_t info;
|
||||||
|
//struct {
|
||||||
|
//uint32_t sver:3;
|
||||||
|
//uint32_t msgtype: 5;
|
||||||
|
//uint32_t reserved : 24;
|
||||||
|
//};
|
||||||
|
//};
|
||||||
int8_t sver;
|
int8_t sver;
|
||||||
uint8_t msgType;
|
uint8_t msgType;
|
||||||
int8_t reserved[2];
|
int8_t reserved[2];
|
||||||
|
@ -71,13 +79,17 @@ typedef struct {
|
||||||
#define WAL_FILESET_MAX 128
|
#define WAL_FILESET_MAX 128
|
||||||
|
|
||||||
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
|
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
|
||||||
#define WAL_CUR_POS_READ_ONLY 1
|
#define WAL_CUR_POS_WRITABLE 1
|
||||||
#define WAL_CUR_FILE_READ_ONLY 2
|
#define WAL_CUR_FILE_WRITABLE 2
|
||||||
|
#define WAL_CUR_FAILED 4
|
||||||
|
|
||||||
typedef struct SWal {
|
typedef struct SWal {
|
||||||
// cfg
|
// cfg
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t fsyncPeriod; // millisecond
|
int32_t fsyncPeriod; // millisecond
|
||||||
|
int32_t fsyncSeq;
|
||||||
|
int32_t rollPeriod; // second
|
||||||
|
int64_t segSize;
|
||||||
EWalType level;
|
EWalType level;
|
||||||
//reference
|
//reference
|
||||||
int64_t refId;
|
int64_t refId;
|
||||||
|
@ -86,7 +98,7 @@ typedef struct SWal {
|
||||||
int64_t curIdxTfd;
|
int64_t curIdxTfd;
|
||||||
//current version
|
//current version
|
||||||
int64_t curVersion;
|
int64_t curVersion;
|
||||||
int64_t curOffset;
|
int64_t curLogOffset;
|
||||||
//current file version
|
//current file version
|
||||||
int64_t curFileFirstVersion;
|
int64_t curFileFirstVersion;
|
||||||
int64_t curFileLastVersion;
|
int64_t curFileLastVersion;
|
||||||
|
@ -94,8 +106,10 @@ typedef struct SWal {
|
||||||
int64_t firstVersion;
|
int64_t firstVersion;
|
||||||
int64_t snapshotVersion;
|
int64_t snapshotVersion;
|
||||||
int64_t lastVersion;
|
int64_t lastVersion;
|
||||||
//fsync status
|
int64_t lastFileName;
|
||||||
int32_t fsyncSeq;
|
//roll status
|
||||||
|
int64_t lastRollSeq;
|
||||||
|
int64_t lastFileWriteSize;
|
||||||
//ctl
|
//ctl
|
||||||
int32_t curStatus;
|
int32_t curStatus;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
|
@ -119,12 +133,10 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
|
||||||
void walClose(SWal *);
|
void walClose(SWal *);
|
||||||
|
|
||||||
// write
|
// write
|
||||||
//int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
|
|
||||||
int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen);
|
int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen);
|
||||||
//int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
|
void walFsync(SWal *, bool force);
|
||||||
|
|
||||||
// apis for lifecycle management
|
// apis for lifecycle management
|
||||||
void walFsync(SWal *, bool force);
|
|
||||||
int32_t walCommit(SWal *, int64_t ver);
|
int32_t walCommit(SWal *, int64_t ver);
|
||||||
// truncate after
|
// truncate after
|
||||||
int32_t walRollback(SWal *, int64_t ver);
|
int32_t walRollback(SWal *, int64_t ver);
|
||||||
|
|
|
@ -28,6 +28,7 @@ void tfCleanup();
|
||||||
|
|
||||||
// the same syntax as UNIX standard open/close/read/write
|
// the same syntax as UNIX standard open/close/read/write
|
||||||
// but FD is int64_t and will never be reused
|
// but FD is int64_t and will never be reused
|
||||||
|
int64_t tfOpenRead(const char *pathname);
|
||||||
int64_t tfOpenReadWrite(const char *pathname);
|
int64_t tfOpenReadWrite(const char *pathname);
|
||||||
int64_t tfOpenCreateWrite(const char *pathname);
|
int64_t tfOpenCreateWrite(const char *pathname);
|
||||||
int64_t tfOpenCreateWriteAppend(const char *pathname);
|
int64_t tfOpenCreateWriteAppend(const char *pathname);
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
#ifndef __INDEX_FST_H__
|
#ifndef __INDEX_FST_H__
|
||||||
#define __INDEX_FST_H__
|
#define __INDEX_FST_H__
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "index_fst_util.h"
|
#include "index_fst_util.h"
|
||||||
|
@ -34,6 +37,7 @@ typedef struct FstRange {
|
||||||
} FstRange;
|
} FstRange;
|
||||||
|
|
||||||
|
|
||||||
|
typedef enum {GE, GT, LE, LT} RangeType;
|
||||||
typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal} State;
|
typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal} State;
|
||||||
|
|
||||||
typedef enum {Ordered, OutOfOrdered, DuplicateKey} OrderType;
|
typedef enum {Ordered, OutOfOrdered, DuplicateKey} OrderType;
|
||||||
|
@ -93,8 +97,11 @@ typedef struct FstBuilder {
|
||||||
|
|
||||||
|
|
||||||
FstBuilder *fstBuilderCreate(void *w, FstType ty);
|
FstBuilder *fstBuilderCreate(void *w, FstType ty);
|
||||||
|
|
||||||
|
|
||||||
void fstBuilderDestroy(FstBuilder *b);
|
void fstBuilderDestroy(FstBuilder *b);
|
||||||
void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in);
|
void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in);
|
||||||
|
bool fstBuilderInsert(FstBuilder *b, FstSlice bs, Output in);
|
||||||
OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup);
|
OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup);
|
||||||
void fstBuilderCompileFrom(FstBuilder *b, uint64_t istate);
|
void fstBuilderCompileFrom(FstBuilder *b, uint64_t istate);
|
||||||
CompiledAddr fstBuilderCompile(FstBuilder *b, FstBuilderNode *bn);
|
CompiledAddr fstBuilderCompile(FstBuilder *b, FstBuilderNode *bn);
|
||||||
|
@ -169,11 +176,6 @@ uint64_t fstStateFindInput(FstState *state, FstNode *node, uint8_t b, bool *null
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext)
|
#define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext)
|
||||||
#define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans)
|
#define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans)
|
||||||
#define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans)
|
#define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans)
|
||||||
|
@ -272,7 +274,6 @@ FstNode* fstGetNode(Fst *fst, CompiledAddr);
|
||||||
FstNode* fstGetRoot(Fst *fst);
|
FstNode* fstGetRoot(Fst *fst);
|
||||||
FstType fstGetType(Fst *fst);
|
FstType fstGetType(Fst *fst);
|
||||||
CompiledAddr fstGetRootAddr(Fst *fst);
|
CompiledAddr fstGetRootAddr(Fst *fst);
|
||||||
|
|
||||||
Output fstEmptyFinalOutput(Fst *fst, bool *null);
|
Output fstEmptyFinalOutput(Fst *fst, bool *null);
|
||||||
bool fstVerify(Fst *fst);
|
bool fstVerify(Fst *fst);
|
||||||
|
|
||||||
|
@ -280,10 +281,6 @@ bool fstVerify(Fst *fst);
|
||||||
//refactor this function
|
//refactor this function
|
||||||
bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
|
bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
typedef struct StreamState {
|
typedef struct StreamState {
|
||||||
FstNode *node;
|
FstNode *node;
|
||||||
uint64_t trans;
|
uint64_t trans;
|
||||||
|
@ -310,10 +307,30 @@ typedef struct StreamWithStateResult {
|
||||||
} StreamWithStateResult;
|
} StreamWithStateResult;
|
||||||
|
|
||||||
StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state);
|
StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state);
|
||||||
|
void swsResultDestroy(StreamWithStateResult *result);
|
||||||
|
|
||||||
typedef void* (*StreamCallback)(void *);
|
typedef void* (*StreamCallback)(void *);
|
||||||
StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ;
|
StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ;
|
||||||
void streamWithStateDestroy(StreamWithState *sws);
|
void streamWithStateDestroy(StreamWithState *sws);
|
||||||
bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min);
|
bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min);
|
||||||
StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback);
|
StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback);
|
||||||
|
|
||||||
|
typedef struct FstStreamBuilder {
|
||||||
|
Fst *fst;
|
||||||
|
Automation *aut;
|
||||||
|
FstBoundWithData *min;
|
||||||
|
FstBoundWithData *max;
|
||||||
|
} FstStreamBuilder;
|
||||||
|
|
||||||
|
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut);
|
||||||
|
// set up bound range
|
||||||
|
// refator, simple code by marco
|
||||||
|
|
||||||
|
FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type);
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -15,6 +15,10 @@
|
||||||
#ifndef __INDEX_FST_AUTAOMATION_H__
|
#ifndef __INDEX_FST_AUTAOMATION_H__
|
||||||
#define __INDEX_FST_AUTAOMATION_H__
|
#define __INDEX_FST_AUTAOMATION_H__
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
typedef struct AutomationCtx AutomationCtx;
|
typedef struct AutomationCtx AutomationCtx;
|
||||||
|
|
||||||
typedef struct StartWith {
|
typedef struct StartWith {
|
||||||
|
@ -42,6 +46,8 @@ typedef struct Automation {
|
||||||
void *data;
|
void *data;
|
||||||
} Automation;
|
} Automation;
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1,7 +1,16 @@
|
||||||
#ifndef __INDEX_FST_COMM_H__
|
#ifndef __INDEX_FST_COMM_H__
|
||||||
#define __INDEX_FST_COMM_H__
|
#define __INDEX_FST_COMM_H__
|
||||||
|
|
||||||
|
|
||||||
extern const uint8_t COMMON_INPUTS[];
|
extern const uint8_t COMMON_INPUTS[];
|
||||||
extern char const COMMON_INPUTS_INV[];
|
extern char const COMMON_INPUTS_INV[];
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -16,6 +16,38 @@
|
||||||
#ifndef __INDEX_FST_COUNTING_WRITER_H__
|
#ifndef __INDEX_FST_COUNTING_WRITER_H__
|
||||||
#define __INDEX_FST_COUNTING_WRITER_H__
|
#define __INDEX_FST_COUNTING_WRITER_H__
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "tfile.h"
|
||||||
|
|
||||||
|
|
||||||
|
#define DefaultMem 1024*1024
|
||||||
|
|
||||||
|
static char tmpFile[] = "/tmp/index";
|
||||||
|
typedef enum WriterType {TMemory, TFile} WriterType;
|
||||||
|
|
||||||
|
typedef struct WriterCtx {
|
||||||
|
int (*write)(struct WriterCtx *ctx, uint8_t *buf, int len);
|
||||||
|
int (*read)(struct WriterCtx *ctx, uint8_t *buf, int len);
|
||||||
|
int (*flush)(struct WriterCtx *ctx);
|
||||||
|
WriterType type;
|
||||||
|
union {
|
||||||
|
int fd;
|
||||||
|
void *mem;
|
||||||
|
};
|
||||||
|
int32_t offset;
|
||||||
|
int32_t limit;
|
||||||
|
} WriterCtx;
|
||||||
|
|
||||||
|
static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len);
|
||||||
|
static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len);
|
||||||
|
static int writeCtxDoFlush(WriterCtx *ctx);
|
||||||
|
|
||||||
|
WriterCtx* writerCtxCreate(WriterType type);
|
||||||
|
void writerCtxDestroy(WriterCtx *w);
|
||||||
|
|
||||||
typedef uint32_t CheckSummer;
|
typedef uint32_t CheckSummer;
|
||||||
|
|
||||||
|
|
||||||
|
@ -25,7 +57,7 @@ typedef struct FstCountingWriter {
|
||||||
CheckSummer summer;
|
CheckSummer summer;
|
||||||
} FstCountingWriter;
|
} FstCountingWriter;
|
||||||
|
|
||||||
uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen);
|
int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen);
|
||||||
|
|
||||||
int fstCountingWriterFlush(FstCountingWriter *write);
|
int fstCountingWriterFlush(FstCountingWriter *write);
|
||||||
|
|
||||||
|
@ -44,6 +76,10 @@ uint8_t fstCountingWriterPackUint(FstCountingWriter *writer, uint64_t n);
|
||||||
#define FST_WRITER_INTER_WRITER(writer) (writer->wtr)
|
#define FST_WRITER_INTER_WRITER(writer) (writer->wtr)
|
||||||
#define FST_WRITE_CHECK_SUMMER(writer) (writer->summer)
|
#define FST_WRITE_CHECK_SUMMER(writer) (writer->summer)
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,10 @@
|
||||||
#ifndef __INDEX_FST_NODE_H__
|
#ifndef __INDEX_FST_NODE_H__
|
||||||
#define __INDEX_FST_NODE_H__
|
#define __INDEX_FST_NODE_H__
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
#include "index_fst_util.h"
|
#include "index_fst_util.h"
|
||||||
#include "index_fst_counting_writer.h"
|
#include "index_fst_counting_writer.h"
|
||||||
|
|
||||||
|
@ -42,7 +46,12 @@ FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src);
|
||||||
void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src);
|
void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src);
|
||||||
|
|
||||||
//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
|
//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
|
||||||
|
bool fstBuilderNodeEqual(FstBuilderNode *n1, FstBuilderNode *n2);
|
||||||
|
|
||||||
void fstBuilderNodeDestroy(FstBuilderNode *node);
|
void fstBuilderNodeDestroy(FstBuilderNode *node);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -15,6 +15,10 @@
|
||||||
#ifndef __FST_REGISTRY_H__
|
#ifndef __FST_REGISTRY_H__
|
||||||
#define __FST_REGISTRY_H__
|
#define __FST_REGISTRY_H__
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
#include "index_fst_util.h"
|
#include "index_fst_util.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "index_fst_node.h"
|
#include "index_fst_node.h"
|
||||||
|
@ -59,4 +63,8 @@ void fstRegistryDestroy(FstRegistry *registry);
|
||||||
FstRegistryEntry* fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNode);
|
FstRegistryEntry* fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNode);
|
||||||
void fstRegistryEntryDestroy(FstRegistryEntry *entry);
|
void fstRegistryEntryDestroy(FstRegistryEntry *entry);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -17,6 +17,10 @@
|
||||||
#ifndef __INDEX_FST_UTIL_H__
|
#ifndef __INDEX_FST_UTIL_H__
|
||||||
#define __INDEX_FST_UTIL_H__
|
#define __INDEX_FST_UTIL_H__
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "index_fst_common.h"
|
#include "index_fst_common.h"
|
||||||
|
|
||||||
|
@ -67,20 +71,30 @@ uint8_t packDeltaSize(CompiledAddr nodeAddr, CompiledAddr transAddr);
|
||||||
CompiledAddr unpackDelta(char *data, uint64_t len, uint64_t nodeAddr);
|
CompiledAddr unpackDelta(char *data, uint64_t len, uint64_t nodeAddr);
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct FstString {
|
||||||
|
uint8_t *data;
|
||||||
|
uint32_t len;
|
||||||
|
int32_t ref;
|
||||||
|
} FstString;
|
||||||
|
|
||||||
typedef struct FstSlice {
|
typedef struct FstSlice {
|
||||||
uint8_t *data;
|
FstString *str;
|
||||||
uint64_t dLen;
|
|
||||||
int32_t start;
|
int32_t start;
|
||||||
int32_t end;
|
int32_t end;
|
||||||
} FstSlice;
|
} FstSlice;
|
||||||
|
|
||||||
FstSlice fstSliceCopy(FstSlice *slice, int32_t start, int32_t end);
|
FstSlice fstSliceCreate(uint8_t *data, uint64_t len);
|
||||||
FstSlice fstSliceCreate(uint8_t *data, uint64_t dLen);
|
FstSlice fstSliceCopy(FstSlice *s, int32_t start, int32_t end);
|
||||||
bool fstSliceEmpty(FstSlice *slice);
|
FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end);
|
||||||
int fstSliceCompare(FstSlice *a, FstSlice *b);
|
bool fstSliceIsEmpty(FstSlice *s);
|
||||||
|
int fstSliceCompare(FstSlice *s1, FstSlice *s2);
|
||||||
|
void fstSliceDestroy(FstSlice *s);
|
||||||
|
uint8_t *fstSliceData(FstSlice *s, int32_t *sz);
|
||||||
|
|
||||||
#define FST_SLICE_LEN(s) ((s)->end - (s)->start + 1)
|
#define FST_SLICE_LEN(s) (s->end - s->start + 1)
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -52,7 +52,7 @@ void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes *nodes, bool isFinal) {
|
||||||
FstBuilderNode *node = malloc(sizeof(FstBuilderNode));
|
FstBuilderNode *node = malloc(sizeof(FstBuilderNode));
|
||||||
node->isFinal = isFinal;
|
node->isFinal = isFinal;
|
||||||
node->finalOutput = 0;
|
node->finalOutput = 0;
|
||||||
node->trans = NULL;
|
node->trans = taosArrayInit(16, sizeof(FstTransition));
|
||||||
|
|
||||||
FstBuilderNodeUnfinished un = {.node = node, .last = NULL};
|
FstBuilderNodeUnfinished un = {.node = node, .last = NULL};
|
||||||
taosArrayPush(nodes->stack, &un);
|
taosArrayPush(nodes->stack, &un);
|
||||||
|
@ -92,7 +92,7 @@ void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes *nodes, CompiledAddr add
|
||||||
}
|
}
|
||||||
void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output out) {
|
void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output out) {
|
||||||
FstSlice *s = &bs;
|
FstSlice *s = &bs;
|
||||||
if (s->data == NULL || s->dLen == 0 || s->start > s->end) {
|
if (fstSliceIsEmpty(s)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
size_t sz = taosArrayGetSize(nodes->stack) - 1;
|
size_t sz = taosArrayGetSize(nodes->stack) - 1;
|
||||||
|
@ -104,18 +104,20 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output
|
||||||
//FstLastTransition *trn = malloc(sizeof(FstLastTransition));
|
//FstLastTransition *trn = malloc(sizeof(FstLastTransition));
|
||||||
//trn->inp = s->data[s->start];
|
//trn->inp = s->data[s->start];
|
||||||
//trn->out = out;
|
//trn->out = out;
|
||||||
un->last = fstLastTransitionCreate(s->data[s->start], out);
|
int32_t len = 0;
|
||||||
|
uint8_t *data = fstSliceData(s, &len);
|
||||||
|
un->last = fstLastTransitionCreate(data[0], out);
|
||||||
|
|
||||||
for (uint64_t i = s->start; i <= s->end; i++) {
|
for (uint64_t i = 0; i < len; i++) {
|
||||||
FstBuilderNode *n = malloc(sizeof(FstBuilderNode));
|
FstBuilderNode *n = malloc(sizeof(FstBuilderNode));
|
||||||
n->isFinal = false;
|
n->isFinal = false;
|
||||||
n->finalOutput = 0;
|
n->finalOutput = 0;
|
||||||
n->trans = NULL;
|
n->trans = taosArrayInit(16, sizeof(FstTransition));
|
||||||
|
|
||||||
//FstLastTransition *trn = malloc(sizeof(FstLastTransition));
|
//FstLastTransition *trn = malloc(sizeof(FstLastTransition));
|
||||||
//trn->inp = s->data[i];
|
//trn->inp = s->data[i];
|
||||||
//trn->out = out;
|
//trn->out = out;
|
||||||
FstLastTransition *trn = fstLastTransitionCreate(s->data[i], out);
|
FstLastTransition *trn = fstLastTransitionCreate(data[i], out);
|
||||||
|
|
||||||
FstBuilderNodeUnfinished un = {.node = n, .last = trn};
|
FstBuilderNodeUnfinished un = {.node = n, .last = trn};
|
||||||
taosArrayPush(nodes->stack, &un);
|
taosArrayPush(nodes->stack, &un);
|
||||||
|
@ -127,13 +129,13 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output
|
||||||
uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes *node, FstSlice bs) {
|
uint64_t fstUnFinishedNodesFindCommPrefix(FstUnFinishedNodes *node, FstSlice bs) {
|
||||||
FstSlice *s = &bs;
|
FstSlice *s = &bs;
|
||||||
|
|
||||||
size_t lsz = (size_t)(s->end - s->start + 1); // data len
|
|
||||||
size_t ssz = taosArrayGetSize(node->stack); // stack size
|
size_t ssz = taosArrayGetSize(node->stack); // stack size
|
||||||
|
|
||||||
uint64_t count = 0;
|
uint64_t count = 0;
|
||||||
|
int32_t lsz; // data len
|
||||||
|
uint8_t *data = fstSliceData(s, &lsz);
|
||||||
for (size_t i = 0; i < ssz && i < lsz; i++) {
|
for (size_t i = 0; i < ssz && i < lsz; i++) {
|
||||||
FstBuilderNodeUnfinished *un = taosArrayGet(node->stack, i);
|
FstBuilderNodeUnfinished *un = taosArrayGet(node->stack, i);
|
||||||
if (un->last->inp == s->data[s->start + i]) {
|
if (un->last->inp == data[i]) {
|
||||||
count++;
|
count++;
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
@ -153,7 +155,8 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node,
|
||||||
|
|
||||||
FstLastTransition *t = un->last;
|
FstLastTransition *t = un->last;
|
||||||
uint64_t addPrefix = 0;
|
uint64_t addPrefix = 0;
|
||||||
if (t && t->inp == s->data[s->start + i]) {
|
uint8_t *data = fstSliceData(s, NULL);
|
||||||
|
if (t && t->inp == data[i]) {
|
||||||
uint64_t commPrefix = MIN(t->out, *out);
|
uint64_t commPrefix = MIN(t->out, *out);
|
||||||
uint64_t tAddPrefix = t->out - commPrefix;
|
uint64_t tAddPrefix = t->out - commPrefix;
|
||||||
(*out) = (*out) - commPrefix;
|
(*out) = (*out) - commPrefix;
|
||||||
|
@ -164,7 +167,6 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node,
|
||||||
}
|
}
|
||||||
if (addPrefix != 0) {
|
if (addPrefix != 0) {
|
||||||
fstBuilderNodeUnfinishedAddOutputPrefix(un, addPrefix);
|
fstBuilderNodeUnfinishedAddOutputPrefix(un, addPrefix);
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return i;
|
return i;
|
||||||
|
@ -176,7 +178,9 @@ FstState fstStateCreateFrom(FstSlice* slice, CompiledAddr addr) {
|
||||||
if (addr == EMPTY_ADDRESS) {
|
if (addr == EMPTY_ADDRESS) {
|
||||||
return fs;
|
return fs;
|
||||||
}
|
}
|
||||||
uint8_t v = slice->data[addr];
|
|
||||||
|
uint8_t *data = fstSliceData(slice, NULL);
|
||||||
|
uint8_t v = data[addr];
|
||||||
uint8_t t = (v & 0b11000000) >> 6;
|
uint8_t t = (v & 0b11000000) >> 6;
|
||||||
if (t == 0b11) {
|
if (t == 0b11) {
|
||||||
fs.state = OneTransNext;
|
fs.state = OneTransNext;
|
||||||
|
@ -376,7 +380,8 @@ uint8_t fstStateInput(FstState *s, FstNode *node) {
|
||||||
FstSlice *slice = &node->data;
|
FstSlice *slice = &node->data;
|
||||||
bool null = false;
|
bool null = false;
|
||||||
uint8_t inp = fstStateCommInput(s, &null);
|
uint8_t inp = fstStateCommInput(s, &null);
|
||||||
return null == false ? inp : slice->data[slice->start - 1];
|
uint8_t *data = fstSliceData(slice, NULL);
|
||||||
|
return null == false ? inp : data[-1];
|
||||||
}
|
}
|
||||||
uint8_t fstStateInputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
|
uint8_t fstStateInputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
|
||||||
assert(s->state == AnyTrans);
|
assert(s->state == AnyTrans);
|
||||||
|
@ -388,7 +393,9 @@ uint8_t fstStateInputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
|
||||||
- fstStateTransIndexSize(s, node->version, node->nTrans)
|
- fstStateTransIndexSize(s, node->version, node->nTrans)
|
||||||
- i
|
- i
|
||||||
- 1; // the output size
|
- 1; // the output size
|
||||||
return slice->data[at];
|
|
||||||
|
uint8_t *data = fstSliceData(slice, NULL);
|
||||||
|
return data[at];
|
||||||
}
|
}
|
||||||
|
|
||||||
// trans_addr
|
// trans_addr
|
||||||
|
@ -406,7 +413,8 @@ CompiledAddr fstStateTransAddr(FstState *s, FstNode *node) {
|
||||||
- tSizes;
|
- tSizes;
|
||||||
|
|
||||||
// refactor error logic
|
// refactor error logic
|
||||||
return unpackDelta(slice->data + slice->start + i, tSizes, node->end);
|
uint8_t *data = fstSliceData(slice, NULL);
|
||||||
|
return unpackDelta(data +i, tSizes, node->end);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
CompiledAddr fstStateTransAddrForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
|
CompiledAddr fstStateTransAddrForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
|
||||||
|
@ -421,7 +429,8 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState *s, FstNode *node, uint64_t i
|
||||||
- node->nTrans
|
- node->nTrans
|
||||||
- (i * tSizes)
|
- (i * tSizes)
|
||||||
- tSizes;
|
- tSizes;
|
||||||
return unpackDelta(slice->data + slice->start + at, tSizes, node->end);
|
uint8_t *data = fstSliceData(slice, NULL);
|
||||||
|
return unpackDelta(data + at, tSizes, node->end);
|
||||||
}
|
}
|
||||||
|
|
||||||
// sizes
|
// sizes
|
||||||
|
@ -434,7 +443,8 @@ PackSizes fstStateSizes(FstState *s, FstSlice *slice) {
|
||||||
i = FST_SLICE_LEN(slice) - 1 - fstStateNtransLen(s) - 1;
|
i = FST_SLICE_LEN(slice) - 1 - fstStateNtransLen(s) - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (PackSizes)(slice->data[slice->start + i]);
|
uint8_t *data = fstSliceData(slice, NULL);
|
||||||
|
return (PackSizes)(*(data +i));
|
||||||
}
|
}
|
||||||
// Output
|
// Output
|
||||||
Output fstStateOutput(FstState *s, FstNode *node) {
|
Output fstStateOutput(FstState *s, FstNode *node) {
|
||||||
|
@ -452,7 +462,8 @@ Output fstStateOutput(FstState *s, FstNode *node) {
|
||||||
- 1
|
- 1
|
||||||
- tSizes
|
- tSizes
|
||||||
- oSizes;
|
- oSizes;
|
||||||
return unpackUint64(slice->data + slice->start + i, oSizes);
|
uint8_t *data = fstSliceData(slice, NULL);
|
||||||
|
return unpackUint64(data + i, oSizes);
|
||||||
|
|
||||||
}
|
}
|
||||||
Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
|
Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
|
||||||
|
@ -469,7 +480,9 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
|
||||||
- fstStateTotalTransSize(s, node->version, node->sizes, node->nTrans)
|
- fstStateTotalTransSize(s, node->version, node->sizes, node->nTrans)
|
||||||
- (i * oSizes)
|
- (i * oSizes)
|
||||||
- oSizes;
|
- oSizes;
|
||||||
return unpackUint64(slice->data + slice->start + at, oSizes);
|
|
||||||
|
uint8_t *data = fstSliceData(slice, NULL);
|
||||||
|
return unpackUint64(data + at, oSizes);
|
||||||
}
|
}
|
||||||
|
|
||||||
// anyTrans specify function
|
// anyTrans specify function
|
||||||
|
@ -523,7 +536,10 @@ uint64_t fstStateNtrans(FstState *s, FstSlice *slice) {
|
||||||
if (null != true) {
|
if (null != true) {
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
n = slice->data[slice->end - 1]; // data[data.len() - 2]
|
int32_t len;
|
||||||
|
uint8_t *data = fstSliceData(slice, &len);
|
||||||
|
n = data[len - 2];
|
||||||
|
//n = data[slice->end - 1]; // data[data.len() - 2]
|
||||||
return n == 1 ? 256: n; // // "1" is never a normal legal value here, because if there, // is only 1 transition, then it is encoded in the state byte
|
return n == 1 ? 256: n; // // "1" is never a normal legal value here, because if there, // is only 1 transition, then it is encoded in the state byte
|
||||||
}
|
}
|
||||||
Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, PackSizes sizes, uint64_t nTrans) {
|
Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, PackSizes sizes, uint64_t nTrans) {
|
||||||
|
@ -538,7 +554,8 @@ Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, Pack
|
||||||
- fstStateTotalTransSize(s, version, sizes, nTrans)
|
- fstStateTotalTransSize(s, version, sizes, nTrans)
|
||||||
- (nTrans * oSizes)
|
- (nTrans * oSizes)
|
||||||
- oSizes;
|
- oSizes;
|
||||||
return unpackUint64(slice->data + slice->start + at, (uint8_t)oSizes);
|
uint8_t *data = fstSliceData(slice, NULL);
|
||||||
|
return unpackUint64(data + at, (uint8_t)oSizes);
|
||||||
|
|
||||||
}
|
}
|
||||||
uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
|
uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
|
||||||
|
@ -549,7 +566,10 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
|
||||||
- fstStateNtransLen(s)
|
- fstStateNtransLen(s)
|
||||||
- 1 // pack size
|
- 1 // pack size
|
||||||
- fstStateTransIndexSize(s, node->version, node->nTrans);
|
- fstStateTransIndexSize(s, node->version, node->nTrans);
|
||||||
uint64_t i = slice->data[slice->start + at + b];
|
int32_t dlen = 0;
|
||||||
|
uint8_t *data = fstSliceData(slice, &dlen);
|
||||||
|
uint64_t i = data[at + b];
|
||||||
|
//uint64_t i = slice->data[slice->start + at + b];
|
||||||
if (i >= node->nTrans) {
|
if (i >= node->nTrans) {
|
||||||
*null = true;
|
*null = true;
|
||||||
}
|
}
|
||||||
|
@ -561,8 +581,13 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
|
||||||
- node->nTrans;
|
- node->nTrans;
|
||||||
uint64_t end = start + node->nTrans;
|
uint64_t end = start + node->nTrans;
|
||||||
uint64_t len = end - start;
|
uint64_t len = end - start;
|
||||||
|
int32_t dlen = 0;
|
||||||
|
uint8_t *data = fstSliceData(slice, &dlen);
|
||||||
for(int i = 0; i < len; i++) {
|
for(int i = 0; i < len; i++) {
|
||||||
uint8_t v = slice->data[slice->start + i];
|
//uint8_t v = slice->data[slice->start + i];
|
||||||
|
////slice->data[slice->start + i];
|
||||||
|
uint8_t v = data[i];
|
||||||
|
|
||||||
if (v == b) {
|
if (v == b) {
|
||||||
return node->nTrans - i - 1; // bug
|
return node->nTrans - i - 1; // bug
|
||||||
}
|
}
|
||||||
|
@ -635,6 +660,7 @@ static const char *fstNodeState(FstNode *node) {
|
||||||
|
|
||||||
|
|
||||||
void fstNodeDestroy(FstNode *node) {
|
void fstNodeDestroy(FstNode *node) {
|
||||||
|
fstSliceDestroy(&node->data);
|
||||||
free(node);
|
free(node);
|
||||||
}
|
}
|
||||||
FstTransitions* fstNodeTransitions(FstNode *node) {
|
FstTransitions* fstNodeTransitions(FstNode *node) {
|
||||||
|
@ -774,18 +800,18 @@ bool fstBuilderInsert(FstBuilder *b, FstSlice bs, Output in) {
|
||||||
|
|
||||||
void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) {
|
void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) {
|
||||||
FstSlice *s = &bs;
|
FstSlice *s = &bs;
|
||||||
if (fstSliceEmpty(s)) {
|
if (fstSliceIsEmpty(s)) {
|
||||||
b->len = 1;
|
b->len = 1;
|
||||||
fstUnFinishedNodesSetRootOutput(b->unfinished, in);
|
fstUnFinishedNodesSetRootOutput(b->unfinished, in);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Output out;
|
|
||||||
//if (in != 0) { //if let Some(in) = in
|
//if (in != 0) { //if let Some(in) = in
|
||||||
// prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out);
|
// prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out);
|
||||||
//} else {
|
//} else {
|
||||||
// prefixLen = fstUnFinishedNodesFindCommPrefix(b->unfinished, bs);
|
// prefixLen = fstUnFinishedNodesFindCommPrefix(b->unfinished, bs);
|
||||||
// out = 0;
|
// out = 0;
|
||||||
//}
|
//}
|
||||||
|
Output out;
|
||||||
uint64_t prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out);
|
uint64_t prefixLen = fstUnFinishedNodesFindCommPrefixAndSetOutput(b->unfinished, bs, in, &out);
|
||||||
|
|
||||||
if (prefixLen == FST_SLICE_LEN(s)) {
|
if (prefixLen == FST_SLICE_LEN(s)) {
|
||||||
|
@ -798,12 +824,13 @@ void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in) {
|
||||||
|
|
||||||
FstSlice sub = fstSliceCopy(s, prefixLen, s->end);
|
FstSlice sub = fstSliceCopy(s, prefixLen, s->end);
|
||||||
fstUnFinishedNodesAddSuffix(b->unfinished, sub, out);
|
fstUnFinishedNodesAddSuffix(b->unfinished, sub, out);
|
||||||
|
fstSliceDestroy(&sub);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup) {
|
OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup) {
|
||||||
FstSlice *input = &bs;
|
FstSlice *input = &bs;
|
||||||
if (fstSliceEmpty(&b->last)) {
|
if (fstSliceIsEmpty(&b->last)) {
|
||||||
// deep copy or not
|
// deep copy or not
|
||||||
b->last = fstSliceCopy(&bs, input->start, input->end);
|
b->last = fstSliceCopy(&bs, input->start, input->end);
|
||||||
} else {
|
} else {
|
||||||
|
@ -829,7 +856,7 @@ void fstBuilderCompileFrom(FstBuilder *b, uint64_t istate) {
|
||||||
}
|
}
|
||||||
addr = fstBuilderCompile(b, n);
|
addr = fstBuilderCompile(b, n);
|
||||||
assert(addr != NONE_ADDRESS);
|
assert(addr != NONE_ADDRESS);
|
||||||
fstBuilderNodeDestroy(n);
|
//fstBuilderNodeDestroy(n);
|
||||||
}
|
}
|
||||||
fstUnFinishedNodesTopLastFreeze(b->unfinished, addr);
|
fstUnFinishedNodesTopLastFreeze(b->unfinished, addr);
|
||||||
return;
|
return;
|
||||||
|
@ -888,7 +915,7 @@ void fstBuilderFinish(FstBuilder *b) {
|
||||||
|
|
||||||
FstSlice fstNodeAsSlice(FstNode *node) {
|
FstSlice fstNodeAsSlice(FstNode *node) {
|
||||||
FstSlice *slice = &node->data;
|
FstSlice *slice = &node->data;
|
||||||
FstSlice s = fstSliceCopy(slice, slice->end, slice->dLen - 1);
|
FstSlice s = fstSliceCopy(slice, slice->end, FST_SLICE_LEN(slice) - 1);
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -929,12 +956,13 @@ void fstBuilderNodeUnfinishedAddOutputPrefix(FstBuilderNodeUnfinished *unNode, O
|
||||||
}
|
}
|
||||||
|
|
||||||
Fst* fstCreate(FstSlice *slice) {
|
Fst* fstCreate(FstSlice *slice) {
|
||||||
char *buf = slice->data;
|
int32_t slen;
|
||||||
uint64_t skip = 0;
|
char *buf = fstSliceData(slice, &slen);
|
||||||
uint64_t len = slice->dLen;
|
if (slen < 36) {
|
||||||
if (len < 36) {
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
uint64_t len = slen;
|
||||||
|
uint64_t skip = 0;
|
||||||
|
|
||||||
uint64_t version;
|
uint64_t version;
|
||||||
taosDecodeFixedU64(buf, &version);
|
taosDecodeFixedU64(buf, &version);
|
||||||
|
@ -992,8 +1020,10 @@ void fstDestroy(Fst *fst) {
|
||||||
bool fstGet(Fst *fst, FstSlice *b, Output *out) {
|
bool fstGet(Fst *fst, FstSlice *b, Output *out) {
|
||||||
FstNode *root = fstGetRoot(fst);
|
FstNode *root = fstGetRoot(fst);
|
||||||
Output tOut = 0;
|
Output tOut = 0;
|
||||||
for (uint32_t i = 0; i < b->dLen; i++) {
|
int32_t len;
|
||||||
uint8_t inp = b->data[i];
|
uint8_t *data = fstSliceData(b, &len);
|
||||||
|
for (uint32_t i = 0; i < len; i++) {
|
||||||
|
uint8_t inp = data[i];
|
||||||
Output res = 0;
|
Output res = 0;
|
||||||
bool null = fstNodeFindInput(root, inp, &res);
|
bool null = fstNodeFindInput(root, inp, &res);
|
||||||
if (null) { return false; }
|
if (null) { return false; }
|
||||||
|
@ -1046,9 +1076,10 @@ Output fstEmptyFinalOutput(Fst *fst, bool *null) {
|
||||||
|
|
||||||
bool fstVerify(Fst *fst) {
|
bool fstVerify(Fst *fst) {
|
||||||
uint32_t checkSum = fst->meta->checkSum;
|
uint32_t checkSum = fst->meta->checkSum;
|
||||||
FstSlice *data = fst->data;
|
int32_t len;
|
||||||
|
uint8_t *data = fstSliceData(fst->data, &len);
|
||||||
TSCKSUM initSum = 0;
|
TSCKSUM initSum = 0;
|
||||||
if (!taosCheckChecksumWhole(data->data, data->dLen)) {
|
if (!taosCheckChecksumWhole(data, len)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
@ -1059,8 +1090,13 @@ FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data) {
|
||||||
FstBoundWithData *b = calloc(1, sizeof(FstBoundWithData));
|
FstBoundWithData *b = calloc(1, sizeof(FstBoundWithData));
|
||||||
if (b == NULL) { return NULL; }
|
if (b == NULL) { return NULL; }
|
||||||
|
|
||||||
b->type = type;
|
if (data != NULL) {
|
||||||
b->data = fstSliceCopy(data, data->start, data->end);
|
b->data = fstSliceCopy(data, data->start, data->end);
|
||||||
|
} else {
|
||||||
|
b->data = fstSliceCreate(NULL, 0);
|
||||||
|
}
|
||||||
|
b->type = type;
|
||||||
|
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1078,7 +1114,7 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData *bound) {
|
||||||
if (bound->type == Unbounded) {
|
if (bound->type == Unbounded) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return fstSliceEmpty(&bound->data);
|
return fstSliceIsEmpty(&bound->data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1145,8 +1181,10 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
|
||||||
Output out = 0;
|
Output out = 0;
|
||||||
void* autState = sws->aut->start();
|
void* autState = sws->aut->start();
|
||||||
|
|
||||||
for (uint32_t i = 0; i < key->dLen; i++) {
|
int32_t len;
|
||||||
uint8_t b = key->data[i];
|
uint8_t *data = fstSliceData(key, &len);
|
||||||
|
for (uint32_t i = 0; i < len; i++) {
|
||||||
|
uint8_t b = data[i];
|
||||||
uint64_t res = 0;
|
uint64_t res = 0;
|
||||||
bool null = fstNodeFindInput(node, b, &res);
|
bool null = fstNodeFindInput(node, b, &res);
|
||||||
if (null == false) {
|
if (null == false) {
|
||||||
|
@ -1262,12 +1300,16 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
|
||||||
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
|
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
|
||||||
taosArrayDestroyEx(sws->stack, streamStateDestroy);
|
taosArrayDestroyEx(sws->stack, streamStateDestroy);
|
||||||
sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState));
|
sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState));
|
||||||
|
fstSliceDestroy(&slice);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (FST_NODE_IS_FINAL(nextNode) && isMatch) {
|
if (FST_NODE_IS_FINAL(nextNode) && isMatch) {
|
||||||
FstOutput fOutput = {.null = false, out = out + FST_NODE_FINAL_OUTPUT(nextNode)};
|
FstOutput fOutput = {.null = false, .out = out + FST_NODE_FINAL_OUTPUT(nextNode)};
|
||||||
return swsResultCreate(&slice, fOutput , tState);
|
StreamWithStateResult *result = swsResultCreate(&slice, fOutput , tState);
|
||||||
|
fstSliceDestroy(&slice);
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
fstSliceDestroy(&slice);
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
|
@ -1277,14 +1319,19 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta
|
||||||
StreamWithStateResult *result = calloc(1, sizeof(StreamWithStateResult));
|
StreamWithStateResult *result = calloc(1, sizeof(StreamWithStateResult));
|
||||||
if (result == NULL) { return NULL; }
|
if (result == NULL) { return NULL; }
|
||||||
|
|
||||||
FstSlice slice = fstSliceCopy(data, 0, data->dLen - 1);
|
result->data = fstSliceCopy(data, 0, FST_SLICE_LEN(data) - 1);
|
||||||
result->data = slice;
|
|
||||||
result->out = fOut;
|
result->out = fOut;
|
||||||
result->state = state;
|
result->state = state;
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
void swsResultDestroy(StreamWithStateResult *result) {
|
||||||
|
if (NULL == result) { return; }
|
||||||
|
|
||||||
|
fstSliceDestroy(&result->data);
|
||||||
|
free(result);
|
||||||
|
}
|
||||||
|
|
||||||
void streamStateDestroy(void *s) {
|
void streamStateDestroy(void *s) {
|
||||||
if (NULL == s) { return; }
|
if (NULL == s) { return; }
|
||||||
StreamState *ss = (StreamState *)s;
|
StreamState *ss = (StreamState *)s;
|
||||||
|
@ -1293,5 +1340,44 @@ void streamStateDestroy(void *s) {
|
||||||
//free(s->autoState);
|
//free(s->autoState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) {
|
||||||
|
FstStreamBuilder *b = calloc(1, sizeof(FstStreamBuilder));
|
||||||
|
if (NULL == b) { return NULL; }
|
||||||
|
|
||||||
|
b->fst = fst;
|
||||||
|
b->aut = aut;
|
||||||
|
b->min = fstBoundStateCreate(Unbounded, NULL);
|
||||||
|
b->max = fstBoundStateCreate(Unbounded, NULL);
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
void fstStreamBuilderDestroy(FstStreamBuilder *b) {
|
||||||
|
fstSliceDestroy(&b->min->data);
|
||||||
|
fstSliceDestroy(&b->max->data);
|
||||||
|
free(b);
|
||||||
|
}
|
||||||
|
FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type) {
|
||||||
|
if (b == NULL) { return NULL; }
|
||||||
|
|
||||||
|
if (type == GE) {
|
||||||
|
b->min->type = Included;
|
||||||
|
fstSliceDestroy(&(b->min->data));
|
||||||
|
b->min->data = fstSliceDeepCopy(val, 0, FST_SLICE_LEN(val) - 1);
|
||||||
|
} else if (type == GT) {
|
||||||
|
b->min->type = Excluded;
|
||||||
|
fstSliceDestroy(&(b->min->data));
|
||||||
|
b->min->data = fstSliceDeepCopy(val, 0, FST_SLICE_LEN(val) - 1);
|
||||||
|
} else if (type == LE) {
|
||||||
|
b->max->type = Included;
|
||||||
|
fstSliceDestroy(&(b->max->data));
|
||||||
|
b->max->data = fstSliceDeepCopy(val, 0, FST_SLICE_LEN(val) - 1);
|
||||||
|
} else if (type == LT) {
|
||||||
|
b->max->type = Excluded;
|
||||||
|
fstSliceDestroy(&(b->max->data));
|
||||||
|
b->max->data = fstSliceDeepCopy(val, 0, FST_SLICE_LEN(val) - 1);
|
||||||
|
}
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -12,3 +12,4 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
|
@ -16,24 +16,88 @@
|
||||||
#include "index_fst_util.h"
|
#include "index_fst_util.h"
|
||||||
#include "index_fst_counting_writer.h"
|
#include "index_fst_counting_writer.h"
|
||||||
|
|
||||||
|
static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) {
|
||||||
|
if (ctx->offset + len > ctx->limit) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ctx->type == TFile) {
|
||||||
|
assert(len != tfWrite(ctx->fd, buf, len));
|
||||||
|
} else {
|
||||||
|
memcpy(ctx->mem + ctx->offset, buf, len);
|
||||||
|
}
|
||||||
|
ctx->offset += len;
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) {
|
||||||
|
if (ctx->type == TFile) {
|
||||||
|
tfRead(ctx->fd, buf, len);
|
||||||
|
} else {
|
||||||
|
memcpy(buf, ctx->mem + ctx->offset, len);
|
||||||
|
}
|
||||||
|
ctx->offset += len;
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
static int writeCtxDoFlush(WriterCtx *ctx) {
|
||||||
|
if (ctx->type == TFile) {
|
||||||
|
//tfFlush(ctx->fd);
|
||||||
|
} else {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
WriterCtx* writerCtxCreate(WriterType type) {
|
||||||
|
WriterCtx *ctx = calloc(1, sizeof(WriterCtx));
|
||||||
|
if (ctx == NULL) { return NULL; }
|
||||||
|
|
||||||
|
ctx->type == type;
|
||||||
|
if (ctx->type == TFile) {
|
||||||
|
ctx->fd = tfOpenCreateWriteAppend(tmpFile);
|
||||||
|
} else if (ctx->type == TMemory) {
|
||||||
|
ctx->mem = calloc(1, DefaultMem * sizeof(uint8_t));
|
||||||
|
}
|
||||||
|
ctx->write = writeCtxDoWrite;
|
||||||
|
ctx->read = writeCtxDoRead;
|
||||||
|
ctx->flush = writeCtxDoFlush;
|
||||||
|
|
||||||
|
ctx->offset = 0;
|
||||||
|
ctx->limit = DefaultMem;
|
||||||
|
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
void writerCtxDestroy(WriterCtx *ctx) {
|
||||||
|
if (ctx->type == TMemory) {
|
||||||
|
free(ctx->mem);
|
||||||
|
} else {
|
||||||
|
tfClose(ctx->fd);
|
||||||
|
}
|
||||||
|
free(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
FstCountingWriter *fstCountingWriterCreate(void *wrt) {
|
FstCountingWriter *fstCountingWriterCreate(void *wrt) {
|
||||||
FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter));
|
FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter));
|
||||||
if (cw == NULL) { return NULL; }
|
if (cw == NULL) { return NULL; }
|
||||||
|
|
||||||
cw->wrt = wrt;
|
cw->wrt = (void *)(writerCtxCreate(TFile));
|
||||||
return cw;
|
return cw;
|
||||||
}
|
}
|
||||||
void fstCountingWriterDestroy(FstCountingWriter *cw) {
|
void fstCountingWriterDestroy(FstCountingWriter *cw) {
|
||||||
// free wrt object: close fd or free mem
|
// free wrt object: close fd or free mem
|
||||||
|
writerCtxDestroy((WriterCtx *)(cw->wrt));
|
||||||
free(cw);
|
free(cw);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) {
|
int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) {
|
||||||
if (write == NULL) { return 0; }
|
if (write == NULL) { return 0; }
|
||||||
// update checksum
|
// update checksum
|
||||||
// write data to file/socket or mem
|
// write data to file/socket or mem
|
||||||
|
WriterCtx *ctx = write->wrt;
|
||||||
|
|
||||||
write->count += bufLen;
|
int nWrite = ctx->write(ctx, buf, bufLen);
|
||||||
|
write->count += nWrite;
|
||||||
return bufLen;
|
return bufLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,6 +105,8 @@ uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int fstCountingWriterFlush(FstCountingWriter *write) {
|
int fstCountingWriterFlush(FstCountingWriter *write) {
|
||||||
|
WriterCtx *ctx = write->wrt;
|
||||||
|
ctx->flush(ctx);
|
||||||
//write->wtr->flush
|
//write->wtr->flush
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@ FstBuilderNode *fstBuilderNodeDefault() {
|
||||||
FstBuilderNode *bn = malloc(sizeof(FstBuilderNode));
|
FstBuilderNode *bn = malloc(sizeof(FstBuilderNode));
|
||||||
bn->isFinal = false;
|
bn->isFinal = false;
|
||||||
bn->finalOutput = 0;
|
bn->finalOutput = 0;
|
||||||
bn->trans = NULL;
|
bn->trans = taosArrayInit(16, sizeof(FstTransition));
|
||||||
return bn;
|
return bn;
|
||||||
}
|
}
|
||||||
void fstBuilderNodeDestroy(FstBuilderNode *node) {
|
void fstBuilderNodeDestroy(FstBuilderNode *node) {
|
||||||
|
@ -27,6 +27,25 @@ void fstBuilderNodeDestroy(FstBuilderNode *node) {
|
||||||
taosArrayDestroy(node->trans);
|
taosArrayDestroy(node->trans);
|
||||||
free(node);
|
free(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool fstBuilderNodeEqual(FstBuilderNode *n1, FstBuilderNode *n2) {
|
||||||
|
if (n1 == n2) { return true; }
|
||||||
|
|
||||||
|
if (n1->isFinal != n2->isFinal ||
|
||||||
|
n1->finalOutput != n2->finalOutput ||
|
||||||
|
taosArrayGetSize(n1->trans) != taosArrayGetSize(n2->trans)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
size_t sz = taosArrayGetSize(n1->trans);
|
||||||
|
for (size_t i = 0; i < sz; i++) {
|
||||||
|
FstTransition *t1 = taosArrayGet(n1->trans, i);
|
||||||
|
FstTransition *t2 = taosArrayGet(n2->trans, i);
|
||||||
|
if (t1->inp != t2->inp || t1->out != t2->out || t1->addr != t2->addr) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src) {
|
FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src) {
|
||||||
FstBuilderNode *node = malloc(sizeof(FstBuilderNode));
|
FstBuilderNode *node = malloc(sizeof(FstBuilderNode));
|
||||||
if (node == NULL) { return NULL; }
|
if (node == NULL) { return NULL; }
|
||||||
|
@ -53,12 +72,17 @@ void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src) {
|
||||||
dst->isFinal = src->isFinal;
|
dst->isFinal = src->isFinal;
|
||||||
dst->finalOutput = src->finalOutput;
|
dst->finalOutput = src->finalOutput;
|
||||||
|
|
||||||
// avoid mem leak
|
//release free avoid mem leak
|
||||||
taosArrayDestroy(dst->trans);
|
taosArrayDestroy(dst->trans);
|
||||||
dst->trans = src->trans;
|
size_t sz = taosArrayGetSize(src->trans);
|
||||||
src->trans = NULL;
|
dst->trans = taosArrayInit(sz, sizeof(FstTransition));
|
||||||
|
for (size_t i = 0; i < sz; i++) {
|
||||||
|
FstTransition *trn = taosArrayGet(src->trans, i);
|
||||||
|
taosArrayPush(dst->trans, trn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
|
//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
|
||||||
|
|
||||||
//size_t sz = taosArrayGetSize(b->trans);
|
//size_t sz = taosArrayGetSize(b->trans);
|
||||||
|
|
|
@ -112,7 +112,7 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo
|
||||||
if (end - start == 1) {
|
if (end - start == 1) {
|
||||||
FstRegistryCell *cell = taosArrayGet(registry->table, start);
|
FstRegistryCell *cell = taosArrayGet(registry->table, start);
|
||||||
//cell->isNode &&
|
//cell->isNode &&
|
||||||
if (cell->addr != NONE_ADDRESS && cell->node == bNode) {
|
if (cell->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell->node, bNode)) {
|
||||||
entry->state = FOUND;
|
entry->state = FOUND;
|
||||||
entry->addr = cell->addr ;
|
entry->addr = cell->addr ;
|
||||||
return entry;
|
return entry;
|
||||||
|
@ -123,13 +123,13 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo
|
||||||
}
|
}
|
||||||
} else if (end - start == 2) {
|
} else if (end - start == 2) {
|
||||||
FstRegistryCell *cell1 = taosArrayGet(registry->table, start);
|
FstRegistryCell *cell1 = taosArrayGet(registry->table, start);
|
||||||
if (cell1->addr != NONE_ADDRESS && cell1->node == bNode) {
|
if (cell1->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell1->node, bNode)) {
|
||||||
entry->state = FOUND;
|
entry->state = FOUND;
|
||||||
entry->addr = cell1->addr;
|
entry->addr = cell1->addr;
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
FstRegistryCell *cell2 = taosArrayGet(registry->table, start + 1);
|
FstRegistryCell *cell2 = taosArrayGet(registry->table, start + 1);
|
||||||
if (cell2->addr != NONE_ADDRESS && cell2->node == bNode) {
|
if (cell2->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell2->node, bNode)) {
|
||||||
entry->state = FOUND;
|
entry->state = FOUND;
|
||||||
entry->addr = cell2->addr;
|
entry->addr = cell2->addr;
|
||||||
// must swap here
|
// must swap here
|
||||||
|
@ -147,7 +147,7 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo
|
||||||
uint32_t i = start;
|
uint32_t i = start;
|
||||||
for (; i < end; i++) {
|
for (; i < end; i++) {
|
||||||
FstRegistryCell *cell = (FstRegistryCell *)taosArrayGet(registry->table, i);
|
FstRegistryCell *cell = (FstRegistryCell *)taosArrayGet(registry->table, i);
|
||||||
if (cell->addr != NONE_ADDRESS && cell->node == bNode) {
|
if (cell->addr != NONE_ADDRESS && fstBuilderNodeEqual(cell->node, bNode)) {
|
||||||
entry->state = FOUND;
|
entry->state = FOUND;
|
||||||
entry->addr = cell->addr;
|
entry->addr = cell->addr;
|
||||||
fstRegistryCellPromote(registry->table, i, start);
|
fstRegistryCellPromote(registry->table, i, start);
|
||||||
|
|
|
@ -91,42 +91,87 @@ CompiledAddr unpackDelta(char *data, uint64_t len, uint64_t nodeAddr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// fst slice func
|
// fst slice func
|
||||||
FstSlice fstSliceCreate(uint8_t *data, uint64_t dLen) {
|
//
|
||||||
FstSlice slice = {.data = data, .dLen = dLen, .start = 0, .end = dLen - 1};
|
|
||||||
return slice;
|
FstSlice fstSliceCreate(uint8_t *data, uint64_t len) {
|
||||||
|
FstString *str = (FstString *)malloc(sizeof(FstString));
|
||||||
|
str->ref = 1;
|
||||||
|
str->len = len;
|
||||||
|
str->data = malloc(len * sizeof(uint8_t));
|
||||||
|
memcpy(str->data, data, len);
|
||||||
|
|
||||||
|
FstSlice s = {.str = str, .start = 0, .end = len - 1};
|
||||||
|
return s;
|
||||||
}
|
}
|
||||||
// just shallow copy
|
// just shallow copy
|
||||||
FstSlice fstSliceCopy(FstSlice *slice, int32_t start, int32_t end) {
|
FstSlice fstSliceCopy(FstSlice *s, int32_t start, int32_t end) {
|
||||||
FstSlice t;
|
FstString *str = s->str;
|
||||||
if (start >= slice->dLen || end >= slice->dLen || start > end) {
|
str->ref++;
|
||||||
t.data = NULL;
|
//uint8_t *buf = fstSliceData(s, &alen);
|
||||||
return t;
|
//start = buf + start - (buf - s->start);
|
||||||
};
|
//end = buf + end - (buf - s->start);
|
||||||
|
|
||||||
t.data = slice->data;
|
FstSlice t = {.str = str, .start = start + s->start, .end = end + s->start};
|
||||||
t.dLen = slice->dLen;
|
|
||||||
t.start = start;
|
|
||||||
t.end = end;
|
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
bool fstSliceEmpty(FstSlice *slice) {
|
FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end) {
|
||||||
return slice->data == NULL || slice->dLen <= 0;
|
|
||||||
|
int32_t tlen = end - start + 1;
|
||||||
|
int32_t slen;
|
||||||
|
uint8_t *data = fstSliceData(s, &slen);
|
||||||
|
assert(tlen <= slen);
|
||||||
|
|
||||||
|
uint8_t *buf = malloc(sizeof(uint8_t) * tlen);
|
||||||
|
memcpy(buf, data + start, tlen);
|
||||||
|
|
||||||
|
FstString *str = malloc(sizeof(FstString));
|
||||||
|
str->data = buf;
|
||||||
|
str->len = tlen;
|
||||||
|
str->ref = 1;
|
||||||
|
|
||||||
|
FstSlice ans;
|
||||||
|
ans.str = str;
|
||||||
|
ans.start = 0;
|
||||||
|
ans.end = tlen - 1;
|
||||||
|
return ans;
|
||||||
|
}
|
||||||
|
bool fstSliceIsEmpty(FstSlice *s) {
|
||||||
|
return s->str == NULL || s->str->len == 0 || s->start < 0 || s->end < 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t *fstSliceData(FstSlice *s, int32_t *size) {
|
||||||
|
FstString *str = s->str;
|
||||||
|
if (size != NULL) {
|
||||||
|
*size = s->end - s->start + 1;
|
||||||
|
}
|
||||||
|
return str->data + s->start;
|
||||||
|
}
|
||||||
|
void fstSliceDestroy(FstSlice *s) {
|
||||||
|
FstString *str = s->str;
|
||||||
|
str->ref--;
|
||||||
|
if (str->ref <= 0) {
|
||||||
|
free(str->data);
|
||||||
|
free(str);
|
||||||
|
s->str = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int fstSliceCompare(FstSlice *a, FstSlice *b) {
|
int fstSliceCompare(FstSlice *a, FstSlice *b) {
|
||||||
int32_t aLen = (a->end - a->start + 1);
|
int32_t alen, blen;
|
||||||
int32_t bLen = (b->end - b->start + 1);
|
uint8_t *aBuf = fstSliceData(a, &alen);
|
||||||
int32_t mLen = (aLen < bLen ? aLen : bLen);
|
uint8_t *bBuf = fstSliceData(b, &blen);
|
||||||
for (int i = 0; i < mLen; i++) {
|
|
||||||
uint8_t x = a->data[i + a->start];
|
uint32_t i, j;
|
||||||
uint8_t y = b->data[i + b->start];
|
for (i = 0, j = 0; i < alen && j < blen; i++, j++) {
|
||||||
if (x == y) { continue; }
|
uint8_t x = aBuf[i];
|
||||||
|
uint8_t y = bBuf[j];
|
||||||
|
if (x == y) { continue;}
|
||||||
else if (x < y) { return -1; }
|
else if (x < y) { return -1; }
|
||||||
else { return 1; }
|
else { return 1; };
|
||||||
}
|
}
|
||||||
if (aLen == bLen) { return 0; }
|
if (i < alen) { return 1; }
|
||||||
else if (aLen < bLen) { return -1; }
|
else if (j < blen) { return -1; }
|
||||||
else { return 1; }
|
else { return 0; }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
add_executable(indexTest "")
|
add_executable(indexTest "")
|
||||||
target_sources(indexTest
|
target_sources(indexTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"../src/index.c"
|
|
||||||
"indexTests.cpp"
|
"indexTests.cpp"
|
||||||
)
|
)
|
||||||
target_include_directories ( indexTest
|
target_include_directories ( indexTest
|
||||||
|
|
|
@ -3,58 +3,84 @@
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include "index.h"
|
#include "index.h"
|
||||||
#include "indexInt.h"
|
#include "indexInt.h"
|
||||||
|
#include "index_fst.h"
|
||||||
|
#include "index_fst_util.h"
|
||||||
|
#include "index_fst_counting_writer.h"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
TEST(IndexTest, index_create_test) {
|
//TEST(IndexTest, index_create_test) {
|
||||||
SIndexOpts *opts = indexOptsCreate();
|
// SIndexOpts *opts = indexOptsCreate();
|
||||||
SIndex *index = indexOpen(opts, "./test");
|
// SIndex *index = indexOpen(opts, "./test");
|
||||||
if (index == NULL) {
|
// if (index == NULL) {
|
||||||
std::cout << "index open failed" << std::endl;
|
// std::cout << "index open failed" << std::endl;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// // write
|
||||||
|
// for (int i = 0; i < 100000; i++) {
|
||||||
|
// SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
|
// std::string val = "field";
|
||||||
|
//
|
||||||
|
// indexMultiTermAdd(terms, "tag1", strlen("tag1"), val.c_str(), val.size());
|
||||||
|
//
|
||||||
|
// val.append(std::to_string(i));
|
||||||
|
// indexMultiTermAdd(terms, "tag2", strlen("tag2"), val.c_str(), val.size());
|
||||||
|
//
|
||||||
|
// val.insert(0, std::to_string(i));
|
||||||
|
// indexMultiTermAdd(terms, "tag3", strlen("tag3"), val.c_str(), val.size());
|
||||||
|
//
|
||||||
|
// val.append("const");
|
||||||
|
// indexMultiTermAdd(terms, "tag4", strlen("tag4"), val.c_str(), val.size());
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// indexPut(index, terms, i);
|
||||||
|
// indexMultiTermDestroy(terms);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
//
|
||||||
|
// // query
|
||||||
|
// SIndexMultiTermQuery *multiQuery = indexMultiTermQueryCreate(MUST);
|
||||||
|
//
|
||||||
|
// indexMultiTermQueryAdd(multiQuery, "tag1", strlen("tag1"), "field", strlen("field"), QUERY_PREFIX);
|
||||||
|
// indexMultiTermQueryAdd(multiQuery, "tag3", strlen("tag3"), "0field0", strlen("0field0"), QUERY_TERM);
|
||||||
|
//
|
||||||
|
// SArray *result = (SArray *)taosArrayInit(10, sizeof(int));
|
||||||
|
// indexSearch(index, multiQuery, result);
|
||||||
|
//
|
||||||
|
// std::cout << "taos'size : " << taosArrayGetSize(result) << std::endl;
|
||||||
|
// for (int i = 0; i < taosArrayGetSize(result); i++) {
|
||||||
|
// int *v = (int *)taosArrayGet(result, i);
|
||||||
|
// std::cout << "value --->" << *v << std::endl;
|
||||||
|
// }
|
||||||
|
// // add more test case
|
||||||
|
// indexMultiTermQueryDestroy(multiQuery);
|
||||||
|
//
|
||||||
|
// indexOptsDestroy(opts);
|
||||||
|
// indexClose(index);
|
||||||
|
// //
|
||||||
|
//}
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
std::string str("abc");
|
||||||
|
FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size());
|
||||||
|
Output val = 10;
|
||||||
|
|
||||||
// write
|
std::string str1("bcd");
|
||||||
for (int i = 0; i < 100000; i++) {
|
FstSlice key1 = fstSliceCreate((uint8_t *)str1.c_str(), str1.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
Output val2 = 10;
|
||||||
std::string val = "field";
|
FstBuilder *b = fstBuilderCreate(NULL, 1);
|
||||||
|
fstBuilderInsert(b, key, val);
|
||||||
indexMultiTermAdd(terms, "tag1", strlen("tag1"), val.c_str(), val.size());
|
fstBuilderInsert(b, key1, val2);
|
||||||
|
fstBuilderFinish(b);
|
||||||
val.append(std::to_string(i));
|
fstBuilderDestroy(b);
|
||||||
indexMultiTermAdd(terms, "tag2", strlen("tag2"), val.c_str(), val.size());
|
fstSliceDestroy(&key);
|
||||||
|
return 1;
|
||||||
val.insert(0, std::to_string(i));
|
|
||||||
indexMultiTermAdd(terms, "tag3", strlen("tag3"), val.c_str(), val.size());
|
|
||||||
|
|
||||||
val.append("const");
|
|
||||||
indexMultiTermAdd(terms, "tag4", strlen("tag4"), val.c_str(), val.size());
|
|
||||||
|
|
||||||
|
|
||||||
indexPut(index, terms, i);
|
|
||||||
indexMultiTermDestroy(terms);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// query
|
|
||||||
SIndexMultiTermQuery *multiQuery = indexMultiTermQueryCreate(MUST);
|
|
||||||
|
|
||||||
indexMultiTermQueryAdd(multiQuery, "tag1", strlen("tag1"), "field", strlen("field"), QUERY_PREFIX);
|
|
||||||
indexMultiTermQueryAdd(multiQuery, "tag3", strlen("tag3"), "0field0", strlen("0field0"), QUERY_TERM);
|
|
||||||
|
|
||||||
SArray *result = (SArray *)taosArrayInit(10, sizeof(int));
|
|
||||||
indexSearch(index, multiQuery, result);
|
|
||||||
|
|
||||||
std::cout << "taos'size : " << taosArrayGetSize(result) << std::endl;
|
|
||||||
for (int i = 0; i < taosArrayGetSize(result); i++) {
|
|
||||||
int *v = (int *)taosArrayGet(result, i);
|
|
||||||
std::cout << "value --->" << *v << std::endl;
|
|
||||||
}
|
|
||||||
// add more test case
|
|
||||||
indexMultiTermQueryDestroy(multiQuery);
|
|
||||||
|
|
||||||
indexOptsDestroy(opts);
|
|
||||||
indexClose(index);
|
|
||||||
//
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//TEST(IndexFstBuilder, IndexFstInput) {
|
||||||
|
//
|
||||||
|
//}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -17,14 +17,16 @@
|
||||||
#define _TD_WAL_INT_H_
|
#define _TD_WAL_INT_H_
|
||||||
|
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
#include "compare.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int walRotate(SWal* pWal);
|
|
||||||
int walGetFile(SWal* pWal, int32_t version);
|
int walGetFile(SWal* pWal, int32_t version);
|
||||||
|
|
||||||
|
int64_t walGetSeq();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -20,9 +20,38 @@
|
||||||
#include "tfile.h"
|
#include "tfile.h"
|
||||||
#include "walInt.h"
|
#include "walInt.h"
|
||||||
|
|
||||||
int walSeekVerImpl(SWal *pWal, int64_t ver) {
|
static int walSeekFilePos(SWal* pWal, int64_t ver) {
|
||||||
//close old file
|
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
|
int64_t idxTfd = pWal->curIdxTfd;
|
||||||
|
int64_t logTfd = pWal->curLogTfd;
|
||||||
|
|
||||||
|
//seek position
|
||||||
|
int64_t offset = (ver - pWal->curFileFirstVersion) * WAL_IDX_ENTRY_SIZE;
|
||||||
|
code = tfLseek(idxTfd, offset, SEEK_SET);
|
||||||
|
if(code != 0) {
|
||||||
|
|
||||||
|
}
|
||||||
|
int64_t readBuf[2];
|
||||||
|
code = tfRead(idxTfd, readBuf, sizeof(readBuf));
|
||||||
|
if(code != 0) {
|
||||||
|
|
||||||
|
}
|
||||||
|
//TODO:deserialize
|
||||||
|
ASSERT(readBuf[0] == ver);
|
||||||
|
code = tfLseek(logTfd, readBuf[1], SEEK_CUR);
|
||||||
|
if (code != 0) {
|
||||||
|
|
||||||
|
}
|
||||||
|
pWal->curLogOffset = readBuf[1];
|
||||||
|
pWal->curVersion = ver;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int walChangeFile(SWal *pWal, int64_t ver) {
|
||||||
|
int code = 0;
|
||||||
|
int64_t idxTfd, logTfd;
|
||||||
|
char fnameStr[WAL_FILE_LEN];
|
||||||
code = tfClose(pWal->curLogTfd);
|
code = tfClose(pWal->curLogTfd);
|
||||||
if(code != 0) {
|
if(code != 0) {
|
||||||
//TODO
|
//TODO
|
||||||
|
@ -32,29 +61,36 @@ int walSeekVerImpl(SWal *pWal, int64_t ver) {
|
||||||
//TODO
|
//TODO
|
||||||
}
|
}
|
||||||
//bsearch in fileSet
|
//bsearch in fileSet
|
||||||
int fName = 0;//TODO
|
int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE);
|
||||||
//open the right file
|
ASSERT(pRet != NULL);
|
||||||
char fNameStr[WAL_FILE_LEN];
|
int64_t fname = *pRet;
|
||||||
sprintf(fNameStr, "%d."WAL_INDEX_SUFFIX, fName);
|
if(fname < pWal->lastFileName) {
|
||||||
bool closed = 1; //TODO:read only
|
pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE;
|
||||||
int64_t idxTfd = tfOpenReadWrite(fNameStr);
|
pWal->curFileLastVersion = pRet[1]-1;
|
||||||
sprintf(fNameStr, "%d."WAL_LOG_SUFFIX, fName);
|
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname);
|
||||||
int64_t logTfd = tfOpenReadWrite(fNameStr);
|
idxTfd = tfOpenRead(fnameStr);
|
||||||
//seek position
|
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname);
|
||||||
int64_t offset = (ver - fName) * WAL_IDX_ENTRY_SIZE;
|
logTfd = tfOpenRead(fnameStr);
|
||||||
tfLseek(idxTfd, offset, SEEK_SET);
|
} else {
|
||||||
//set cur version, cur file version and cur status
|
pWal->curStatus |= WAL_CUR_FILE_WRITABLE;
|
||||||
pWal->curFileFirstVersion = fName;
|
pWal->curFileLastVersion = -1;
|
||||||
pWal->curFileLastVersion = 1;//TODO
|
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname);
|
||||||
|
idxTfd = tfOpenReadWrite(fnameStr);
|
||||||
|
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname);
|
||||||
|
logTfd = tfOpenReadWrite(fnameStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
pWal->curFileFirstVersion = fname;
|
||||||
pWal->curLogTfd = logTfd;
|
pWal->curLogTfd = logTfd;
|
||||||
pWal->curIdxTfd = idxTfd;
|
pWal->curIdxTfd = idxTfd;
|
||||||
pWal->curVersion = ver;
|
|
||||||
pWal->curOffset = offset;
|
|
||||||
pWal->curStatus = 0;//TODO
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walSeekVer(SWal *pWal, int64_t ver) {
|
int walSeekVer(SWal *pWal, int64_t ver) {
|
||||||
|
if((!(pWal->curStatus & WAL_CUR_FAILED))
|
||||||
|
&& ver == pWal->curVersion) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
if(ver > pWal->lastVersion) {
|
if(ver > pWal->lastVersion) {
|
||||||
//TODO: some records are skipped
|
//TODO: some records are skipped
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -64,54 +100,13 @@ int walSeekVer(SWal *pWal, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if(ver < pWal->snapshotVersion) {
|
if(ver < pWal->snapshotVersion) {
|
||||||
//TODO: seek snapshotted log
|
//TODO: seek snapshotted log, invalid in some cases
|
||||||
}
|
|
||||||
if(ver >= pWal->curFileFirstVersion
|
|
||||||
&& ((pWal->curFileLastVersion == -1 && ver <= pWal->lastVersion) || (ver <= pWal->curFileLastVersion))) {
|
|
||||||
|
|
||||||
}
|
|
||||||
if(ver < pWal->curFileFirstVersion || (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) {
|
|
||||||
int index = 0;
|
|
||||||
index = 1;
|
|
||||||
//back up to avoid inconsistency
|
|
||||||
int64_t curVersion = pWal->curVersion;
|
|
||||||
int64_t curOffset = pWal->curOffset;
|
|
||||||
int64_t curFileFirstVersion = pWal->curFileFirstVersion;
|
|
||||||
int64_t curFileLastVersion = pWal->curFileLastVersion;
|
|
||||||
if(walSeekVerImpl(pWal, ver) < 0) {
|
|
||||||
//TODO: errno
|
|
||||||
pWal->curVersion = curVersion;
|
|
||||||
pWal->curOffset = curOffset;
|
|
||||||
pWal->curFileFirstVersion = curFileFirstVersion;
|
|
||||||
pWal->curFileLastVersion = curFileLastVersion;
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
if(ver < pWal->curFileFirstVersion ||
|
||||||
|
(pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) {
|
||||||
|
walChangeFile(pWal, ver);
|
||||||
}
|
}
|
||||||
|
walSeekFilePos(pWal, ver);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
|
||||||
int code = 0;
|
|
||||||
//get index file
|
|
||||||
if(!tfValid(pWal->curIdxTfd)) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
|
|
||||||
}
|
|
||||||
if(pWal->curVersion != ver) {
|
|
||||||
if(walSeekVer(pWal, ver) != 0) {
|
|
||||||
//TODO: some records are skipped
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//check file checksum
|
|
||||||
//append index
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int walRotateIndex(SWal *pWal) {
|
|
||||||
//check file checksum
|
|
||||||
//create new file
|
|
||||||
//switch file
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
|
@ -26,57 +26,44 @@ int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
|
||||||
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
|
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
|
||||||
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
|
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
|
||||||
|
|
||||||
static pthread_mutex_t walInitLock = PTHREAD_MUTEX_INITIALIZER;
|
|
||||||
static int8_t walInited = 0;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t refSetId;
|
int32_t refSetId;
|
||||||
int32_t seq;
|
uint32_t seq;
|
||||||
int8_t stop;
|
int8_t stop;
|
||||||
|
int8_t inited;
|
||||||
pthread_t thread;
|
pthread_t thread;
|
||||||
pthread_mutex_t mutex;
|
|
||||||
} SWalMgmt;
|
} SWalMgmt;
|
||||||
|
|
||||||
static SWalMgmt tsWal = {0};
|
static SWalMgmt tsWal = {0, .seq = 1};
|
||||||
static int32_t walCreateThread();
|
static int32_t walCreateThread();
|
||||||
static void walStopThread();
|
static void walStopThread();
|
||||||
static int32_t walInitObj(SWal *pWal);
|
static int32_t walInitObj(SWal *pWal);
|
||||||
static void walFreeObj(void *pWal);
|
static void walFreeObj(void *pWal);
|
||||||
|
|
||||||
int32_t walInit() {
|
int64_t walGetSeq() {
|
||||||
//TODO: change to atomic
|
return (int64_t)atomic_load_32(&tsWal.seq);
|
||||||
pthread_mutex_lock(&walInitLock);
|
}
|
||||||
if(walInited) {
|
|
||||||
pthread_mutex_unlock(&walInitLock);
|
int32_t walInit() {
|
||||||
return 0;
|
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
|
||||||
} else {
|
if(old == 1) return 0;
|
||||||
walInited = 1;
|
|
||||||
pthread_mutex_unlock(&walInitLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = 0;
|
|
||||||
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
|
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
|
||||||
|
|
||||||
code = pthread_mutex_init(&tsWal.mutex, NULL);
|
int code = walCreateThread();
|
||||||
if (code != 0) {
|
|
||||||
wError("failed to init wal mutex since %s", tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = walCreateThread();
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
wError("failed to init wal module since %s", tstrerror(code));
|
wError("failed to init wal module since %s", tstrerror(code));
|
||||||
|
atomic_store_8(&tsWal.inited, 0);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
|
wInfo("wal module is initialized, rsetId:%d", tsWal.refSetId);
|
||||||
return code;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void walCleanUp() {
|
void walCleanUp() {
|
||||||
walStopThread();
|
walStopThread();
|
||||||
taosCloseRef(tsWal.refSetId);
|
taosCloseRef(tsWal.refSetId);
|
||||||
pthread_mutex_destroy(&tsWal.mutex);
|
|
||||||
wInfo("wal module is cleaned up");
|
wInfo("wal module is cleaned up");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,7 +79,7 @@ static int walLoadFileset(SWal *pWal) {
|
||||||
char *name = ent->d_name;
|
char *name = ent->d_name;
|
||||||
name[WAL_NOSUFFIX_LEN] = 0;
|
name[WAL_NOSUFFIX_LEN] = 0;
|
||||||
//validate file name by regex matching
|
//validate file name by regex matching
|
||||||
if(1 /* regex match */) {
|
if(1 /* TODO:regex match */) {
|
||||||
int64_t fnameInt64 = atoll(name);
|
int64_t fnameInt64 = atoll(name);
|
||||||
taosArrayPush(pWal->fileSet, &fnameInt64);
|
taosArrayPush(pWal->fileSet, &fnameInt64);
|
||||||
}
|
}
|
||||||
|
@ -133,6 +120,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
|
||||||
walFreeObj(pWal);
|
walFreeObj(pWal);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
walLoadFileset(pWal);
|
||||||
|
|
||||||
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
|
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod);
|
||||||
|
|
||||||
|
@ -164,6 +152,9 @@ void walClose(SWal *pWal) {
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
tfClose(pWal->curLogTfd);
|
tfClose(pWal->curLogTfd);
|
||||||
|
tfClose(pWal->curIdxTfd);
|
||||||
|
taosArrayDestroy(pWal->fileSet);
|
||||||
|
pWal->fileSet = NULL;
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
taosRemoveRef(tsWal.refSetId, pWal->refId);
|
||||||
}
|
}
|
||||||
|
@ -188,6 +179,9 @@ static void walFreeObj(void *wal) {
|
||||||
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
|
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
|
||||||
|
|
||||||
tfClose(pWal->curLogTfd);
|
tfClose(pWal->curLogTfd);
|
||||||
|
tfClose(pWal->curIdxTfd);
|
||||||
|
taosArrayDestroy(pWal->fileSet);
|
||||||
|
pWal->fileSet = NULL;
|
||||||
pthread_mutex_destroy(&pWal->mutex);
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
tfree(pWal);
|
tfree(pWal);
|
||||||
}
|
}
|
||||||
|
@ -197,7 +191,7 @@ static bool walNeedFsync(SWal *pWal) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsWal.seq % pWal->fsyncSeq == 0) {
|
if (atomic_load_32(&tsWal.seq) % pWal->fsyncSeq == 0) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -206,16 +200,14 @@ static bool walNeedFsync(SWal *pWal) {
|
||||||
|
|
||||||
static void walUpdateSeq() {
|
static void walUpdateSeq() {
|
||||||
taosMsleep(WAL_REFRESH_MS);
|
taosMsleep(WAL_REFRESH_MS);
|
||||||
if (++tsWal.seq <= 0) {
|
atomic_add_fetch_32(&tsWal.seq, 1);
|
||||||
tsWal.seq = 1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void walFsyncAll() {
|
static void walFsyncAll() {
|
||||||
SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
|
SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
|
||||||
while (pWal) {
|
while (pWal) {
|
||||||
if (walNeedFsync(pWal)) {
|
if (walNeedFsync(pWal)) {
|
||||||
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq);
|
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq));
|
||||||
int32_t code = tfFsync(pWal->curLogTfd);
|
int32_t code = tfFsync(pWal->curLogTfd);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code));
|
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code));
|
||||||
|
@ -226,16 +218,12 @@ static void walFsyncAll() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *walThreadFunc(void *param) {
|
static void *walThreadFunc(void *param) {
|
||||||
int stop = 0;
|
|
||||||
setThreadName("wal");
|
setThreadName("wal");
|
||||||
while (1) {
|
while (1) {
|
||||||
walUpdateSeq();
|
walUpdateSeq();
|
||||||
walFsyncAll();
|
walFsyncAll();
|
||||||
|
|
||||||
pthread_mutex_lock(&tsWal.mutex);
|
if (atomic_load_8(&tsWal.stop)) break;
|
||||||
stop = tsWal.stop;
|
|
||||||
pthread_mutex_unlock(&tsWal.mutex);
|
|
||||||
if (stop) break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -258,9 +246,7 @@ static int32_t walCreateThread() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void walStopThread() {
|
static void walStopThread() {
|
||||||
pthread_mutex_lock(&tsWal.mutex);
|
atomic_store_8(&tsWal.stop, 1);
|
||||||
tsWal.stop = 1;
|
|
||||||
pthread_mutex_unlock(&tsWal.mutex);
|
|
||||||
|
|
||||||
if (taosCheckPthreadValid(tsWal.thread)) {
|
if (taosCheckPthreadValid(tsWal.thread)) {
|
||||||
pthread_join(tsWal.thread, NULL);
|
pthread_join(tsWal.thread, NULL);
|
||||||
|
|
|
@ -26,10 +26,24 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walRollback(SWal *pWal, int64_t ver) {
|
int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||||
|
//TODO: ftruncate
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
|
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
|
||||||
|
pWal->snapshotVersion = ver;
|
||||||
|
|
||||||
|
//mark files safe to delete
|
||||||
|
int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE);
|
||||||
|
if(pRet != pWal->fileSet->pData) {
|
||||||
|
//delete files until less than retention size
|
||||||
|
|
||||||
|
//find first file that exceeds retention time
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//delete files living longer than retention limit
|
||||||
|
//remove file from fileset
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,13 +138,102 @@ void walRemoveAllOldFiles(void *handle) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
static int walRoll(SWal *pWal) {
|
||||||
|
int code = 0;
|
||||||
|
code = tfClose(pWal->curIdxTfd);
|
||||||
|
if(code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
code = tfClose(pWal->curLogTfd);
|
||||||
|
if(code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
int64_t idxTfd, logTfd;
|
||||||
|
//create new file
|
||||||
|
int64_t newFileFirstVersion = pWal->lastVersion + 1;
|
||||||
|
char fnameStr[WAL_FILE_LEN];
|
||||||
|
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, newFileFirstVersion);
|
||||||
|
idxTfd = tfOpenCreateWrite(fnameStr);
|
||||||
|
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, newFileFirstVersion);
|
||||||
|
logTfd = tfOpenCreateWrite(fnameStr);
|
||||||
|
|
||||||
|
taosArrayPush(pWal->fileSet, &newFileFirstVersion);
|
||||||
|
|
||||||
|
//switch file
|
||||||
|
pWal->curIdxTfd = idxTfd;
|
||||||
|
pWal->curLogTfd = logTfd;
|
||||||
|
//change status
|
||||||
|
pWal->curFileLastVersion = -1;
|
||||||
|
pWal->curFileFirstVersion = newFileFirstVersion;
|
||||||
|
pWal->curVersion = newFileFirstVersion;
|
||||||
|
pWal->curLogOffset = 0;
|
||||||
|
pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE;
|
||||||
|
|
||||||
|
pWal->lastFileName = newFileFirstVersion;
|
||||||
|
pWal->lastFileWriteSize = 0;
|
||||||
|
pWal->lastRollSeq = walGetSeq();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int walChangeFileToLast(SWal *pWal) {
|
||||||
|
int64_t idxTfd, logTfd;
|
||||||
|
int64_t* pRet = taosArrayGetLast(pWal->fileSet);
|
||||||
|
ASSERT(pRet != NULL);
|
||||||
|
int64_t fname = *pRet;
|
||||||
|
|
||||||
|
char fnameStr[WAL_FILE_LEN];
|
||||||
|
sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname);
|
||||||
|
idxTfd = tfOpenReadWrite(fnameStr);
|
||||||
|
sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname);
|
||||||
|
logTfd = tfOpenReadWrite(fnameStr);
|
||||||
|
//switch file
|
||||||
|
pWal->curIdxTfd = idxTfd;
|
||||||
|
pWal->curLogTfd = logTfd;
|
||||||
|
//change status
|
||||||
|
pWal->curFileLastVersion = -1;
|
||||||
|
pWal->curFileFirstVersion = fname;
|
||||||
|
pWal->curVersion = fname;
|
||||||
|
pWal->curLogOffset = 0;
|
||||||
|
pWal->curStatus = WAL_CUR_FILE_WRITABLE;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
|
||||||
|
int code = 0;
|
||||||
|
//get index file
|
||||||
|
if(!tfValid(pWal->curIdxTfd)) {
|
||||||
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
|
||||||
|
}
|
||||||
|
int64_t writeBuf[2] = { ver, offset };
|
||||||
|
int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf));
|
||||||
|
if(size != sizeof(writeBuf)) {
|
||||||
|
//TODO:
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) {
|
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) {
|
||||||
if (pWal == NULL) return -1;
|
if (pWal == NULL) return -1;
|
||||||
|
|
||||||
// no wal
|
// no wal
|
||||||
if (!tfValid(pWal->curLogTfd)) return 0;
|
|
||||||
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
||||||
if (index > pWal->lastVersion + 1) return -1;
|
|
||||||
|
if (index == pWal->lastVersion + 1) {
|
||||||
|
int64_t passed = walGetSeq() - pWal->lastRollSeq;
|
||||||
|
if(passed > pWal->rollPeriod) {
|
||||||
|
walRoll(pWal);
|
||||||
|
} else if(pWal->lastFileWriteSize > pWal->segSize) {
|
||||||
|
walRoll(pWal);
|
||||||
|
} else {
|
||||||
|
walChangeFileToLast(pWal);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
//reject skip log or rewrite log
|
||||||
|
//must truncate explicitly first
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (!tfValid(pWal->curLogTfd)) return 0;
|
||||||
|
|
||||||
pWal->head.version = index;
|
pWal->head.version = index;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -155,7 +258,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
|
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
|
||||||
}
|
}
|
||||||
//TODO:write idx
|
walWriteIndex(pWal, index, pWal->curLogOffset);
|
||||||
|
pWal->curLogOffset += sizeof(SWalHead) + bodyLen;
|
||||||
|
|
||||||
|
//set status
|
||||||
|
pWal->lastVersion = index;
|
||||||
|
|
||||||
pthread_mutex_unlock(&pWal->mutex);
|
pthread_mutex_unlock(&pWal->mutex);
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,11 @@ static int64_t tfOpenImp(int32_t fd) {
|
||||||
return rid;
|
return rid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t tfOpenRead(const char *pathname, int32_t flags) {
|
||||||
|
int32_t fd = taosOpenFileRead(pathname);
|
||||||
|
return tfOpenImp(fd);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t tfOpenReadWrite(const char *pathname, int32_t flags) {
|
int64_t tfOpenReadWrite(const char *pathname, int32_t flags) {
|
||||||
int32_t fd = taosOpenFileReadWrite(pathname);
|
int32_t fd = taosOpenFileReadWrite(pathname);
|
||||||
return tfOpenImp(fd);
|
return tfOpenImp(fd);
|
||||||
|
|
Loading…
Reference in New Issue