From 6c844c7567dfa875ff90005c9f35561a10818d8a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 9 May 2022 23:34:48 +0800 Subject: [PATCH] enh(index): support numberic and json filter --- source/libs/index/inc/indexComm.h | 12 +++ source/libs/index/src/indexCache.c | 48 +-------- source/libs/index/src/indexComm.c | 47 +++++++++ source/libs/index/src/indexTfile.c | 149 ++++++++++++++++++++++----- source/libs/transport/src/transSrv.c | 2 +- 5 files changed, 187 insertions(+), 71 deletions(-) diff --git a/source/libs/index/inc/indexComm.h b/source/libs/index/inc/indexComm.h index 3b07429089..4cab71f92c 100644 --- a/source/libs/index/inc/indexComm.h +++ b/source/libs/index/inc/indexComm.h @@ -20,11 +20,23 @@ extern "C" { #endif +#include "indexInt.h" +#include "tcompare.h" + extern char JSON_COLUMN[]; extern char JSON_VALUE_DELIM; char* indexPackJsonData(SIndexTerm* itm); char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip); + +typedef enum { MATCH, CONTINUE, BREAK } TExeCond; + +typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type); + +TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b); + +_cache_range_compare indexGetCompare(RangeType ty); + #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 929f33909e..0653c1d1fa 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -60,50 +60,6 @@ static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, RangeType type); -typedef enum { MATCH, CONTINUE, BREAK } TExeCond; -typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type); - -static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) { - // optime later - int32_t ret = func(a, b); - switch (comType) { - case QUERY_LESS_THAN: { - if (ret < 0) return MATCH; - } break; - case QUERY_LESS_EQUAL: { - if (ret <= 0) return MATCH; - break; - } - case QUERY_GREATER_THAN: { - if (ret > 0) return MATCH; - break; - } - case QUERY_GREATER_EQUAL: { - if (ret >= 0) return MATCH; - } - } - return CONTINUE; -} -static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); - return tDoCommpare(func, QUERY_LESS_THAN, a, b); -} -static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); - return tDoCommpare(func, QUERY_LESS_EQUAL, a, b); -} -static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); - return tDoCommpare(func, QUERY_GREATER_THAN, a, b); -} -static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); - return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b); -} - -static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual, - tCompareGreaterThan, tCompareGreaterEqual}; - static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = { {cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual, cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange}, @@ -169,7 +125,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes return 0; } - _cache_range_compare cmpFn = rangeCompare[type]; + _cache_range_compare cmpFn = indexGetCompare(type); MemTable* mem = cache; IndexCache* pCache = mem->pCache; @@ -295,7 +251,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe if (cache == NULL) { return 0; } - _cache_range_compare cmpFn = rangeCompare[type]; + _cache_range_compare cmpFn = indexGetCompare(type); MemTable* mem = cache; IndexCache* pCache = mem->pCache; diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index cdd7b35675..9e85a6680a 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -13,12 +13,58 @@ * along with this program. If not, see . */ +#include "indexComm.h" #include "index.h" #include "indexInt.h" +#include "tcompare.h" char JSON_COLUMN[] = "JSON"; char JSON_VALUE_DELIM = '&'; +static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_LESS_THAN, a, b); +} +static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_LESS_EQUAL, a, b); +} +static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_GREATER_THAN, a, b); +} +static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b); +} + +TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) { + // optime later + int32_t ret = func(a, b); + switch (comType) { + case QUERY_LESS_THAN: { + if (ret < 0) return MATCH; + } break; + case QUERY_LESS_EQUAL: { + if (ret <= 0) return MATCH; + break; + } + case QUERY_GREATER_THAN: { + if (ret > 0) return MATCH; + break; + } + case QUERY_GREATER_EQUAL: { + if (ret >= 0) return MATCH; + } + } + return CONTINUE; +} + +static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual, + tCompareGreaterThan, tCompareGreaterEqual}; + +_cache_range_compare indexGetCompare(RangeType ty) { return rangeCompare[ty]; } + char* indexPackJsonData(SIndexTerm* itm) { /* * |<-----colname---->|<-----dataType---->|<--------colVal---------->| @@ -46,6 +92,7 @@ char* indexPackJsonData(SIndexTerm* itm) { return buf; } + char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) { /* * |<-----colname---->|<-----dataType---->|<--------colVal---------->| diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index b5551e825f..c56d65fc6a 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -72,9 +72,23 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype); -static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = { - tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, - tfSearchLessEqual, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange}; +static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); + +static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype); + +static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = { + {tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual, + tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange}, + {tfSearchTerm_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON, + tfSearchLessEqual_JSON, tfSearchGreaterThan_JSON, tfSearchGreaterEqual_JSON, tfSearchRange_JSON}}; TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache)); @@ -202,14 +216,10 @@ void tfileReaderDestroy(TFileReader* reader) { taosMemoryFree(reader); } static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { - bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); int ret = 0; char* p = tem->colVal; uint64_t sz = tem->nColVal; - if (hasJson) { - p = indexPackJsonData(tem); - sz = strlen(p); - } + int64_t st = taosGetTimestampUs(); FstSlice key = fstSliceCreate(p, sz); uint64_t offset; @@ -224,9 +234,6 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid, tem->colName, tem->colVal, cost); } - if (hasJson) { - taosMemoryFree(p); - } fstSliceDestroy(&key); return 0; } @@ -308,14 +315,11 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr) } static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) { - bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); - int ret = 0; - char* p = tem->colVal; - int skip = 0; + int ret = 0; + char* p = tem->colVal; + int skip = 0; + _cache_range_compare cmpFn = indexGetCompare(type); - if (hasJson) { - p = indexPackJsonDataPrefix(tem, &skip); - } SArray* offsets = taosArrayInit(16, sizeof(uint64_t)); AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS); @@ -328,7 +332,16 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult StreamWithState* st = streamBuilderIntoStream(sb); StreamWithStateResult* rt = NULL; while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { - taosArrayPush(offsets, &(rt->out.out)); + FstSlice* s = &rt->data; + char* ch = (char*)fstSliceData(s, NULL); + TExeCond cond = cmpFn(ch, p, tem->colType); + if (MATCH == cond) { + tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); + } else if (CONTINUE == cond) { + } else if (BREAK == cond) { + swsResultDestroy(rt); + break; + } swsResultDestroy(rt); } streamWithStateDestroy(st); @@ -376,17 +389,105 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) fstSliceDestroy(&key); return 0; } +static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + int ret = 0; + char* p = indexPackJsonData(tem); + int sz = strlen(p); + int64_t st = taosGetTimestampUs(); + FstSlice key = fstSliceCreate(p, sz); + uint64_t offset; + if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) { + int64_t et = taosGetTimestampUs(); + int64_t cost = et - st; + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us", + tem->suid, tem->colName, tem->colVal, cost); + + ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); + cost = taosGetTimestampUs() - et; + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid, + tem->colName, tem->colVal, cost); + } + fstSliceDestroy(&key); + return 0; + // deprecate api + return TSDB_CODE_SUCCESS; +} +static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc_JSON(reader, tem, tr, LT); +} +static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc_JSON(reader, tem, tr, LE); +} +static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc_JSON(reader, tem, tr, GT); +} +static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc_JSON(reader, tem, tr, GE); +} +static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + // impl later + return TSDB_CODE_SUCCESS; +} + +static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype) { + int ret = 0; + int skip = 0; + + char* p = indexPackJsonDataPrefix(tem, &skip); + + _cache_range_compare cmpFn = indexGetCompare(ctype); + + SArray* offsets = taosArrayInit(16, sizeof(uint64_t)); + + AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX); + FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx); + + FstSlice h = fstSliceCreate((uint8_t*)p, skip); + fstStreamBuilderSetRange(sb, &h, ctype); + fstSliceDestroy(&h); + + StreamWithState* st = streamBuilderIntoStream(sb); + StreamWithStateResult* rt = NULL; + while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { + FstSlice* s = &rt->data; + char* ch = (char*)fstSliceData(s, NULL); + TExeCond cond = cmpFn(ch, p, tem->colType); + if (MATCH == cond) { + tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); + } else if (CONTINUE == cond) { + } else if (BREAK == cond) { + swsResultDestroy(rt); + break; + } + swsResultDestroy(rt); + } + streamWithStateDestroy(st); + fstStreamBuilderDestroy(sb); + return TSDB_CODE_SUCCESS; +} int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) { SIndexTerm* term = query->term; EIndexQueryType qtype = query->qType; - if (qtype >= sizeof(tfSearch) / sizeof(tfSearch[0])) { - indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, - term->colVal); - return -1; + + if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) { + return tfSearch[1][qtype](reader, term, tr); } else { - return tfSearch[qtype](reader, term, tr); + return tfSearch[0][qtype](reader, term, tr); } + tfileReaderUnRef(reader); return 0; } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 27efbcda53..f6fa7b93fd 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -1070,7 +1070,7 @@ void transSendResponse(const STransMsg* msg) { SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); srvMsg->msg = tmsg; srvMsg->type = Normal; - tTrace("server conn %p start to send resp (1/2)", exh->handle); + tDebug("server conn %p start to send resp (1/2)", exh->handle); transSendAsync(pThrd->asyncPool, &srvMsg->q); uvReleaseExHandle(refId); return;