Merge pull request #12291 from taosdata/enh/supportIdxFilter
enh(index): support numberic and json filter
This commit is contained in:
commit
5de1a22656
|
@ -20,11 +20,23 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "indexInt.h"
|
||||
#include "tcompare.h"
|
||||
|
||||
extern char JSON_COLUMN[];
|
||||
extern char JSON_VALUE_DELIM;
|
||||
|
||||
char* indexPackJsonData(SIndexTerm* itm);
|
||||
char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip);
|
||||
|
||||
typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
|
||||
|
||||
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
|
||||
|
||||
TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b);
|
||||
|
||||
_cache_range_compare indexGetCompare(RangeType ty);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -60,50 +60,6 @@ static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult
|
|||
static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
|
||||
RangeType type);
|
||||
|
||||
typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
|
||||
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
|
||||
|
||||
static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
|
||||
// optime later
|
||||
int32_t ret = func(a, b);
|
||||
switch (comType) {
|
||||
case QUERY_LESS_THAN: {
|
||||
if (ret < 0) return MATCH;
|
||||
} break;
|
||||
case QUERY_LESS_EQUAL: {
|
||||
if (ret <= 0) return MATCH;
|
||||
break;
|
||||
}
|
||||
case QUERY_GREATER_THAN: {
|
||||
if (ret > 0) return MATCH;
|
||||
break;
|
||||
}
|
||||
case QUERY_GREATER_EQUAL: {
|
||||
if (ret >= 0) return MATCH;
|
||||
}
|
||||
}
|
||||
return CONTINUE;
|
||||
}
|
||||
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_LESS_THAN, a, b);
|
||||
}
|
||||
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
|
||||
}
|
||||
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
|
||||
}
|
||||
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
|
||||
}
|
||||
|
||||
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
|
||||
tCompareGreaterThan, tCompareGreaterEqual};
|
||||
|
||||
static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
|
||||
{cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual,
|
||||
cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange},
|
||||
|
@ -169,7 +125,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes
|
|||
return 0;
|
||||
}
|
||||
|
||||
_cache_range_compare cmpFn = rangeCompare[type];
|
||||
_cache_range_compare cmpFn = indexGetCompare(type);
|
||||
|
||||
MemTable* mem = cache;
|
||||
IndexCache* pCache = mem->pCache;
|
||||
|
@ -295,7 +251,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
|
|||
if (cache == NULL) {
|
||||
return 0;
|
||||
}
|
||||
_cache_range_compare cmpFn = rangeCompare[type];
|
||||
_cache_range_compare cmpFn = indexGetCompare(type);
|
||||
|
||||
MemTable* mem = cache;
|
||||
IndexCache* pCache = mem->pCache;
|
||||
|
|
|
@ -13,12 +13,58 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "indexComm.h"
|
||||
#include "index.h"
|
||||
#include "indexInt.h"
|
||||
#include "tcompare.h"
|
||||
|
||||
char JSON_COLUMN[] = "JSON";
|
||||
char JSON_VALUE_DELIM = '&';
|
||||
|
||||
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_LESS_THAN, a, b);
|
||||
}
|
||||
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
|
||||
}
|
||||
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
|
||||
}
|
||||
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
|
||||
}
|
||||
|
||||
TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
|
||||
// optime later
|
||||
int32_t ret = func(a, b);
|
||||
switch (comType) {
|
||||
case QUERY_LESS_THAN: {
|
||||
if (ret < 0) return MATCH;
|
||||
} break;
|
||||
case QUERY_LESS_EQUAL: {
|
||||
if (ret <= 0) return MATCH;
|
||||
break;
|
||||
}
|
||||
case QUERY_GREATER_THAN: {
|
||||
if (ret > 0) return MATCH;
|
||||
break;
|
||||
}
|
||||
case QUERY_GREATER_EQUAL: {
|
||||
if (ret >= 0) return MATCH;
|
||||
}
|
||||
}
|
||||
return CONTINUE;
|
||||
}
|
||||
|
||||
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
|
||||
tCompareGreaterThan, tCompareGreaterEqual};
|
||||
|
||||
_cache_range_compare indexGetCompare(RangeType ty) { return rangeCompare[ty]; }
|
||||
|
||||
char* indexPackJsonData(SIndexTerm* itm) {
|
||||
/*
|
||||
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
|
||||
|
@ -46,6 +92,7 @@ char* indexPackJsonData(SIndexTerm* itm) {
|
|||
|
||||
return buf;
|
||||
}
|
||||
|
||||
char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) {
|
||||
/*
|
||||
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
|
||||
|
|
|
@ -72,9 +72,23 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
|||
|
||||
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype);
|
||||
|
||||
static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = {
|
||||
tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan,
|
||||
tfSearchLessEqual, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange};
|
||||
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
|
||||
|
||||
static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype);
|
||||
|
||||
static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = {
|
||||
{tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual,
|
||||
tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange},
|
||||
{tfSearchTerm_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON,
|
||||
tfSearchLessEqual_JSON, tfSearchGreaterThan_JSON, tfSearchGreaterEqual_JSON, tfSearchRange_JSON}};
|
||||
|
||||
TFileCache* tfileCacheCreate(const char* path) {
|
||||
TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache));
|
||||
|
@ -202,14 +216,10 @@ void tfileReaderDestroy(TFileReader* reader) {
|
|||
taosMemoryFree(reader);
|
||||
}
|
||||
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||
int ret = 0;
|
||||
char* p = tem->colVal;
|
||||
uint64_t sz = tem->nColVal;
|
||||
if (hasJson) {
|
||||
p = indexPackJsonData(tem);
|
||||
sz = strlen(p);
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
FstSlice key = fstSliceCreate(p, sz);
|
||||
uint64_t offset;
|
||||
|
@ -224,9 +234,6 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
|||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
|
||||
tem->colName, tem->colVal, cost);
|
||||
}
|
||||
if (hasJson) {
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
fstSliceDestroy(&key);
|
||||
return 0;
|
||||
}
|
||||
|
@ -308,14 +315,11 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
|
|||
}
|
||||
|
||||
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) {
|
||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||
int ret = 0;
|
||||
char* p = tem->colVal;
|
||||
int skip = 0;
|
||||
int ret = 0;
|
||||
char* p = tem->colVal;
|
||||
int skip = 0;
|
||||
_cache_range_compare cmpFn = indexGetCompare(type);
|
||||
|
||||
if (hasJson) {
|
||||
p = indexPackJsonDataPrefix(tem, &skip);
|
||||
}
|
||||
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
|
||||
|
||||
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS);
|
||||
|
@ -328,7 +332,16 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult
|
|||
StreamWithState* st = streamBuilderIntoStream(sb);
|
||||
StreamWithStateResult* rt = NULL;
|
||||
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
|
||||
taosArrayPush(offsets, &(rt->out.out));
|
||||
FstSlice* s = &rt->data;
|
||||
char* ch = (char*)fstSliceData(s, NULL);
|
||||
TExeCond cond = cmpFn(ch, p, tem->colType);
|
||||
if (MATCH == cond) {
|
||||
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
|
||||
} else if (CONTINUE == cond) {
|
||||
} else if (BREAK == cond) {
|
||||
swsResultDestroy(rt);
|
||||
break;
|
||||
}
|
||||
swsResultDestroy(rt);
|
||||
}
|
||||
streamWithStateDestroy(st);
|
||||
|
@ -376,17 +389,105 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
|
|||
fstSliceDestroy(&key);
|
||||
return 0;
|
||||
}
|
||||
static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||
int ret = 0;
|
||||
char* p = indexPackJsonData(tem);
|
||||
int sz = strlen(p);
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
FstSlice key = fstSliceCreate(p, sz);
|
||||
uint64_t offset;
|
||||
if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) {
|
||||
int64_t et = taosGetTimestampUs();
|
||||
int64_t cost = et - st;
|
||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us",
|
||||
tem->suid, tem->colName, tem->colVal, cost);
|
||||
|
||||
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
|
||||
cost = taosGetTimestampUs() - et;
|
||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
|
||||
tem->colName, tem->colVal, cost);
|
||||
}
|
||||
fstSliceDestroy(&key);
|
||||
return 0;
|
||||
// deprecate api
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||
return tfSearchCompareFunc_JSON(reader, tem, tr, LT);
|
||||
}
|
||||
static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||
return tfSearchCompareFunc_JSON(reader, tem, tr, LE);
|
||||
}
|
||||
static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||
return tfSearchCompareFunc_JSON(reader, tem, tr, GT);
|
||||
}
|
||||
static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||
return tfSearchCompareFunc_JSON(reader, tem, tr, GE);
|
||||
}
|
||||
static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype) {
|
||||
int ret = 0;
|
||||
int skip = 0;
|
||||
|
||||
char* p = indexPackJsonDataPrefix(tem, &skip);
|
||||
|
||||
_cache_range_compare cmpFn = indexGetCompare(ctype);
|
||||
|
||||
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
|
||||
|
||||
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
|
||||
FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
|
||||
|
||||
FstSlice h = fstSliceCreate((uint8_t*)p, skip);
|
||||
fstStreamBuilderSetRange(sb, &h, ctype);
|
||||
fstSliceDestroy(&h);
|
||||
|
||||
StreamWithState* st = streamBuilderIntoStream(sb);
|
||||
StreamWithStateResult* rt = NULL;
|
||||
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
|
||||
FstSlice* s = &rt->data;
|
||||
char* ch = (char*)fstSliceData(s, NULL);
|
||||
TExeCond cond = cmpFn(ch, p, tem->colType);
|
||||
if (MATCH == cond) {
|
||||
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
|
||||
} else if (CONTINUE == cond) {
|
||||
} else if (BREAK == cond) {
|
||||
swsResultDestroy(rt);
|
||||
break;
|
||||
}
|
||||
swsResultDestroy(rt);
|
||||
}
|
||||
streamWithStateDestroy(st);
|
||||
fstStreamBuilderDestroy(sb);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) {
|
||||
SIndexTerm* term = query->term;
|
||||
EIndexQueryType qtype = query->qType;
|
||||
if (qtype >= sizeof(tfSearch) / sizeof(tfSearch[0])) {
|
||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName,
|
||||
term->colVal);
|
||||
return -1;
|
||||
|
||||
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
|
||||
return tfSearch[1][qtype](reader, term, tr);
|
||||
} else {
|
||||
return tfSearch[qtype](reader, term, tr);
|
||||
return tfSearch[0][qtype](reader, term, tr);
|
||||
}
|
||||
|
||||
tfileReaderUnRef(reader);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -1108,7 +1108,7 @@ void transSendResponse(const STransMsg* msg) {
|
|||
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
|
||||
srvMsg->msg = tmsg;
|
||||
srvMsg->type = Normal;
|
||||
tTrace("server conn %p start to send resp (1/2)", exh->handle);
|
||||
tDebug("server conn %p start to send resp (1/2)", exh->handle);
|
||||
transSendAsync(pThrd->asyncPool, &srvMsg->q);
|
||||
uvReleaseExHandle(refId);
|
||||
return;
|
||||
|
|
Loading…
Reference in New Issue