diff --git a/source/libs/index/inc/index_fst_registry.h b/source/libs/index/inc/index_fst_registry.h index 1b0922c724..948bd41781 100644 --- a/source/libs/index/inc/index_fst_registry.h +++ b/source/libs/index/inc/index_fst_registry.h @@ -30,9 +30,7 @@ typedef struct FstRegistryCell { #define FST_REGISTRY_CELL_IS_EMPTY(cell) (cell->addr == NONE_ADDRESS) #define FST_REGISTRY_CELL_INSERT(cell, tAddr) \ - do { \ - cell->addr = tAddr; \ - } while (0) + do { cell->addr = tAddr; } while (0) // typedef struct FstRegistryCache { // SArray *cells; diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index 7c6261aacf..71b399a7e0 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -26,26 +26,26 @@ extern "C" { #endif -// tfile header +// tfile header content // |<---suid--->|<---version--->|<--colLen-->|<-colName->|<---type-->| // |<-uint64_t->|<---int32_t--->|<--int32_t->|<-colLen-->|<-uint8_t->| -typedef struct TFileReadHeader { +typedef struct TFileHeader { uint64_t suid; int32_t version; char colName[128]; // uint8_t colType; -} TFileReadHeader; +} TFileHeader; #define TFILE_HEADER_SIZE (sizeof(TFILE_HEADER_SIZE) + sizeof(uint32_t)); #define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t)) typedef struct TFileCacheKey { - uint64_t suid; - uint8_t colType; - int32_t version; - const char *colName; - int32_t nColName; + uint64_t suid; + uint8_t colType; + int32_t version; + char * colName; + int32_t nColName; } TFileCacheKey; // table cache @@ -59,13 +59,14 @@ typedef struct TFileCache { typedef struct TFileWriter { FstBuilder *fb; WriterCtx * ctx; + TFileHeader header; } TFileWriter; typedef struct TFileReader { T_REF_DECLARE() - Fst * fst; - WriterCtx * ctx; - TFileReadHeader header; + Fst * fst; + WriterCtx * ctx; + TFileHeader header; } TFileReader; typedef struct IndexTFile { @@ -94,11 +95,13 @@ void tfileCacheDestroy(TFileCache *tcache); TFileReader *tfileCacheGet(TFileCache *tcache, TFileCacheKey *key); void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader); -TFileReader *tfileReaderCreate(); +TFileReader *tfileReaderCreate(WriterCtx *ctx); void TFileReaderDestroy(TFileReader *reader); -TFileWriter *tfileWriterCreate(const char *suid, const char *colName); +TFileWriter *tfileWriterCreate(WriterCtx *ctx, TFileHeader *header); void tfileWriterDestroy(TFileWriter *tw); +int tfileWriterPut(TFileWriter *tw, const char *key, int32_t nKey, const char *val, int32_t nVal); +int tfileWriterFinish(TFileWriter *tw); // IndexTFile *indexTFileCreate(const char *path); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index cc68324017..e034b582ea 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -49,15 +49,14 @@ static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oTyp int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) { pthread_once(&isInit, indexInit); SIndex *sIdx = calloc(1, sizeof(SIndex)); - if (sIdx == NULL) { - return -1; - } + if (sIdx == NULL) { return -1; } #ifdef USE_LUCENE index_t *index = index_open(path); sIdx->index = index; #endif +#ifdef USE_INVERTED_INDEX sIdx->cache = (void *)indexCacheCreate(); sIdx->tindex = NULL; sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); @@ -67,6 +66,10 @@ int indexOpen(SIndexOpts *opts, const char *path, SIndex **index) { *index = sIdx; return 0; +#endif + + *index = NULL; + return -1; } void indexClose(SIndex *sIdx) { @@ -126,9 +129,7 @@ int indexPut(SIndex *index, SIndexMultiTerm *fVals, uint64_t uid) { int32_t colId = fi->colId; int32_t version = index->cVersion; int ret = indexCachePut(index->cache, p, colId, version, uid); - if (ret != 0) { - return ret; - } + if (ret != 0) { return ret; } } #endif @@ -158,9 +159,7 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result int tsz = 0; index_multi_search(index->index, (const char **)fields, (const char **)keys, types, nQuery, opera, &tResult, &tsz); - for (int i = 0; i < tsz; i++) { - taosArrayPush(result, &tResult[i]); - } + for (int i = 0; i < tsz; i++) { taosArrayPush(result, &tResult[i]); } for (int i = 0; i < nQuery; i++) { free(fields[i]); @@ -216,9 +215,7 @@ void indexOptsDestroy(SIndexOpts *opts){ SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) { SIndexMultiTermQuery *p = (SIndexMultiTermQuery *)malloc(sizeof(SIndexMultiTermQuery)); - if (p == NULL) { - return NULL; - } + if (p == NULL) { return NULL; } p->opera = opera; p->query = taosArrayInit(4, sizeof(SIndexTermQuery)); return p; @@ -240,9 +237,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm *term, EInde SIndexTerm *indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char *colName, int32_t nColName, const char *colVal, int32_t nColVal) { SIndexTerm *t = (SIndexTerm *)calloc(1, (sizeof(SIndexTerm))); - if (t == NULL) { - return NULL; - } + if (t == NULL) { return NULL; } t->suid = suid; t->operType = oper; @@ -320,9 +315,7 @@ static int indexTermSearch(SIndex *sIdx, SIndexTermQuery *query, SArray **result return 0; } static void indexInterResultsDestroy(SArray *results) { - if (results == NULL) { - return; - } + if (results == NULL) { return; } size_t sz = taosArrayGetSize(results); for (size_t i = 0; i < sz; i++) { @@ -336,6 +329,7 @@ static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType SArray *first = taosArrayGetP(interResults, 0); taosArraySort(first, uidCompare); taosArrayRemoveDuplicate(first, uidCompare, NULL); + if (oType == MUST) { // just one column index, enhance later taosArrayAddAll(fResults, first); @@ -351,9 +345,7 @@ static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType return 0; } static int indexMergeCacheIntoTindex(SIndex *sIdx) { - if (sIdx == NULL) { - return -1; - } + if (sIdx == NULL) { return -1; } indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); return 0; } diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index dd8a8bcbb6..52789bc642 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -40,9 +40,7 @@ static int32_t compareKey(const void *l, const void *r) { int16_t lf, rf; // field id memcpy(&lf, lp, sizeof(lf)); memcpy(&rf, rp, sizeof(rf)); - if (lf != rf) { - return lf < rf ? -1 : 1; - } + if (lf != rf) { return lf < rf ? -1 : 1; } lp += sizeof(lf); rp += sizeof(rf); @@ -89,9 +87,8 @@ static int32_t compareKey(const void *l, const void *r) { int32_t lv, rv; memcpy(&lv, lp, sizeof(lv)); memcpy(&rv, rp, sizeof(rv)); - if (lv != rv) { - return lv > rv ? -1 : 1; - } + if (lv != rv) { return lv > rv ? -1 : 1; } + lp += sizeof(lv); rp += sizeof(rv); // not care item type @@ -100,6 +97,10 @@ static int32_t compareKey(const void *l, const void *r) { } IndexCache *indexCacheCreate() { IndexCache *cache = calloc(1, sizeof(IndexCache)); + if (cache == NULL) { + indexError("failed to create index cache"); + return NULL; + } cache->skiplist = tSkipListCreate( MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); return cache; @@ -107,23 +108,20 @@ IndexCache *indexCacheCreate() { void indexCacheDestroy(void *cache) { IndexCache *pCache = cache; - if (pCache == NULL) { - return; - } + if (pCache == NULL) { return; } tSkipListDestroy(pCache->skiplist); free(pCache); } int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, uint64_t uid) { - if (cache == NULL) { - return -1; - } + if (cache == NULL) { return -1; } IndexCache *pCache = cache; // encode data int32_t total = CACHE_KEY_LEN(term); - char * buf = calloc(1, total); - char * p = buf; + + char *buf = calloc(1, total); + char *p = buf; SERIALIZE_VAR_TO_BUF(p, total, int32_t); SERIALIZE_VAR_TO_BUF(p, colId, int16_t); @@ -145,11 +143,13 @@ int indexCacheDel(void *cache, int32_t fieldId, const char *fieldValue, int32_t IndexCache *pCache = cache; return 0; } -int indexCacheSearch( - void *cache, SIndexTermQuery *query, int16_t colId, int32_t version, SArray *result, STermValueType *s) { - if (cache == NULL) { - return -1; - } +int indexCacheSearch(void *cache, + SIndexTermQuery * query, + int16_t colId, + int32_t version, + SArray * result, + STermValueType * s) { + if (cache == NULL) { return -1; } IndexCache * pCache = cache; SIndexTerm * term = query->term; EIndexQueryType qtype = query->qType; @@ -158,9 +158,13 @@ int indexCacheSearch( char *buf = calloc(1, keyLen); if (qtype == QUERY_TERM) { + // } else if (qtype == QUERY_PREFIX) { + // } else if (qtype == QUERY_SUFFIX) { + // } else if (qtype == QUERY_REGEX) { + // } return 0; diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index adb002c2b7..a425cc676f 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -31,9 +31,7 @@ static uint8_t fstPackDetla(FstCountingWriter *wrt, CompiledAddr nodeAddr, Compi FstUnFinishedNodes *fstUnFinishedNodesCreate() { FstUnFinishedNodes *nodes = malloc(sizeof(FstUnFinishedNodes)); - if (nodes == NULL) { - return NULL; - } + if (nodes == NULL) { return NULL; } nodes->stack = (SArray *)taosArrayInit(64, sizeof(FstBuilderNodeUnfinished)); fstUnFinishedNodesPushEmpty(nodes, false); @@ -46,9 +44,7 @@ void unFinishedNodeDestroyElem(void *elem) { b->last = NULL; } void fstUnFinishedNodesDestroy(FstUnFinishedNodes *nodes) { - if (nodes == NULL) { - return; - } + if (nodes == NULL) { return; } taosArrayDestroyEx(nodes->stack, unFinishedNodeDestroyElem); free(nodes); @@ -97,9 +93,7 @@ void fstUnFinishedNodesTopLastFreeze(FstUnFinishedNodes *nodes, CompiledAddr add } void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes *nodes, FstSlice bs, Output out) { FstSlice *s = &bs; - if (fstSliceIsEmpty(s)) { - return; - } + if (fstSliceIsEmpty(s)) { return; } size_t sz = taosArrayGetSize(nodes->stack) - 1; FstBuilderNodeUnfinished *un = taosArrayGet(nodes->stack, sz); assert(un->last == NULL); @@ -179,9 +173,7 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node, FstState fstStateCreateFrom(FstSlice *slice, CompiledAddr addr) { FstState fs = {.state = EmptyFinal, .val = 0}; - if (addr == EMPTY_ADDRESS) { - return fs; - } + if (addr == EMPTY_ADDRESS) { return fs; } uint8_t *data = fstSliceData(slice, NULL); uint8_t v = data[addr]; @@ -236,9 +228,7 @@ void fstStateCompileForOneTrans(FstCountingWriter *w, CompiledAddr addr, FstTran fstStateSetCommInput(&st, trn->inp); bool null = false; uint8_t inp = fstStateCommInput(&st, &null); - if (null == true) { - fstCountingWriterWrite(w, (char *)&trn->inp, sizeof(trn->inp)); - } + if (null == true) { fstCountingWriterWrite(w, (char *)&trn->inp, sizeof(trn->inp)); } fstCountingWriterWrite(w, (char *)(&(st.val)), sizeof(st.val)); return; } @@ -272,9 +262,7 @@ void fstStateCompileForAnyTrans(FstCountingWriter *w, CompiledAddr addr, FstBuil fstStateSetStateNtrans(&st, (uint8_t)sz); if (anyOuts) { - if (FST_BUILDER_NODE_IS_FINAL(node)) { - fstCountingWriterPackUintIn(w, node->finalOutput, oSize); - } + if (FST_BUILDER_NODE_IS_FINAL(node)) { fstCountingWriterPackUintIn(w, node->finalOutput, oSize); } for (int32_t i = sz - 1; i >= 0; i--) { FstTransition *t = taosArrayGet(node->trans, i); fstCountingWriterPackUintIn(w, t->out, oSize); @@ -439,9 +427,7 @@ Output fstStateOutput(FstState *s, FstNode *node) { assert(s->state == OneTrans); uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(node->sizes); - if (oSizes == 0) { - return 0; - } + if (oSizes == 0) { return 0; } FstSlice *slice = &node->data; uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes); @@ -453,9 +439,7 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) { assert(s->state == AnyTrans); uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(node->sizes); - if (oSizes == 0) { - return 0; - } + if (oSizes == 0) { return 0; } FstSlice *slice = &node->data; uint8_t * data = fstSliceData(slice, NULL); uint64_t at = node->start - fstStateNtransLen(s) - 1 // pack size @@ -468,9 +452,7 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) { void fstStateSetFinalState(FstState *s, bool yes) { assert(s->state == AnyTrans); - if (yes) { - s->val |= 0b01000000; - } + if (yes) { s->val |= 0b01000000; } return; } bool fstStateIsFinalState(FstState *s) { @@ -480,9 +462,7 @@ bool fstStateIsFinalState(FstState *s) { void fstStateSetStateNtrans(FstState *s, uint8_t n) { assert(s->state == AnyTrans); - if (n <= 0b00111111) { - s->val = (s->val & 0b11000000) | n; - } + if (n <= 0b00111111) { s->val = (s->val & 0b11000000) | n; } return; } // state_ntrans @@ -514,9 +494,7 @@ uint64_t fstStateNtransLen(FstState *s) { uint64_t fstStateNtrans(FstState *s, FstSlice *slice) { bool null = false; uint8_t n = fstStateStateNtrans(s, &null); - if (null != true) { - return n; - } + if (null != true) { return n; } int32_t len; uint8_t *data = fstSliceData(slice, &len); n = data[len - 2]; @@ -526,9 +504,7 @@ uint64_t fstStateNtrans(FstState *s, FstSlice *slice) { } Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, PackSizes sizes, uint64_t nTrans) { uint8_t oSizes = FST_GET_OUTPUT_PACK_SIZE(sizes); - if (oSizes == 0 || !fstStateIsFinalState(s)) { - return 0; - } + if (oSizes == 0 || !fstStateIsFinalState(s)) { return 0; } uint64_t at = FST_SLICE_LEN(slice) - 1 - fstStateNtransLen(s) - 1 // pack size - fstStateTotalTransSize(s, version, sizes, nTrans) - (nTrans * oSizes) - oSizes; @@ -545,9 +521,7 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) { uint8_t *data = fstSliceData(slice, &dlen); uint64_t i = data[at + b]; // uint64_t i = slice->data[slice->start + at + b]; - if (i >= node->nTrans) { - *null = true; - } + if (i >= node->nTrans) { *null = true; } return i; } else { uint64_t start = node->start - fstStateNtransLen(s) - 1 // pack size @@ -564,9 +538,7 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) { return node->nTrans - i - 1; // bug } } - if (i == len) { - *null = true; - } + if (i == len) { *null = true; } fstSliceDestroy(&t); } } @@ -575,9 +547,7 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) { FstNode *fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice *slice) { FstNode *n = (FstNode *)malloc(sizeof(FstNode)); - if (n == NULL) { - return NULL; - } + if (n == NULL) { return NULL; } FstState st = fstStateCreateFrom(slice, addr); @@ -643,9 +613,7 @@ void fstNodeDestroy(FstNode *node) { } FstTransitions *fstNodeTransitions(FstNode *node) { FstTransitions *t = malloc(sizeof(FstTransitions)); - if (NULL == t) { - return NULL; - } + if (NULL == t) { return NULL; } FstRange range = {.start = 0, .end = FST_NODE_LEN(node)}; t->range = range; t->node = node; @@ -752,9 +720,7 @@ bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, Compiled FstBuilder *fstBuilderCreate(void *w, FstType ty) { FstBuilder *b = malloc(sizeof(FstBuilder)); - if (NULL == b) { - return b; - } + if (NULL == b) { return b; } b->wrt = fstCountingWriterCreate(w); b->unfinished = fstUnFinishedNodesCreate(); @@ -776,9 +742,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) { return b; } void fstBuilderDestroy(FstBuilder *b) { - if (b == NULL) { - return; - } + if (b == NULL) { return; } fstCountingWriterDestroy(b->wrt); fstUnFinishedNodesDestroy(b->unfinished); @@ -879,9 +843,7 @@ CompiledAddr fstBuilderCompile(FstBuilder *b, FstBuilderNode *bn) { fstBuilderNodeCompileTo(bn, b->wrt, b->lastAddr, startAddr); b->lastAddr = (CompiledAddr)(FST_WRITER_COUNT(b->wrt) - 1); - if (entry->state == NOTFOUND) { - FST_REGISTRY_CELL_INSERT(entry->cell, b->lastAddr); - } + if (entry->state == NOTFOUND) { FST_REGISTRY_CELL_INSERT(entry->cell, b->lastAddr); } fstRegistryEntryDestroy(entry); return b->lastAddr; @@ -924,9 +886,7 @@ FstSlice fstNodeAsSlice(FstNode *node) { FstLastTransition *fstLastTransitionCreate(uint8_t inp, Output out) { FstLastTransition *trn = malloc(sizeof(FstLastTransition)); - if (trn == NULL) { - return NULL; - } + if (trn == NULL) { return NULL; } trn->inp = inp; trn->out = out; @@ -936,9 +896,7 @@ FstLastTransition *fstLastTransitionCreate(uint8_t inp, Output out) { void fstLastTransitionDestroy(FstLastTransition *trn) { free(trn); } void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished *unNode, CompiledAddr addr) { FstLastTransition *trn = unNode->last; - if (trn == NULL) { - return; - } + if (trn == NULL) { return; } FstTransition t = {.inp = trn->inp, .out = trn->out, .addr = addr}; taosArrayPush(unNode->node->trans, &t); fstLastTransitionDestroy(trn); @@ -947,35 +905,27 @@ void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished *unNode, Comp } void fstBuilderNodeUnfinishedAddOutputPrefix(FstBuilderNodeUnfinished *unNode, Output out) { - if (FST_BUILDER_NODE_IS_FINAL(unNode->node)) { - unNode->node->finalOutput += out; - } + if (FST_BUILDER_NODE_IS_FINAL(unNode->node)) { unNode->node->finalOutput += out; } size_t sz = taosArrayGetSize(unNode->node->trans); for (size_t i = 0; i < sz; i++) { FstTransition *trn = taosArrayGet(unNode->node->trans, i); trn->out += out; } - if (unNode->last) { - unNode->last->out += out; - } + if (unNode->last) { unNode->last->out += out; } return; } Fst *fstCreate(FstSlice *slice) { int32_t slen; char * buf = fstSliceData(slice, &slen); - if (slen < 36) { - return NULL; - } + if (slen < 36) { return NULL; } uint64_t len = slen; uint64_t skip = 0; uint64_t version; taosDecodeFixedU64(buf, &version); skip += sizeof(version); - if (version == 0 || version > VERSION) { - return NULL; - } + if (version == 0 || version > VERSION) { return NULL; } uint64_t type; taosDecodeFixedU64(buf + skip, &type); @@ -994,14 +944,10 @@ Fst *fstCreate(FstSlice *slice) { taosDecodeFixedU64(buf + len, &fstLen); // TODO(validate root addr) Fst *fst = (Fst *)calloc(1, sizeof(Fst)); - if (fst == NULL) { - return NULL; - } + if (fst == NULL) { return NULL; } fst->meta = (FstMeta *)malloc(sizeof(FstMeta)); - if (NULL == fst->meta) { - goto FST_CREAT_FAILED; - } + if (NULL == fst->meta) { goto FST_CREAT_FAILED; } fst->meta->version = version; fst->meta->rootAddr = rootAddr; @@ -1039,9 +985,7 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) { for (uint32_t i = 0; i < len; i++) { uint8_t inp = data[i]; Output res = 0; - if (false == fstNodeFindInput(root, inp, &res)) { - return false; - } + if (false == fstNodeFindInput(root, inp, &res)) { return false; } FstTransition trn; fstNodeGetTransitionAt(root, res, &trn); @@ -1068,17 +1012,13 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) { } FstStreamBuilder *fstSearch(Fst *fst, AutomationCtx *ctx) { return fstStreamBuilderCreate(fst, ctx); } StreamWithState * streamBuilderIntoStream(FstStreamBuilder *sb) { - if (sb == NULL) { - return NULL; - } + if (sb == NULL) { return NULL; } return streamWithStateCreate(sb->fst, sb->aut, sb->min, sb->max); } FstStreamWithStateBuilder *fstSearchWithState(Fst *fst, AutomationCtx *ctx) { return fstStreamBuilderCreate(fst, ctx); } FstNode *fstGetRoot(Fst *fst) { - if (fst->root != NULL) { - return fst->root; - } + if (fst->root != NULL) { return fst->root; } CompiledAddr rAddr = fstGetRootAddr(fst); fst->root = fstGetNode(fst, rAddr); return fst->root; @@ -1104,18 +1044,14 @@ bool fstVerify(Fst *fst) { int32_t len; uint8_t *data = fstSliceData(fst->data, &len); TSCKSUM initSum = 0; - if (!taosCheckChecksumWhole(data, len)) { - return false; - } + if (!taosCheckChecksumWhole(data, len)) { return false; } return true; } // data bound function FstBoundWithData *fstBoundStateCreate(FstBound type, FstSlice *data) { FstBoundWithData *b = calloc(1, sizeof(FstBoundWithData)); - if (b == NULL) { - return NULL; - } + if (b == NULL) { return NULL; } if (data != NULL) { b->data = fstSliceCopy(data, data->start, data->end); @@ -1152,9 +1088,7 @@ void fstBoundDestroy(FstBoundWithData *bound) { free(bound); } StreamWithState *streamWithStateCreate( Fst *fst, AutomationCtx *automation, FstBoundWithData *min, FstBoundWithData *max) { StreamWithState *sws = calloc(1, sizeof(StreamWithState)); - if (sws == NULL) { - return NULL; - } + if (sws == NULL) { return NULL; } sws->fst = fst; sws->aut = automation; @@ -1170,9 +1104,7 @@ StreamWithState *streamWithStateCreate( return sws; } void streamWithStateDestroy(StreamWithState *sws) { - if (sws == NULL) { - return; - } + if (sws == NULL) { return; } taosArrayDestroy(sws->inp); taosArrayDestroyEx(sws->stack, streamStateDestroy); @@ -1195,7 +1127,6 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { } FstSlice *key = NULL; bool inclusize = false; - ; if (min->type == Included) { key = &min->data; @@ -1239,9 +1170,7 @@ bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) { uint64_t i = 0; for (i = trans->range.start; i < trans->range.end; i++) { FstTransition trn; - if (fstNodeGetTransitionAt(node, i, &trn) && trn.inp > b) { - break; - } + if (fstNodeGetTransitionAt(node, i, &trn) && trn.inp > b) { break; } } StreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState}; @@ -1289,9 +1218,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb while (taosArrayGetSize(sws->stack) > 0) { StreamState *p = (StreamState *)taosArrayPop(sws->stack); if (p->trans >= FST_NODE_LEN(p->node) || !automFuncs[aut->type].canMatch(aut, p->autState)) { - if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { - taosArrayPop(sws->inp); - } + if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { taosArrayPop(sws->inp); } streamStateDestroy(p); continue; } @@ -1308,9 +1235,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb if (FST_NODE_IS_FINAL(nextNode)) { // void *eofState = sws->aut->acceptEof(nextState); void *eofState = automFuncs[aut->type].acceptEof(aut, nextState); - if (eofState != NULL) { - isMatch = automFuncs[aut->type].isMatch(aut, eofState); - } + if (eofState != NULL) { isMatch = automFuncs[aut->type].isMatch(aut, eofState); } } StreamState s1 = {.node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState}; taosArrayPush(sws->stack, &s1); @@ -1320,9 +1245,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb size_t isz = taosArrayGetSize(sws->inp); uint8_t *buf = (uint8_t *)malloc(isz * sizeof(uint8_t)); - for (uint32_t i = 0; i < isz; i++) { - buf[i] = *(uint8_t *)taosArrayGet(sws->inp, i); - } + for (uint32_t i = 0; i < isz; i++) { buf[i] = *(uint8_t *)taosArrayGet(sws->inp, i); } FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp)); if (fstBoundWithDataExceededBy(sws->endAt, &slice)) { taosArrayDestroyEx(sws->stack, streamStateDestroy); @@ -1351,9 +1274,7 @@ StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallb StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state) { StreamWithStateResult *result = calloc(1, sizeof(StreamWithStateResult)); - if (result == NULL) { - return NULL; - } + if (result == NULL) { return NULL; } result->data = fstSliceCopy(data, 0, FST_SLICE_LEN(data) - 1); result->out = fOut; @@ -1362,9 +1283,7 @@ StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *sta return result; } void swsResultDestroy(StreamWithStateResult *result) { - if (NULL == result) { - return; - } + if (NULL == result) { return; } fstSliceDestroy(&result->data); startWithStateValueDestroy(result->state); @@ -1372,9 +1291,7 @@ void swsResultDestroy(StreamWithStateResult *result) { } void streamStateDestroy(void *s) { - if (NULL == s) { - return; - } + if (NULL == s) { return; } StreamState *ss = (StreamState *)s; fstNodeDestroy(ss->node); @@ -1383,9 +1300,7 @@ void streamStateDestroy(void *s) { FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, AutomationCtx *aut) { FstStreamBuilder *b = calloc(1, sizeof(FstStreamBuilder)); - if (NULL == b) { - return NULL; - } + if (NULL == b) { return NULL; } b->fst = fst; b->aut = aut; @@ -1401,9 +1316,7 @@ void fstStreamBuilderDestroy(FstStreamBuilder *b) { free(b); } FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type) { - if (b == NULL) { - return NULL; - } + if (b == NULL) { return NULL; } if (type == GE) { b->min->type = Included; diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 824021c9e3..6438654ee3 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -18,9 +18,7 @@ #include "tutil.h" static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) { - if (ctx->offset + len > ctx->limit) { - return -1; - } + if (ctx->offset + len > ctx->limit) { return -1; } if (ctx->type == TFile) { assert(len == tfWrite(ctx->file.fd, buf, len)); @@ -53,9 +51,7 @@ static int writeCtxDoFlush(WriterCtx *ctx) { WriterCtx *writerCtxCreate(WriterType type, const char *path, bool readOnly, int32_t capacity) { WriterCtx *ctx = calloc(1, sizeof(WriterCtx)); - if (ctx == NULL) { - return NULL; - } + if (ctx == NULL) { return NULL; } ctx->type = type; if (ctx->type == TFile) { @@ -67,8 +63,8 @@ WriterCtx *writerCtxCreate(WriterType type, const char *path, bool readOnly, int ctx->file.fd = tfOpenReadWrite(tmpFile); } if (ctx->file.fd < 0) { - goto END; indexError("open file error %d", errno); + goto END; } } else if (ctx->type == TMemory) { ctx->mem.buf = calloc(1, sizeof(char) * capacity); @@ -83,9 +79,7 @@ WriterCtx *writerCtxCreate(WriterType type, const char *path, bool readOnly, int return ctx; END: - if (ctx->type == TMemory) { - free(ctx->mem.buf); - } + if (ctx->type == TMemory) { free(ctx->mem.buf); } free(ctx); } void writerCtxDestroy(WriterCtx *ctx) { @@ -99,9 +93,7 @@ void writerCtxDestroy(WriterCtx *ctx) { FstCountingWriter *fstCountingWriterCreate(void *wrt) { FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter)); - if (cw == NULL) { - return NULL; - } + if (cw == NULL) { return NULL; } cw->wrt = wrt; //(void *)(writerCtxCreate(TFile, readOnly)); @@ -115,9 +107,7 @@ void fstCountingWriterDestroy(FstCountingWriter *cw) { } int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t len) { - if (write == NULL) { - return 0; - } + if (write == NULL) { return 0; } // update checksum // write data to file/socket or mem WriterCtx *ctx = write->wrt; @@ -128,9 +118,7 @@ int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t len) return len; } int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len) { - if (write == NULL) { - return 0; - } + if (write == NULL) { return 0; } WriterCtx *ctx = write->wrt; int nRead = ctx->read(ctx, buf, len); // assert(nRead == len); diff --git a/source/libs/index/src/index_fst_registry.c b/source/libs/index/src/index_fst_registry.c index 7bb2e72230..dffbc820c8 100644 --- a/source/libs/index/src/index_fst_registry.c +++ b/source/libs/index/src/index_fst_registry.c @@ -34,9 +34,7 @@ uint64_t fstRegistryHash(FstRegistry *registry, FstBuilderNode *bNode) { } static void fstRegistryCellSwap(SArray *arr, uint32_t a, uint32_t b) { size_t sz = taosArrayGetSize(arr); - if (a >= sz || b >= sz) { - return; - } + if (a >= sz || b >= sz) { return; } FstRegistryCell *cell1 = (FstRegistryCell *)taosArrayGet(arr, a); FstRegistryCell *cell2 = (FstRegistryCell *)taosArrayGet(arr, b); @@ -53,9 +51,7 @@ static void fstRegistryCellSwap(SArray *arr, uint32_t a, uint32_t b) { static void fstRegistryCellPromote(SArray *arr, uint32_t start, uint32_t end) { size_t sz = taosArrayGetSize(arr); - if (start >= sz && end >= sz) { - return; - } + if (start >= sz && end >= sz) { return; } assert(start >= end); @@ -69,9 +65,7 @@ static void fstRegistryCellPromote(SArray *arr, uint32_t start, uint32_t end) { FstRegistry *fstRegistryCreate(uint64_t tableSize, uint64_t mruSize) { FstRegistry *registry = malloc(sizeof(FstRegistry)); - if (registry == NULL) { - return NULL; - } + if (registry == NULL) { return NULL; } uint64_t nCells = tableSize * mruSize; SArray * tb = (SArray *)taosArrayInit(nCells, sizeof(FstRegistryCell)); @@ -92,9 +86,7 @@ FstRegistry *fstRegistryCreate(uint64_t tableSize, uint64_t mruSize) { } void fstRegistryDestroy(FstRegistry *registry) { - if (registry == NULL) { - return; - } + if (registry == NULL) { return; } SArray *tb = registry->table; size_t sz = taosArrayGetSize(tb); @@ -107,9 +99,7 @@ void fstRegistryDestroy(FstRegistry *registry) { } FstRegistryEntry *fstRegistryGetEntry(FstRegistry *registry, FstBuilderNode *bNode) { - if (taosArrayGetSize(registry->table) <= 0) { - return NULL; - } + if (taosArrayGetSize(registry->table) <= 0) { return NULL; } uint64_t bucket = fstRegistryHash(registry, bNode); uint64_t start = registry->mruSize * bucket; uint64_t end = start + registry->mruSize; diff --git a/source/libs/index/src/index_fst_util.c b/source/libs/index/src/index_fst_util.c index 597f7cc61a..ad5559f835 100644 --- a/source/libs/index/src/index_fst_util.c +++ b/source/libs/index/src/index_fst_util.c @@ -64,6 +64,7 @@ uint8_t packSize(uint64_t n) { uint64_t unpackUint64(uint8_t *ch, uint8_t sz) { uint64_t n = 0; for (uint8_t i = 0; i < sz; i++) { + // n = n | (ch[i] << (8 * i)); } return n; @@ -133,9 +134,7 @@ bool fstSliceIsEmpty(FstSlice *s) { return s->str == NULL || s->str->len == 0 || uint8_t *fstSliceData(FstSlice *s, int32_t *size) { FstString *str = s->str; - if (size != NULL) { - *size = s->end - s->start + 1; - } + if (size != NULL) { *size = s->end - s->start + 1; } return str->data + s->start; } void fstSliceDestroy(FstSlice *s) { diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 19e8ff6750..1a1ac0776c 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -22,14 +22,16 @@ #include "index_util.h" #include "taosdef.h" +static FORCE_INLINE int tfileWriteHeader(TFileWriter *writer) {} static FORCE_INLINE int tfileReadLoadHeader(TFileReader *reader) { // TODO simple tfile header later - char buf[TFILE_HADER_PRE_SIZE]; - char * p = buf; - TFileReadHeader *header = &reader->header; - int64_t nread = reader->ctx->read(reader->ctx, buf, TFILE_HADER_PRE_SIZE); + char buf[TFILE_HADER_PRE_SIZE]; + char *p = buf; + + int64_t nread = reader->ctx->read(reader->ctx, buf, TFILE_HADER_PRE_SIZE); assert(nread == TFILE_HADER_PRE_SIZE); + TFileHeader *header = &reader->header; memcpy(&header->suid, p, sizeof(header->suid)); p += sizeof(header->suid); @@ -47,9 +49,7 @@ static FORCE_INLINE int tfileReadLoadHeader(TFileReader *reader) { }; static int tfileGetFileList(const char *path, SArray *result) { DIR *dir = opendir(path); - if (NULL == dir) { - return -1; - } + if (NULL == dir) { return -1; } struct dirent *entry; while ((entry = readdir(dir)) != NULL) { @@ -68,8 +68,10 @@ static void tfileDestroyFileName(void *elem) { static int tfileCompare(const void *a, const void *b) { const char *aName = *(char **)a; const char *bName = *(char **)b; - size_t aLen = strlen(aName); - size_t bLen = strlen(bName); + + size_t aLen = strlen(aName); + size_t bLen = strlen(bName); + return strncmp(aName, bName, aLen > bLen ? aLen : bLen); } // tfile name suid-colId-version.tindex @@ -92,9 +94,7 @@ static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) { TFileCache *tfileCacheCreate(const char *path) { TFileCache *tcache = calloc(1, sizeof(TFileCache)); - if (tcache == NULL) { - return NULL; - } + if (tcache == NULL) { return NULL; } tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tcache->capacity = 64; @@ -102,15 +102,14 @@ TFileCache *tfileCacheCreate(const char *path) { SArray *files = taosArrayInit(4, sizeof(void *)); tfileGetFileList(path, files); taosArraySort(files, tfileCompare); + uint64_t suid; + int32_t colId, version; for (size_t i = 0; i < taosArrayGetSize(files); i++) { - char * file = taosArrayGetP(files, i); - uint64_t suid; - int colId, version; - if (0 != tfileParseFileName(file, &suid, &colId, &version)) { - goto End; + char *file = taosArrayGetP(files, i); + if (0 != tfileParseFileName(file, &suid, (int *)&colId, (int *)&version)) { + indexInfo("try parse invalid file: %s, skip it", file); continue; } - WriterCtx *wc = writerCtxCreate(TFile, file, true, 1024 * 64); if (wc == NULL) { indexError("failed to open index: %s", file); @@ -122,6 +121,15 @@ TFileCache *tfileCacheCreate(const char *path) { indexError("failed to load index header, index Id: %s", file); goto End; } + // loader fst and validate it + + TFileHeader * header = &reader->header; + TFileCacheKey key = { + .suid = header->suid, .version = header->version, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; + + char buf[128] = {0}; + tfileSerialCacheKey(&key, buf); + taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void *)); } taosArrayDestroyEx(files, tfileDestroyFileName); return tcache; @@ -131,16 +139,14 @@ End: return NULL; } void tfileCacheDestroy(TFileCache *tcache) { - if (tcache == NULL) { - return; - } + if (tcache == NULL) { return; } // free table cache TFileReader **reader = taosHashIterate(tcache->tableCache, NULL); while (reader) { TFileReader *p = *reader; - indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, - p->header.colType); + indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType); + TFileReaderDestroy(p); reader = taosHashIterate(tcache->tableCache, reader); } @@ -163,45 +169,67 @@ void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader) TFileReader *tfileReaderCreate(WriterCtx *ctx) { TFileReader *reader = calloc(1, sizeof(TFileReader)); - if (reader == NULL) { - return NULL; - } - reader->ctx = ctx; + if (reader == NULL) { return NULL; } + // T_REF_INC(reader); + reader->ctx = ctx; return reader; } void TFileReaderDestroy(TFileReader *reader) { - if (reader == NULL) { - return; - } + if (reader == NULL) { return; } // T_REF_INC(reader); writerCtxDestroy(reader->ctx); free(reader); } -TFileWriter *tfileWriterCreate(const char *suid, const char *colName); -void tfileWriterDestroy(TFileWriter *tw); +TFileWriter *tfileWriterCreate(WriterCtx *ctx, TFileHeader *header) { + // char pathBuf[128] = {0}; + // sprintf(pathBuf, "%s/% " PRIu64 "-%d-%d.tindex", path, suid, colId, version); + // TFileHeader header = {.suid = suid, .version = version, .colName = {0}, colType = colType}; + // memcpy(header.colName, ); + + char buf[TFILE_HADER_PRE_SIZE]; + int len = TFILE_HADER_PRE_SIZE; + if (len != ctx->write(ctx, buf, len)) { + indexError("index: %" PRIu64 " failed to write header info", header->suid); + return NULL; + } + TFileWriter *tw = calloc(1, sizeof(TFileWriter)); + if (tw == NULL) { + indexError("index: % " PRIu64 " failed to write header info"); + return NULL; + } + return tw; +} +void tfileWriterDestroy(TFileWriter *tw) { + if (tw == NULL) { return; } + + writerCtxDestroy(tw->ctx); + free(tw); +} IndexTFile *indexTFileCreate(const char *path) { IndexTFile *tfile = calloc(1, sizeof(IndexTFile)); - tfile->cache = tfileCacheCreate(path); + if (tfile == NULL) { return NULL; } + tfile->cache = tfileCacheCreate(path); return tfile; } void IndexTFileDestroy(IndexTFile *tfile) { free(tfile); } int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) { IndexTFile *pTfile = (IndexTFile *)tfile; + if (pTfile == NULL) { return -1; } SIndexTerm * term = query->term; - TFileCacheKey key = { - .suid = term->suid, .colType = term->colType, .version = 0, .colName = term->colName, .nColName = term->nColName}; + TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .version = 0, .colName = term->colName, .nColName = term->nColName}; + TFileReader *reader = tfileCacheGet(pTfile->cache, &key); + return 0; } int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid) { - TFileWriterOpt wOpt = { - .suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version = 1}; + TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version = 1}; return 0; }