enh(index): support index filter
This commit is contained in:
parent
5ed7b44b16
commit
fb7da7e566
|
@ -56,7 +56,6 @@ typedef struct CacheTerm {
|
||||||
int8_t colType;
|
int8_t colType;
|
||||||
|
|
||||||
SIndexOperOnColumn operaType;
|
SIndexOperOnColumn operaType;
|
||||||
int8_t qType; // query type
|
|
||||||
} CacheTerm;
|
} CacheTerm;
|
||||||
//
|
//
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,6 @@ typedef struct FstRange {
|
||||||
uint64_t end;
|
uint64_t end;
|
||||||
} FstRange;
|
} FstRange;
|
||||||
|
|
||||||
typedef enum { GE, GT, LE, LT } RangeType;
|
|
||||||
typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal } State;
|
typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal } State;
|
||||||
typedef enum { Ordered, OutOfOrdered, DuplicateKey } OrderType;
|
typedef enum { Ordered, OutOfOrdered, DuplicateKey } OrderType;
|
||||||
|
|
||||||
|
@ -174,9 +173,9 @@ Output fstStateFinalOutput(FstState* state, uint64_t version, FstSlice* date,
|
||||||
uint64_t fstStateFindInput(FstState* state, FstNode* node, uint8_t b, bool* null);
|
uint64_t fstStateFindInput(FstState* state, FstNode* node, uint8_t b, bool* null);
|
||||||
|
|
||||||
#define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext)
|
#define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext)
|
||||||
#define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans)
|
#define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans)
|
||||||
#define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans)
|
#define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans)
|
||||||
#define FST_STATE_EMPTY_FINAL(node) (node->state.state == EmptyFinal)
|
#define FST_STATE_EMPTY_FINAL(node) (node->state.state == EmptyFinal)
|
||||||
|
|
||||||
typedef struct FstLastTransition {
|
typedef struct FstLastTransition {
|
||||||
uint8_t inp;
|
uint8_t inp;
|
||||||
|
|
|
@ -34,6 +34,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef enum { LT, LE, GT, GE } RangeType;
|
||||||
typedef enum { kTypeValue, kTypeDeletion } STermValueType;
|
typedef enum { kTypeValue, kTypeDeletion } STermValueType;
|
||||||
|
|
||||||
typedef struct SIndexStat {
|
typedef struct SIndexStat {
|
||||||
|
|
|
@ -43,6 +43,20 @@ static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult*
|
||||||
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
||||||
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
||||||
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
|
||||||
|
/*comm func of compare, used in (LE/LT/GE/GT compare)*/
|
||||||
|
static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s,
|
||||||
|
RangeType type);
|
||||||
|
|
||||||
|
typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
|
||||||
|
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
|
||||||
|
|
||||||
|
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { return MATCH; }
|
||||||
|
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { return MATCH; }
|
||||||
|
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { return MATCH; }
|
||||||
|
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { return MATCH; }
|
||||||
|
|
||||||
|
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
|
||||||
|
tCompareGreaterThan, tCompareGreaterEqual};
|
||||||
|
|
||||||
static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
|
static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
|
||||||
cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan,
|
cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan,
|
||||||
|
@ -93,21 +107,51 @@ static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr,
|
||||||
// impl later
|
// impl later
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s,
|
||||||
|
RangeType type) {
|
||||||
|
if (cache == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
_cache_range_compare cmpFn = rangeCompare[type];
|
||||||
|
|
||||||
|
MemTable* mem = cache;
|
||||||
|
char* key = indexCacheTermGet(ct);
|
||||||
|
|
||||||
|
SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
|
||||||
|
while (tSkipListIterNext(iter)) {
|
||||||
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
|
if (node == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
|
TExeCond cond = cmpFn(c->colVal, ct->colVal, ct->colType);
|
||||||
|
if (cond == MATCH) {
|
||||||
|
if (c->operaType == ADD_VALUE) {
|
||||||
|
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
|
||||||
|
// taosArrayPush(result, &c->uid);
|
||||||
|
*s = kTypeValue;
|
||||||
|
} else if (c->operaType == DEL_VALUE) {
|
||||||
|
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
|
||||||
|
}
|
||||||
|
} else if (cond == CONTINUE) {
|
||||||
|
} else if (cond == BREAK) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tSkipListDestroyIter(iter);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
||||||
// impl later
|
return cacheSearchCompareFunc(cache, ct, tr, s, LT);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
||||||
// impl later
|
return cacheSearchCompareFunc(cache, ct, tr, s, LE);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
||||||
// impl later
|
return cacheSearchCompareFunc(cache, ct, tr, s, GT);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
||||||
// impl later
|
return cacheSearchCompareFunc(cache, ct, tr, s, GE);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
|
||||||
// impl later
|
// impl later
|
||||||
|
|
|
@ -70,6 +70,8 @@ static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult
|
||||||
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||||
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||||
|
|
||||||
|
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype);
|
||||||
|
|
||||||
static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = {
|
static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = {
|
||||||
tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan,
|
tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan,
|
||||||
tfSearchLessEqual, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange};
|
tfSearchLessEqual, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange};
|
||||||
|
@ -304,21 +306,46 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
|
||||||
fstSliceDestroy(&key);
|
fstSliceDestroy(&key);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) {
|
||||||
|
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||||
|
int ret = 0;
|
||||||
|
char* p = tem->colVal;
|
||||||
|
uint64_t sz = tem->nColVal;
|
||||||
|
if (hasJson) {
|
||||||
|
p = indexPackJsonData(tem);
|
||||||
|
sz = strlen(p);
|
||||||
|
}
|
||||||
|
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
|
||||||
|
|
||||||
|
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS);
|
||||||
|
FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
|
||||||
|
|
||||||
|
FstSlice h = fstSliceCreate((uint8_t*)p, sz);
|
||||||
|
fstStreamBuilderSetRange(sb, &h, type);
|
||||||
|
fstSliceDestroy(&h);
|
||||||
|
|
||||||
|
StreamWithState* st = streamBuilderIntoStream(sb);
|
||||||
|
StreamWithStateResult* rt = NULL;
|
||||||
|
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
|
||||||
|
taosArrayPush(offsets, &(rt->out.out));
|
||||||
|
swsResultDestroy(rt);
|
||||||
|
}
|
||||||
|
streamWithStateDestroy(st);
|
||||||
|
fstStreamBuilderDestroy(sb);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||||
// impl later
|
return tfSearchCompareFunc(reader, tem, tr, LT);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||||
// impl later
|
return tfSearchCompareFunc(reader, tem, tr, LE);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||||
// impl later
|
return tfSearchCompareFunc(reader, tem, tr, GT);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||||
// impl later
|
return tfSearchCompareFunc(reader, tem, tr, GE);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||||
|
|
Loading…
Reference in New Issue