update index TFile write

This commit is contained in:
yihaoDeng 2021-12-21 22:57:30 +08:00
parent f7be790924
commit 10b70175b5
20 changed files with 816 additions and 1006 deletions

View File

@ -45,8 +45,7 @@ void indexCacheDestroy(void *cache);
int indexCachePut(void* cache, SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid); int indexCachePut(void* cache, SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid);
// int indexCacheGet(void *cache, uint64_t *rst); // int indexCacheGet(void *cache, uint64_t *rst);
int indexCacheSearch( int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s);
void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -142,8 +142,7 @@ uint64_t fstStateInputLen(FstState *state);
// end_addr // end_addr
uint64_t fstStateEndAddrForOneTransNext(FstState* state, FstSlice* data); uint64_t fstStateEndAddrForOneTransNext(FstState* state, FstSlice* data);
uint64_t fstStateEndAddrForOneTrans(FstState* state, FstSlice* data, PackSizes sizes); uint64_t fstStateEndAddrForOneTrans(FstState* state, FstSlice* data, PackSizes sizes);
uint64_t fstStateEndAddrForAnyTrans( uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans);
FstState *state, uint64_t version, FstSlice *date, PackSizes sizes, uint64_t nTrans);
// input // input
uint8_t fstStateInput(FstState* state, FstNode* node); uint8_t fstStateInput(FstState* state, FstNode* node);
uint8_t fstStateInputForAnyTrans(FstState* state, FstNode* node, uint64_t i); uint8_t fstStateInputForAnyTrans(FstState* state, FstNode* node, uint64_t i);
@ -311,8 +310,7 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta
void swsResultDestroy(StreamWithStateResult* result); void swsResultDestroy(StreamWithStateResult* result);
typedef void* (*StreamCallback)(void*); typedef void* (*StreamCallback)(void*);
StreamWithState *streamWithStateCreate( StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max);
Fst *fst, AutomationCtx *automation, FstBoundWithData *min, FstBoundWithData *max);
void streamWithStateDestroy(StreamWithState* sws); void streamWithStateDestroy(StreamWithState* sws);

View File

@ -30,7 +30,9 @@ typedef struct FstRegistryCell {
#define FST_REGISTRY_CELL_IS_EMPTY(cell) (cell->addr == NONE_ADDRESS) #define FST_REGISTRY_CELL_IS_EMPTY(cell) (cell->addr == NONE_ADDRESS)
#define FST_REGISTRY_CELL_INSERT(cell, tAddr) \ #define FST_REGISTRY_CELL_INSERT(cell, tAddr) \
do { cell->addr = tAddr; } while (0) do { \
cell->addr = tAddr; \
} while (0)
// typedef struct FstRegistryCache { // typedef struct FstRegistryCache {
// SArray *cells; // SArray *cells;

View File

@ -37,7 +37,7 @@ typedef struct TFileHeader {
uint8_t colType; uint8_t colType;
} TFileHeader; } TFileHeader;
#define TFILE_HEADER_SIZE (sizeof(TFILE_HEADER_SIZE) + sizeof(uint32_t)); #define TFILE_HEADER_SIZE (sizeof(TFileHeader) + sizeof(uint32_t))
#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t)) #define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t))
typedef struct TFileCacheKey { typedef struct TFileCacheKey {

View File

@ -49,7 +49,9 @@ static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oTyp
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
pthread_once(&isInit, indexInit); pthread_once(&isInit, indexInit);
SIndex* sIdx = calloc(1, sizeof(SIndex)); SIndex* sIdx = calloc(1, sizeof(SIndex));
if (sIdx == NULL) { return -1; } if (sIdx == NULL) {
return -1;
}
#ifdef USE_LUCENE #ifdef USE_LUCENE
index_t* index = index_open(path); index_t* index = index_open(path);
@ -129,7 +131,9 @@ int indexPut(SIndex *index, SIndexMultiTerm *fVals, uint64_t uid) {
int32_t colId = fi->colId; int32_t colId = fi->colId;
int32_t version = index->cVersion; int32_t version = index->cVersion;
int ret = indexCachePut(index->cache, p, colId, version, uid); int ret = indexCachePut(index->cache, p, colId, version, uid);
if (ret != 0) { return ret; } if (ret != 0) {
return ret;
}
} }
#endif #endif
@ -159,7 +163,9 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result
int tsz = 0; int tsz = 0;
index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz); index_multi_search(index->index, (const char**)fields, (const char**)keys, types, nQuery, opera, &tResult, &tsz);
for (int i = 0; i < tsz; i++) { taosArrayPush(result, &tResult[i]); } for (int i = 0; i < tsz; i++) {
taosArrayPush(result, &tResult[i]);
}
for (int i = 0; i < nQuery; i++) { for (int i = 0; i < nQuery; i++) {
free(fields[i]); free(fields[i]);
@ -215,7 +221,9 @@ void indexOptsDestroy(SIndexOpts *opts){
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) { SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery)); SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
if (p == NULL) { return NULL; } if (p == NULL) {
return NULL;
}
p->opera = opera; p->opera = opera;
p->query = taosArrayInit(4, sizeof(SIndexTermQuery)); p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
return p; return p;
@ -234,10 +242,17 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm *term, EInde
return 0; return 0;
} }
SIndexTerm *indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char *colName, SIndexTerm* indexTermCreate(int64_t suid,
int32_t nColName, const char *colVal, int32_t nColVal) { SIndexOperOnColumn oper,
uint8_t colType,
const char* colName,
int32_t nColName,
const char* colVal,
int32_t nColVal) {
SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm))); SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm)));
if (t == NULL) { return NULL; } if (t == NULL) {
return NULL;
}
t->suid = suid; t->suid = suid;
t->operType = oper; t->operType = oper;
@ -258,7 +273,9 @@ void indexTermDestroy(SIndexTerm *p) {
free(p); free(p);
} }
SIndexMultiTerm *indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm *)); } SIndexMultiTerm* indexMultiTermCreate() {
return taosArrayInit(4, sizeof(SIndexTerm*));
}
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) { int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
taosArrayPush(terms, &term); taosArrayPush(terms, &term);
@ -315,7 +332,9 @@ static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *query, SArray **result
return 0; return 0;
} }
static void indexInterResultsDestroy(SArray* results) { static void indexInterResultsDestroy(SArray* results) {
if (results == NULL) { return; } if (results == NULL) {
return;
}
size_t sz = taosArrayGetSize(results); size_t sz = taosArrayGetSize(results);
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
@ -345,7 +364,9 @@ static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType
return 0; return 0;
} }
static int indexMergeCacheIntoTindex(SIndex* sIdx) { static int indexMergeCacheIntoTindex(SIndex* sIdx) {
if (sIdx == NULL) { return -1; } if (sIdx == NULL) {
return -1;
}
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
return 0; return 0;
} }

View File

@ -21,10 +21,11 @@
// ref index_cache.h:22 // ref index_cache.h:22
#define CACHE_KEY_LEN(p) \ #define CACHE_KEY_LEN(p) \
(sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + \ (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
sizeof(p->operType))
static char * getIndexKey(const void *pData) { return NULL; } static char* getIndexKey(const void* pData) {
return NULL;
}
static int32_t compareKey(const void* l, const void* r) { static int32_t compareKey(const void* l, const void* r) {
char* lp = (char*)l; char* lp = (char*)l;
char* rp = (char*)r; char* rp = (char*)r;
@ -101,8 +102,8 @@ IndexCache *indexCacheCreate() {
indexError("failed to create index cache"); indexError("failed to create index cache");
return NULL; return NULL;
} }
cache->skiplist = tSkipListCreate( cache->skiplist =
MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
return cache; return cache;
} }
@ -143,12 +144,7 @@ int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t
IndexCache* pCache = cache; IndexCache* pCache = cache;
return 0; return 0;
} }
int indexCacheSearch(void *cache, int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) {
SIndexTermQuery * query,
int16_t colId,
int32_t version,
SArray * result,
STermValueType * s) {
if (cache == NULL) { return -1; } if (cache == NULL) { return -1; }
IndexCache* pCache = cache; IndexCache* pCache = cache;
SIndexTerm* term = query->term; SIndexTerm* term = query->term;

View File

@ -87,8 +87,7 @@ void fstUnFinishedNodesSetRootOutput(FstUnFinishedNodes *nodes, Output out) {
// un->node->trans = NULL; // un->node->trans = NULL;
} }
void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes* nodes, CompiledAddr addr) { void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes* nodes, CompiledAddr addr) {
size_t sz = taosArrayGetSize(nodes->stack) - 1; FstBuilderNodeUnfinished* un = taosArrayGet(nodes->stack, taosArrayGetSize(nodes->stack) - 1);
FstBuilderNodeUnfinished *un = taosArrayGet(nodes->stack, sz);
fstBuilderNodeUnfinishedLastCompiled(un, addr); fstBuilderNodeUnfinishedLastCompiled(un, addr);
} }
void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output out) { void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output out) {
@ -189,8 +188,10 @@ FstState fstStateCreateFrom(FstSlice *slice, CompiledAddr addr) {
return fs; return fs;
} }
static FstState fstStateDict[] = {{.state = OneTransNext, .val = 0b11000000}, {.state = OneTrans, .val = 0b10000000}, static FstState fstStateDict[] = {{.state = OneTransNext, .val = 0b11000000},
{.state = AnyTrans, .val = 0b00000000}, {.state = EmptyFinal, .val = 0b00000000}}; {.state = OneTrans, .val = 0b10000000},
{.state = AnyTrans, .val = 0b00000000},
{.state = EmptyFinal, .val = 0b00000000}};
// debug // debug
static const char* fstStateStr[] = {"ONE_TRANS_NEXT", "ONE_TRANS", "ANY_TRANS", "EMPTY_FINAL"}; static const char* fstStateStr[] = {"ONE_TRANS_NEXT", "ONE_TRANS", "ANY_TRANS", "EMPTY_FINAL"};
@ -353,8 +354,7 @@ uint64_t fstStateEndAddrForOneTrans(FstState *s, FstSlice *data, PackSizes sizes
return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s) - 1 // pack size return FST_SLICE_LEN(data) - 1 - fstStateInputLen(s) - 1 // pack size
- FST_GET_TRANSITION_PACK_SIZE(sizes) - FST_GET_OUTPUT_PACK_SIZE(sizes); - FST_GET_TRANSITION_PACK_SIZE(sizes) - FST_GET_OUTPUT_PACK_SIZE(sizes);
} }
uint64_t fstStateEndAddrForAnyTrans( uint64_t fstStateEndAddrForAnyTrans(FstState* state, uint64_t version, FstSlice* date, PackSizes sizes, uint64_t nTrans) {
FstState *state, uint64_t version, FstSlice *date, PackSizes sizes, uint64_t nTrans) {
uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes); uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes);
uint8_t finalOsize = !fstStateIsFinalState(state) ? 0 : oSizes; uint8_t finalOsize = !fstStateIsFinalState(state) ? 0 : oSizes;
return FST_SLICE_LEN(date) - 1 - fstStateNtransLen(state) - 1 // pack size return FST_SLICE_LEN(date) - 1 - fstStateNtransLen(state) - 1 // pack size
@ -403,8 +403,8 @@ CompiledAddr fstStateTransAddrForAnyTrans(FstState *s, FstNode *node, uint64_t i
FstSlice* slice = &node->data; FstSlice* slice = &node->data;
uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes); uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes);
uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) - uint64_t at = node->start - fstStateNtransLen(s) - 1 - fstStateTransIndexSize(s, node->version, node->nTrans) - node->nTrans -
node->nTrans - (i * tSizes) - tSizes; (i * tSizes) - tSizes;
uint8_t* data = fstSliceData(slice, NULL); uint8_t* data = fstSliceData(slice, NULL);
return unpackDelta(data + at, tSizes, node->end); return unpackDelta(data + at, tSizes, node->end);
} }
@ -595,8 +595,7 @@ FstNode *fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice *slice) {
n->isFinal = fstStateIsFinalState(&st); // s.is_final_state(); n->isFinal = fstStateIsFinalState(&st); // s.is_final_state();
n->nTrans = nTrans; n->nTrans = nTrans;
n->sizes = sz; n->sizes = sz;
n->finalOutput = n->finalOutput = fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans);
fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans);
} }
return n; return n;
} }
@ -876,7 +875,9 @@ void *fstBuilderInsertInner(FstBuilder *b) {
// b->wrt = NULL; // b->wrt = NULL;
return b->wrt; return b->wrt;
} }
void fstBuilderFinish(FstBuilder *b) { fstBuilderInsertInner(b); } void fstBuilderFinish(FstBuilder* b) {
fstBuilderInsertInner(b);
}
FstSlice fstNodeAsSlice(FstNode* node) { FstSlice fstNodeAsSlice(FstNode* node) {
FstSlice* slice = &node->data; FstSlice* slice = &node->data;
@ -893,7 +894,9 @@ FstLastTransition *fstLastTransitionCreate(uint8_t inp, Output out) {
return trn; return trn;
} }
void fstLastTransitionDestroy(FstLastTransition *trn) { free(trn); } void fstLastTransitionDestroy(FstLastTransition* trn) {
free(trn);
}
void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) { void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, CompiledAddr addr) {
FstLastTransition* trn = unNode->last; FstLastTransition* trn = unNode->last;
if (trn == NULL) { return; } if (trn == NULL) { return; }
@ -1010,12 +1013,16 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) {
return true; return true;
} }
FstStreamBuilder *fstSearch(Fst *fst, AutomationCtx *ctx) { return fstStreamBuilderCreate(fst, ctx); } FstStreamBuilder* fstSearch(Fst* fst, AutomationCtx* ctx) {
return fstStreamBuilderCreate(fst, ctx);
}
StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) { StreamWithState* streamBuilderIntoStream(FstStreamBuilder* sb) {
if (sb == NULL) { return NULL; } if (sb == NULL) { return NULL; }
return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max); return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max);
} }
FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx) { return fstStreamBuilderCreate(fst, ctx); } FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) {
return fstStreamBuilderCreate(fst, ctx);
}
FstNode* fstGetRoot(Fst* fst) { FstNode* fstGetRoot(Fst* fst) {
if (fst->root != NULL) { return fst->root; } if (fst->root != NULL) { return fst->root; }
@ -1023,9 +1030,15 @@ FstNode *fstGetRoot(Fst *fst) {
fst->root = fstGetNode(fst, rAddr); fst->root = fstGetNode(fst, rAddr);
return fst->root; return fst->root;
} }
FstNode * fstGetNode(Fst *fst, CompiledAddr addr) { return fstNodeCreate(fst->meta->version, addr, fst->data); } FstNode* fstGetNode(Fst* fst, CompiledAddr addr) {
FstType fstGetType(Fst *fst) { return fst->meta->ty; } return fstNodeCreate(fst->meta->version, addr, fst->data);
CompiledAddr fstGetRootAddr(Fst *fst) { return fst->meta->rootAddr; } }
FstType fstGetType(Fst* fst) {
return fst->meta->ty;
}
CompiledAddr fstGetRootAddr(Fst* fst) {
return fst->meta->rootAddr;
}
Output fstEmptyFinalOutput(Fst* fst, bool* null) { Output fstEmptyFinalOutput(Fst* fst, bool* null) {
Output res = 0; Output res = 0;
@ -1081,12 +1094,15 @@ bool fstBoundWithDataIsEmpty(FstBoundWithData *bound) {
} }
} }
bool fstBoundWithDataIsIncluded(FstBoundWithData *bound) { return bound->type == Excluded ? false : true; } bool fstBoundWithDataIsIncluded(FstBoundWithData* bound) {
return bound->type == Excluded ? false : true;
}
void fstBoundDestroy(FstBoundWithData *bound) { free(bound); } void fstBoundDestroy(FstBoundWithData* bound) {
free(bound);
}
StreamWithState *streamWithStateCreate( StreamWithState* streamWithStateCreate(Fst* fst, AutomationCtx* automation, FstBoundWithData* min, FstBoundWithData* max) {
Fst *fst, AutomationCtx *automation, FstBoundWithData *min, FstBoundWithData *max) {
StreamWithState* sws = calloc(1, sizeof(StreamWithState)); StreamWithState* sws = calloc(1, sizeof(StreamWithState));
if (sws == NULL) { return NULL; } if (sws == NULL) { return NULL; }
@ -1115,9 +1131,7 @@ void streamWithStateDestroy(StreamWithState *sws) {
bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) { bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min) {
AutomationCtx* aut = sws->aut; AutomationCtx* aut = sws->aut;
if (fstBoundWithDataIsEmpty(min)) { if (fstBoundWithDataIsEmpty(min)) {
if (fstBoundWithDataIsIncluded(min)) { if (fstBoundWithDataIsIncluded(min)) { sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null)); }
sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null));
}
StreamState s = {.node = fstGetRoot(sws->fst), StreamState s = {.node = fstGetRoot(sws->fst),
.trans = 0, .trans = 0,
.out = {.null = false, .out = 0}, .out = {.null = false, .out = 0},
@ -1189,8 +1203,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
uint64_t trans = s->trans; uint64_t trans = s->trans;
FstTransition trn; FstTransition trn;
fstNodeGetTransitionAt(n, trans - 1, &trn); fstNodeGetTransitionAt(n, trans - 1, &trn);
StreamState s = { StreamState s = {.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState};
.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState};
taosArrayPush(sws->stack, &s); taosArrayPush(sws->stack, &s);
return true; return true;
} }
@ -1245,7 +1258,9 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb
size_t isz = taosArrayGetSize(sws->inp); size_t isz = taosArrayGetSize(sws->inp);
uint8_t* buf = (uint8_t*)malloc(isz * sizeof(uint8_t)); uint8_t* buf = (uint8_t*)malloc(isz * sizeof(uint8_t));
for (uint32_t i = 0; i < isz; i++) { buf[i] = *(uint8_t *)taosArrayGet(sws->inp, i); } for (uint32_t i = 0; i < isz; i++) {
buf[i] = *(uint8_t*)taosArrayGet(sws->inp, i);
}
FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp)); FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp));
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) { if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
taosArrayDestroyEx(sws->stack, streamStateDestroy); taosArrayDestroyEx(sws->stack, streamStateDestroy);

View File

@ -17,9 +17,7 @@
StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) { StartWithStateValue* startWithStateValueCreate(StartWithStateKind kind, ValueType ty, void* val) {
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
if (nsv == NULL) { if (nsv == NULL) { return NULL; }
return NULL;
}
nsv->kind = kind; nsv->kind = kind;
nsv->type = ty; nsv->type = ty;
@ -37,9 +35,7 @@ StartWithStateValue *startWithStateValueCreate(StartWithStateKind kind, ValueTyp
} }
void startWithStateValueDestroy(void* val) { void startWithStateValueDestroy(void* val) {
StartWithStateValue* sv = (StartWithStateValue*)val; StartWithStateValue* sv = (StartWithStateValue*)val;
if (sv == NULL) { if (sv == NULL) { return; }
return;
}
if (sv->type == FST_INT) { if (sv->type == FST_INT) {
// //
@ -52,9 +48,7 @@ void startWithStateValueDestroy(void *val) {
} }
StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) { StartWithStateValue* startWithStateValueDump(StartWithStateValue* sv) {
StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue)); StartWithStateValue* nsv = calloc(1, sizeof(StartWithStateValue));
if (nsv == NULL) { if (nsv == NULL) { return NULL; }
return NULL;
}
nsv->kind = sv->kind; nsv->kind = sv->kind;
nsv->type = sv->type; nsv->type = sv->type;
@ -65,6 +59,7 @@ StartWithStateValue *startWithStateValueDump(StartWithStateValue *sv) {
nsv->ptr = (char*)calloc(1, len + 1); nsv->ptr = (char*)calloc(1, len + 1);
memcpy(nsv->ptr, sv->ptr, len); memcpy(nsv->ptr, sv->ptr, len);
} else if (nsv->type == FST_ARRAY) { } else if (nsv->type == FST_ARRAY) {
//
} }
return nsv; return nsv;
} }
@ -83,17 +78,15 @@ static bool prefixCanMatch(AutomationCtx *ctx, void *sv) {
StartWithStateValue* ssv = (StartWithStateValue*)sv; StartWithStateValue* ssv = (StartWithStateValue*)sv;
return ssv->val >= 0; return ssv->val >= 0;
} }
static bool prefixWillAlwaysMatch(AutomationCtx *ctx, void *state) { return true; } static bool prefixWillAlwaysMatch(AutomationCtx* ctx, void* state) {
return true;
}
static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) { static void* prefixAccept(AutomationCtx* ctx, void* state, uint8_t byte) {
StartWithStateValue* ssv = (StartWithStateValue*)state; StartWithStateValue* ssv = (StartWithStateValue*)state;
if (ssv == NULL || ctx == NULL) { if (ssv == NULL || ctx == NULL) { return NULL; }
return NULL;
}
char* data = ctx->data; char* data = ctx->data;
if (ssv->kind == Done) { if (ssv->kind == Done) { return startWithStateValueCreate(Done, FST_INT, &ssv->val); }
return startWithStateValueCreate(Done, FST_INT, &ssv->val);
}
if ((strlen(data) > ssv->val) && data[ssv->val] == byte) { if ((strlen(data) > ssv->val) && data[ssv->val] == byte) {
int val = ssv->val + 1; int val = ssv->val + 1;
StartWithStateValue* nsv = startWithStateValueCreate(Running, FST_INT, &val); StartWithStateValue* nsv = startWithStateValueCreate(Running, FST_INT, &val);
@ -106,18 +99,32 @@ static void *prefixAccept(AutomationCtx *ctx, void *state, uint8_t byte) {
} }
return NULL; return NULL;
} }
static void *prefixAcceptEof(AutomationCtx *ctx, void *state) { return NULL; } static void* prefixAcceptEof(AutomationCtx* ctx, void* state) {
return NULL;
}
// pattern query, impl later // pattern query, impl later
static void *patternStart(AutomationCtx *ctx) { return NULL; } static void* patternStart(AutomationCtx* ctx) {
static bool patternIsMatch(AutomationCtx *ctx, void *data) { return true; } return NULL;
static bool patternCanMatch(AutomationCtx *ctx, void *data) { return true; } }
static bool patternWillAlwaysMatch(AutomationCtx *ctx, void *state) { return true; } static bool patternIsMatch(AutomationCtx* ctx, void* data) {
return true;
}
static bool patternCanMatch(AutomationCtx* ctx, void* data) {
return true;
}
static bool patternWillAlwaysMatch(AutomationCtx* ctx, void* state) {
return true;
}
static void *patternAccept(AutomationCtx *ctx, void *state, uint8_t byte) { return NULL; } static void* patternAccept(AutomationCtx* ctx, void* state, uint8_t byte) {
return NULL;
}
static void *patternAcceptEof(AutomationCtx *ctx, void *state) { return NULL; } static void* patternAcceptEof(AutomationCtx* ctx, void* state) {
return NULL;
}
AutomationFunc automFuncs[] = { AutomationFunc automFuncs[] = {
{prefixStart, prefixIsMatch, prefixCanMatch, prefixWillAlwaysMatch, prefixAccept, prefixAcceptEof}, {prefixStart, prefixIsMatch, prefixCanMatch, prefixWillAlwaysMatch, prefixAccept, prefixAcceptEof},
@ -127,9 +134,7 @@ AutomationFunc automFuncs[] = {
AutomationCtx* automCtxCreate(void* data, AutomationType atype) { AutomationCtx* automCtxCreate(void* data, AutomationType atype) {
AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx)); AutomationCtx* ctx = calloc(1, sizeof(AutomationCtx));
if (ctx == NULL) { if (ctx == NULL) { return NULL; }
return NULL;
}
StartWithStateValue* sv = NULL; StartWithStateValue* sv = NULL;
if (atype == AUTOMATION_PREFIX) { if (atype == AUTOMATION_PREFIX) {

View File

@ -274,260 +274,20 @@ const uint8_t COMMON_INPUTS[] = {
}; };
const char COMMON_INPUTS_INV[] = { const char COMMON_INPUTS_INV[] = {
't', 't', 'e', '/', 'o', 'a', 's', 'r', 'i', 'p', 'c', 'n', 'w', '.', 'h', 'l', 'm',
'e', '-', 'd', 'u', '0', '1', '2', 'g', '=', ':', 'b', 'f', '3', 'y', '5', '&', '_',
'/', '4', 'v', '9', '6', '7', '8', 'k', '%', '?', 'x', 'C', 'D', 'A', 'S', 'F', 'I',
'o', 'B', 'E', 'j', 'P', 'T', 'z', 'R', 'N', 'M', '+', 'L', 'O', 'q', 'H', 'G', 'W',
'a', 'U', 'V', ',', 'Y', 'K', 'J', 'Z', 'X', 'Q', ';', ')', '(', '~', '[', ']', '$',
's', '!', '\'', '*', '@', '\x00', '\x01', '\x02', '\x03', '\x04', '\x05', '\x06', '\x07', '\x08', '\t', '\n', '\x0b',
'r', '\x0c', '\r', '\x0e', '\x0f', '\x10', '\x11', '\x12', '\x13', '\x14', '\x15', '\x16', '\x17', '\x18', '\x19', '\x1a', '\x1b',
'i', '\x1c', '\x1d', '\x1e', '\x1f', ' ', '"', '#', '<', '>', '\\', '^', '`', '{', '|', '}', '\x7f',
'p', '\x80', '\x81', '\x82', '\x83', '\x84', '\x85', '\x86', '\x87', '\x88', '\x89', '\x8a', '\x8b', '\x8c', '\x8d', '\x8e', '\x8f',
'c', '\x90', '\x91', '\x92', '\x93', '\x94', '\x95', '\x96', '\x97', '\x98', '\x99', '\x9a', '\x9b', '\x9c', '\x9d', '\x9e', '\x9f',
'n', '\xa0', '\xa1', '\xa2', '\xa3', '\xa4', '\xa5', '\xa6', '\xa7', '\xa8', '\xa9', '\xaa', '\xab', '\xac', '\xad', '\xae', '\xaf',
'w', '\xb0', '\xb1', '\xb2', '\xb3', '\xb4', '\xb5', '\xb6', '\xb7', '\xb8', '\xb9', '\xba', '\xbb', '\xbc', '\xbd', '\xbe', '\xbf',
'.', '\xc0', '\xc1', '\xc2', '\xc3', '\xc4', '\xc5', '\xc6', '\xc7', '\xc8', '\xc9', '\xca', '\xcb', '\xcc', '\xcd', '\xce', '\xcf',
'h', '\xd0', '\xd1', '\xd2', '\xd3', '\xd4', '\xd5', '\xd6', '\xd7', '\xd8', '\xd9', '\xda', '\xdb', '\xdc', '\xdd', '\xde', '\xdf',
'l', '\xe0', '\xe1', '\xe2', '\xe3', '\xe4', '\xe5', '\xe6', '\xe7', '\xe8', '\xe9', '\xea', '\xeb', '\xec', '\xed', '\xee', '\xef',
'm', '\xf0', '\xf1', '\xf2', '\xf3', '\xf4', '\xf5', '\xf6', '\xf7', '\xf8', '\xf9', '\xfa', '\xfb', '\xfc', '\xfd', '\xfe', '\xff',
'-',
'd',
'u',
'0',
'1',
'2',
'g',
'=',
':',
'b',
'f',
'3',
'y',
'5',
'&',
'_',
'4',
'v',
'9',
'6',
'7',
'8',
'k',
'%',
'?',
'x',
'C',
'D',
'A',
'S',
'F',
'I',
'B',
'E',
'j',
'P',
'T',
'z',
'R',
'N',
'M',
'+',
'L',
'O',
'q',
'H',
'G',
'W',
'U',
'V',
',',
'Y',
'K',
'J',
'Z',
'X',
'Q',
';',
')',
'(',
'~',
'[',
']',
'$',
'!',
'\'',
'*',
'@',
'\x00',
'\x01',
'\x02',
'\x03',
'\x04',
'\x05',
'\x06',
'\x07',
'\x08',
'\t',
'\n',
'\x0b',
'\x0c',
'\r',
'\x0e',
'\x0f',
'\x10',
'\x11',
'\x12',
'\x13',
'\x14',
'\x15',
'\x16',
'\x17',
'\x18',
'\x19',
'\x1a',
'\x1b',
'\x1c',
'\x1d',
'\x1e',
'\x1f',
' ',
'"',
'#',
'<',
'>',
'\\',
'^',
'`',
'{',
'|',
'}',
'\x7f',
'\x80',
'\x81',
'\x82',
'\x83',
'\x84',
'\x85',
'\x86',
'\x87',
'\x88',
'\x89',
'\x8a',
'\x8b',
'\x8c',
'\x8d',
'\x8e',
'\x8f',
'\x90',
'\x91',
'\x92',
'\x93',
'\x94',
'\x95',
'\x96',
'\x97',
'\x98',
'\x99',
'\x9a',
'\x9b',
'\x9c',
'\x9d',
'\x9e',
'\x9f',
'\xa0',
'\xa1',
'\xa2',
'\xa3',
'\xa4',
'\xa5',
'\xa6',
'\xa7',
'\xa8',
'\xa9',
'\xaa',
'\xab',
'\xac',
'\xad',
'\xae',
'\xaf',
'\xb0',
'\xb1',
'\xb2',
'\xb3',
'\xb4',
'\xb5',
'\xb6',
'\xb7',
'\xb8',
'\xb9',
'\xba',
'\xbb',
'\xbc',
'\xbd',
'\xbe',
'\xbf',
'\xc0',
'\xc1',
'\xc2',
'\xc3',
'\xc4',
'\xc5',
'\xc6',
'\xc7',
'\xc8',
'\xc9',
'\xca',
'\xcb',
'\xcc',
'\xcd',
'\xce',
'\xcf',
'\xd0',
'\xd1',
'\xd2',
'\xd3',
'\xd4',
'\xd5',
'\xd6',
'\xd7',
'\xd8',
'\xd9',
'\xda',
'\xdb',
'\xdc',
'\xdd',
'\xde',
'\xdf',
'\xe0',
'\xe1',
'\xe2',
'\xe3',
'\xe4',
'\xe5',
'\xe6',
'\xe7',
'\xe8',
'\xe9',
'\xea',
'\xeb',
'\xec',
'\xed',
'\xee',
'\xef',
'\xf0',
'\xf1',
'\xf2',
'\xf3',
'\xf4',
'\xf5',
'\xf6',
'\xf7',
'\xf8',
'\xf9',
'\xfa',
'\xfb',
'\xfc',
'\xfd',
'\xfe',
'\xff',
}; };

View File

@ -125,7 +125,9 @@ int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len)
return nRead; return nRead;
} }
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) { return 0; } uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) {
return 0;
}
int fstCountingWriterFlush(FstCountingWriter* write) { int fstCountingWriterFlush(FstCountingWriter* write) {
WriterCtx* ctx = write->wrt; WriterCtx* ctx = write->wrt;
ctx->flush(ctx); ctx->flush(ctx);

View File

@ -22,45 +22,31 @@ FstBuilderNode *fstBuilderNodeDefault() {
return bn; return bn;
} }
void fstBuilderNodeDestroy(FstBuilderNode* node) { void fstBuilderNodeDestroy(FstBuilderNode* node) {
if (node == NULL) { if (node == NULL) { return; }
return;
}
taosArrayDestroy(node->trans); taosArrayDestroy(node->trans);
free(node); free(node);
} }
bool fstBuilderNodeEqual(FstBuilderNode* n1, FstBuilderNode* n2) { bool fstBuilderNodeEqual(FstBuilderNode* n1, FstBuilderNode* n2) {
if (n1 == n2) { if (n1 == n2) { return true; }
return true; if (n1 == NULL || n2 == NULL) { return false; }
}
if (n1 == NULL || n2 == NULL) {
return false;
}
if (n1->isFinal != n2->isFinal || n1->finalOutput != n2->finalOutput) { if (n1->isFinal != n2->isFinal || n1->finalOutput != n2->finalOutput) { return false; }
return false;
}
size_t s1 = n1->trans ? taosArrayGetSize(n1->trans) : 0; size_t s1 = n1->trans ? taosArrayGetSize(n1->trans) : 0;
size_t s2 = n2->trans ? taosArrayGetSize(n2->trans) : 0; size_t s2 = n2->trans ? taosArrayGetSize(n2->trans) : 0;
if (s1 != s2) { if (s1 != s2) { return false; }
return false;
}
for (size_t i = 0; i < s1; i++) { for (size_t i = 0; i < s1; i++) {
FstTransition* t1 = taosArrayGet(n1->trans, i); FstTransition* t1 = taosArrayGet(n1->trans, i);
FstTransition* t2 = taosArrayGet(n2->trans, i); FstTransition* t2 = taosArrayGet(n2->trans, i);
if (t1->inp != t2->inp || t1->out != t2->out || t1->addr != t2->addr) { if (t1->inp != t2->inp || t1->out != t2->out || t1->addr != t2->addr) { return false; }
return false;
}
} }
return true; return true;
} }
FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src) { FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src) {
FstBuilderNode* node = malloc(sizeof(FstBuilderNode)); FstBuilderNode* node = malloc(sizeof(FstBuilderNode));
if (node == NULL) { if (node == NULL) { return NULL; }
return NULL;
}
// //
size_t sz = taosArrayGetSize(src->trans); size_t sz = taosArrayGetSize(src->trans);
@ -78,9 +64,7 @@ FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src) {
} }
// not destroy src, User's bussiness // not destroy src, User's bussiness
void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) { void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) {
if (dst == NULL || src == NULL) { if (dst == NULL || src == NULL) { return; }
return;
}
dst->isFinal = src->isFinal; dst->isFinal = src->isFinal;
dst->finalOutput = src->finalOutput; dst->finalOutput = src->finalOutput;

View File

@ -164,4 +164,6 @@ FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNo
} }
return entry; return entry;
} }
void fstRegistryEntryDestroy(FstRegistryEntry *entry) { free(entry); } void fstRegistryEntryDestroy(FstRegistryEntry* entry) {
free(entry);
}

View File

@ -130,7 +130,9 @@ FstSlice fstSliceDeepCopy(FstSlice *s, int32_t start, int32_t end) {
ans.end = tlen - 1; ans.end = tlen - 1;
return ans; return ans;
} }
bool fstSliceIsEmpty(FstSlice *s) { return s->str == NULL || s->str->len == 0 || s->start < 0 || s->end < 0; } bool fstSliceIsEmpty(FstSlice* s) {
return s->str == NULL || s->str->len == 0 || s->start < 0 || s->end < 0;
}
uint8_t* fstSliceData(FstSlice* s, int32_t* size) { uint8_t* fstSliceData(FstSlice* s, int32_t* size) {
FstString* str = s->str; FstString* str = s->str;

View File

@ -22,7 +22,21 @@
#include "index_util.h" #include "index_util.h"
#include "taosdef.h" #include "taosdef.h"
static FORCE_INLINE int tfileWriteHeader(TFileWriter *writer) {} static FORCE_INLINE int tfileWriteHeader(TFileWriter* writer) {
char buf[TFILE_HEADER_SIZE] = {0};
char* p = buf;
TFileHeader* header = &writer->header;
SERIALIZE_MEM_TO_BUF(p, header, suid);
SERIALIZE_MEM_TO_BUF(p, header, version);
SERIALIZE_VAR_TO_BUF(p, strlen(header->colName), int32_t);
SERIALIZE_STR_MEM_TO_BUF(p, header, colName, strlen(header->colName));
SERIALIZE_MEM_TO_BUF(p, header, colType);
int offset = p - buf;
int nwrite = writer->ctx->write(writer->ctx, buf, offset);
if (offset != nwrite) { return -1; }
}
static FORCE_INLINE int tfileReadLoadHeader(TFileReader* reader) { static FORCE_INLINE int tfileReadLoadHeader(TFileReader* reader) {
// TODO simple tfile header later // TODO simple tfile header later
char buf[TFILE_HADER_PRE_SIZE]; char buf[TFILE_HADER_PRE_SIZE];
@ -46,7 +60,8 @@ static FORCE_INLINE int tfileReadLoadHeader(TFileReader *reader) {
nread = reader->ctx->read(reader->ctx, &header->colType, sizeof(header->colType)); nread = reader->ctx->read(reader->ctx, &header->colType, sizeof(header->colType));
return 0; return 0;
}; }
static int tfileGetFileList(const char* path, SArray* result) { static int tfileGetFileList(const char* path, SArray* result) {
DIR* dir = opendir(path); DIR* dir = opendir(path);
if (NULL == dir) { return -1; } if (NULL == dir) { return -1; }
@ -124,8 +139,12 @@ TFileCache *tfileCacheCreate(const char *path) {
// loader fst and validate it // loader fst and validate it
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
TFileCacheKey key = { TFileCacheKey key = {.suid = header->suid,
.suid = header->suid, .version = header->version, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; .version = header->version,
.colName = header->colName,
.nColName = strlen(header->colName),
.colType = header->colType};
char buf[128] = {0}; char buf[128] = {0};
tfileSerialCacheKey(&key, buf); tfileSerialCacheKey(&key, buf);
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
@ -206,17 +225,20 @@ TFileWriter *tfileWriterCreate(WriterCtx *ctx, TFileHeader *header) {
// TFileHeader header = {.suid = suid, .version = version, .colName = {0}, colType = colType}; // TFileHeader header = {.suid = suid, .version = version, .colName = {0}, colType = colType};
// memcpy(header.colName, ); // memcpy(header.colName, );
char buf[TFILE_HADER_PRE_SIZE]; // char buf[TFILE_HADER_PRE_SIZE];
int len = TFILE_HADER_PRE_SIZE; // int len = TFILE_HADER_PRE_SIZE;
if (len != ctx->write(ctx, buf, len)) { // if (len != ctx->write(ctx, buf, len)) {
indexError("index: %" PRIu64 " failed to write header info", header->suid); // indexError("index: %" PRIu64 " failed to write header info", header->suid);
return NULL; // return NULL;
} //}
TFileWriter* tw = calloc(1, sizeof(TFileWriter)); TFileWriter* tw = calloc(1, sizeof(TFileWriter));
if (tw == NULL) { if (tw == NULL) {
indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid); indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
return NULL; return NULL;
} }
tw->ctx = ctx;
tw->header = *header;
return tw; return tw;
} }
void tfileWriterDestroy(TFileWriter* tw) { void tfileWriterDestroy(TFileWriter* tw) {
@ -233,7 +255,9 @@ IndexTFile *indexTFileCreate(const char *path) {
tfile->cache = tfileCacheCreate(path); tfile->cache = tfileCacheCreate(path);
return tfile; return tfile;
} }
void IndexTFileDestroy(IndexTFile *tfile) { free(tfile); } void IndexTFileDestroy(IndexTFile* tfile) {
free(tfile);
}
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
if (tfile == NULL) { return -1; } if (tfile == NULL) { return -1; }