From e08dd100a44d4b30f35964754683840b38279cd6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 6 May 2022 18:39:28 +0800 Subject: [PATCH 1/2] enh(index): support more data type --- include/libs/index/index.h | 7 +- source/libs/executor/src/indexoperator.c | 4 +- source/libs/index/inc/indexCache.h | 1 + source/libs/index/inc/indexInt.h | 4 +- source/libs/index/src/index.c | 4 +- source/libs/index/src/indexCache.c | 190 +++++++++++++++++------ source/libs/index/src/indexTfile.c | 14 +- source/libs/index/test/indexTests.cc | 44 +++--- source/libs/index/test/jsonUT.cc | 16 +- source/libs/transport/src/transSrv.c | 5 + 10 files changed, 194 insertions(+), 95 deletions(-) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index c94d75338a..fa4cb1d2bd 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -56,7 +56,8 @@ typedef enum { QUERY_LESS_EQUAL, QUERY_GREATER_THAN, QUERY_GREATER_EQUAL, - QUERY_RANGE + QUERY_RANGE, + QUERY_MAX } EIndexQueryType; /* @@ -178,8 +179,8 @@ void indexOptsDestroy(SIndexOpts* opts); * @param: */ -SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, int8_t qType, uint8_t colType, - const char* colName, int32_t nColName, const char* colVal, int32_t nColVal); +SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t colType, const char* colName, + int32_t nColName, const char* colVal, int32_t nColVal); void indexTermDestroy(SIndexTerm* p); /* diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c index 3b62bdd664..86a28605b2 100644 --- a/source/libs/executor/src/indexoperator.c +++ b/source/libs/executor/src/indexoperator.c @@ -252,8 +252,8 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu return TSDB_CODE_QRY_INVALID_INPUT; } static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { - SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, operType, left->colValType, left->colName, - strlen(left->colName), right->condValue, strlen(right->condValue)); + SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName), + right->condValue, strlen(right->condValue)); if (tm == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } diff --git a/source/libs/index/inc/indexCache.h b/source/libs/index/inc/indexCache.h index 3ea986ad48..0daca26ec9 100644 --- a/source/libs/index/inc/indexCache.h +++ b/source/libs/index/inc/indexCache.h @@ -31,6 +31,7 @@ extern "C" { typedef struct MemTable { T_REF_DECLARE() SSkipList* mem; + void* pCache; } MemTable; typedef struct IndexCache { T_REF_DECLARE() diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 5c7b8b9afe..7b7050d80e 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -166,7 +166,9 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf); } while (0) #define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0) -#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F) + +#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F) + #define INDEX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \ do { \ uint8_t oldTy = ty; \ diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 83b5025ad0..e0c24ac3bd 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -244,8 +244,8 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde return 0; } -SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, int8_t queryType, uint8_t colType, - const char* colName, int32_t nColName, const char* colVal, int32_t nColVal) { +SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName, + int32_t nColName, const char* colVal, int32_t nColVal) { SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm))); if (tm == NULL) { return NULL; diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 13768ce682..ad82dd0748 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -34,23 +34,58 @@ static char* indexCacheTermGet(const void* pData); static MemTable* indexInternalCacheCreate(int8_t type); -static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); -static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchTerm(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchPrefix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchSuffix(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchRegex(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchLessThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchRange(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); /*comm func of compare, used in (LE/LT/GE/GT compare)*/ -static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s, +static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s, RangeType type); +static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s); + +static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, + RangeType type); typedef enum { MATCH, CONTINUE, BREAK } TExeCond; typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type); -static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { return MATCH; } +static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) { + int32_t ret = func(a, b); + switch (comType) { + case QUERY_LESS_THAN: { + if (ret < 0) return MATCH; + } break; + case QUERY_LESS_EQUAL: { + if (ret <= 0) return MATCH; + break; + } + case QUERY_GREATER_THAN: { + if (ret > 0) return MATCH; + break; + } + case QUERY_GREATER_EQUAL: { + if (ret >= 0) return MATCH; + } + } + return CONTINUE; +} +static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_LESS_THAN, a, b); +} static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { return MATCH; } static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { return MATCH; } static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { return MATCH; } @@ -58,20 +93,26 @@ static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { return MAT static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual, tCompareGreaterThan, tCompareGreaterEqual}; -static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = { - cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, - cacheSearchLessEqual, cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange}; +static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = { + {cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual, + cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange}, + {cacheSearchTerm_JSON, cacheSearchPrefix_JSON, cacheSearchSuffix_JSON, cacheSearchRegex_JSON, + cacheSearchLessThan_JSON, cacheSearchLessEqual_JSON, cacheSearchGreaterThan_JSON, cacheSearchGreaterEqual_JSON, + cacheSearchRange_JSON}}; static void doMergeWork(SSchedMsg* msg); static bool indexCacheIteratorNext(Iterate* itera); -static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { if (cache == NULL) { return 0; } + MemTable* mem = cache; + IndexCache* pCache = mem->pCache; + CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)}; + CacheTerm* pCt = &ct; - MemTable* mem = cache; - char* key = indexCacheTermGet(ct); + char* key = indexCacheTermGet(&ct); SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); while (tSkipListIterNext(iter)) { @@ -80,7 +121,7 @@ static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, S break; } CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); - if (0 == strcmp(c->colVal, ct->colVal)) { + if (0 == strcmp(c->colVal, pCt->colVal)) { if (c->operaType == ADD_VALUE) { INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) // taosArrayPush(result, &c->uid); @@ -95,27 +136,33 @@ static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, S tSkipListDestroyIter(iter); return 0; } -static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { // impl later return 0; } -static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchSuffix(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { // impl later return 0; } -static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { +static int32_t cacheSearchRegex(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { // impl later return 0; } -static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s, +static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, RangeType type) { if (cache == NULL) { return 0; } _cache_range_compare cmpFn = rangeCompare[type]; - MemTable* mem = cache; - char* key = indexCacheTermGet(ct); + MemTable* mem = cache; + IndexCache* pCache = mem->pCache; + + CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); + pCt->colVal = term->colVal; + pCt->version = atomic_load_32(&pCache->version); + + char* key = indexCacheTermGet(pCt); SSkipListIterator* iter = tSkipListCreateIter(mem->mem); while (tSkipListIterNext(iter)) { @@ -124,7 +171,7 @@ static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult break; } CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); - TExeCond cond = cmpFn(c->colVal, ct->colVal, ct->colType); + TExeCond cond = cmpFn(c->colVal, pCt->colVal, pCt->colType); if (cond == MATCH) { if (c->operaType == ADD_VALUE) { INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) @@ -134,26 +181,61 @@ static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) } } else if (cond == CONTINUE) { + continue; } else if (cond == BREAK) { break; } } + taosMemoryFree(pCt); tSkipListDestroyIter(iter); return TSDB_CODE_SUCCESS; } -static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { - return cacheSearchCompareFunc(cache, ct, tr, s, LT); +static int32_t cacheSearchLessThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc(cache, term, tr, s, LT); } -static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { - return cacheSearchCompareFunc(cache, ct, tr, s, LE); +static int32_t cacheSearchLessEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc(cache, term, tr, s, LE); } -static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { - return cacheSearchCompareFunc(cache, ct, tr, s, GT); +static int32_t cacheSearchGreaterThan(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc(cache, term, tr, s, GT); } -static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { - return cacheSearchCompareFunc(cache, ct, tr, s, GE); +static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc(cache, term, tr, s, GE); } -static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { + +static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return TSDB_CODE_SUCCESS; +} +static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return TSDB_CODE_SUCCESS; +} +static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return TSDB_CODE_SUCCESS; +} +static int32_t cacheSearchRegex_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return TSDB_CODE_SUCCESS; +} +static int32_t cacheSearchLessThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc_JSON(cache, term, tr, s, LT); +} +static int32_t cacheSearchLessEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc_JSON(cache, term, tr, s, LE); +} +static int32_t cacheSearchGreaterThan_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc_JSON(cache, term, tr, s, GT); +} +static int32_t cacheSearchGreaterEqual_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc_JSON(cache, term, tr, s, GE); +} +static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + return TSDB_CODE_SUCCESS; +} + +static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, + RangeType type) { + return TSDB_CODE_SUCCESS; +} +static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { // impl later return 0; } @@ -167,6 +249,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in }; cache->mem = indexInternalCacheCreate(type); + cache->mem->pCache = cache; cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName); cache->type = type; cache->index = idx; @@ -379,11 +462,19 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u return 0; } -static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SIdxTempResult* tr, STermValueType* s) { +static int32_t indexQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTempResult* tr, STermValueType* s) { if (mem == NULL) { return 0; } - return cacheSearch[qtype](mem, ct, tr, s); + + SIndexTerm* term = query->term; + EIndexQueryType qtype = query->qType; + + if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) { + return cacheSearch[1][qtype](mem, term, tr, s); + } else { + return cacheSearch[0][qtype](mem, term, tr, s); + } } int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result, STermValueType* s) { int64_t st = taosGetTimestampUs(); @@ -400,25 +491,24 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result indexMemRef(imm); taosThreadMutexUnlock(&pCache->mtx); - SIndexTerm* term = query->term; - EIndexQueryType qtype = query->qType; + // SIndexTerm* term = query->term; + // EIndexQueryType qtype = query->qType; - bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); - char* p = term->colVal; - if (hasJson) { - p = indexPackJsonData(term); - } - CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)}; + // bool isJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON); + // char* p = term->colVal; + // if (isJson) { + // p = indexPackJsonData(term); + //} + // CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)}; - int ret = indexQueryMem(mem, &ct, qtype, result, s); + int ret = indexQueryMem(mem, query, result, s); if (ret == 0 && *s != kTypeDeletion) { // continue search in imm - ret = indexQueryMem(imm, &ct, qtype, result, s); - } - - if (hasJson) { - taosMemoryFreeClear(p); + ret = indexQueryMem(imm, query, result, s); } + // if (isJson) { + // taosMemoryFreeClear(p); + //} indexMemUnRef(mem); indexMemUnRef(imm); diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 5aed2bd6b0..9edd868272 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -606,16 +606,16 @@ static bool tfileIteratorNext(Iterate* iiter) { static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; } static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { - TFileFstIter* tIter = taosMemoryCalloc(1, sizeof(TFileFstIter)); - if (tIter == NULL) { + TFileFstIter* iter = taosMemoryCalloc(1, sizeof(TFileFstIter)); + if (iter == NULL) { return NULL; } - tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS); - tIter->fb = fstSearch(reader->fst, tIter->ctx); - tIter->st = streamBuilderIntoStream(tIter->fb); - tIter->rdr = reader; - return tIter; + iter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS); + iter->fb = fstSearch(reader->fst, iter->ctx); + iter->st = streamBuilderIntoStream(iter->fb); + iter->rdr = reader; + return iter; } Iterate* tfileIteratorCreate(TFileReader* reader) { diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index d8ea6a8233..896451c686 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -483,7 +483,7 @@ TEST_F(IndexTFileEnv, test_tfile_write) { std::string colName("voltage"); std::string colVal("ab"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexTermQuery query = {term, QUERY_TERM}; @@ -557,7 +557,7 @@ TEST_F(IndexCacheEnv, cache_test) { std::string colName("voltage"); { std::string colVal("v1"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); @@ -565,28 +565,28 @@ TEST_F(IndexCacheEnv, cache_test) { } { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); } { std::string colVal("v2"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); } { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); } { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); @@ -595,14 +595,14 @@ TEST_F(IndexCacheEnv, cache_test) { std::cout << "--------first----------" << std::endl; { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, othColId, version++, suid++); indexTermDestroy(term); } { std::string colVal("v4"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, othColId, version++, suid++); indexTermDestroy(term); @@ -613,7 +613,7 @@ TEST_F(IndexCacheEnv, cache_test) { std::string colVal("v4"); for (size_t i = 0; i < 10; i++) { colVal[colVal.size() - 1] = 'a' + i; - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); indexTermDestroy(term); @@ -623,7 +623,7 @@ TEST_F(IndexCacheEnv, cache_test) { // begin query { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexTermQuery query = {term, QUERY_TERM}; SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid)); @@ -638,7 +638,7 @@ TEST_F(IndexCacheEnv, cache_test) { } { std::string colVal("v2"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexTermQuery query = {term, QUERY_TERM}; SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid)); @@ -670,7 +670,7 @@ class IndexObj { return ret; } void Del(const std::string& colName, const std::string& colVal, uint64_t uid) { - SIndexTerm* term = indexTermCreate(0, DEL_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, DEL_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -679,7 +679,7 @@ class IndexObj { } int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world", size_t numOfTable = 100 * 10000) { - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -701,7 +701,7 @@ class IndexObj { // opt tColVal[taosRand() % colValSize] = 'a' + k % 26; } - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), tColVal.c_str(), tColVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -737,7 +737,7 @@ class IndexObj { int SearchOne(const std::string& colName, const std::string& colVal) { SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); indexMultiTermQueryAdd(mq, term, QUERY_TERM); @@ -759,7 +759,7 @@ class IndexObj { } int SearchOneTarget(const std::string& colName, const std::string& colVal, uint64_t val) { SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); indexMultiTermQueryAdd(mq, term, QUERY_TERM); @@ -784,7 +784,7 @@ class IndexObj { void PutOne(const std::string& colName, const std::string& colVal) { SIndexMultiTerm* terms = indexMultiTermCreate(); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); indexMultiTermAdd(terms, term); Put(terms, 10); @@ -792,7 +792,7 @@ class IndexObj { } void PutOneTarge(const std::string& colName, const std::string& colVal, uint64_t val) { SIndexMultiTerm* terms = indexMultiTermCreate(); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); indexMultiTermAdd(terms, term); Put(terms, val); @@ -832,7 +832,7 @@ TEST_F(IndexEnv2, testIndexOpen) { { std::string colName("tag1"), colVal("Hello"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -847,7 +847,7 @@ TEST_F(IndexEnv2, testIndexOpen) { size_t size = 200; std::string colName("tag1"), colVal("hello"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -862,7 +862,7 @@ TEST_F(IndexEnv2, testIndexOpen) { size_t size = 200; std::string colName("tag1"), colVal("Hello"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); @@ -877,7 +877,7 @@ TEST_F(IndexEnv2, testIndexOpen) { { std::string colName("tag1"), colVal("Hello"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); indexMultiTermQueryAdd(mq, term, QUERY_TERM); diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index e1e5004701..f789d23136 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -40,7 +40,7 @@ TEST_F(JsonEnv, testWrite) { { std::string colName("test"); std::string colVal("ab"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -53,7 +53,7 @@ TEST_F(JsonEnv, testWrite) { { std::string colName("voltage"); std::string colVal("ab1"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -66,7 +66,7 @@ TEST_F(JsonEnv, testWrite) { { std::string colName("voltage"); std::string colVal("123"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -81,7 +81,7 @@ TEST_F(JsonEnv, testWrite) { std::string colVal("ab"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SArray* result = taosArrayInit(1, sizeof(uint64_t)); @@ -95,7 +95,7 @@ TEST_F(JsonEnv, testWriteMillonData) { { std::string colName("test"); std::string colVal("ab"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -110,7 +110,7 @@ TEST_F(JsonEnv, testWriteMillonData) { std::string colVal("abxxxxxxxxxxxx"); for (int i = 0; i < 1000; i++) { colVal[i % colVal.size()] = '0' + i % 128; - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -124,7 +124,7 @@ TEST_F(JsonEnv, testWriteMillonData) { { std::string colName("voltagefdadfa"); std::string colVal("abxxxxxxxxxxxx"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); @@ -139,7 +139,7 @@ TEST_F(JsonEnv, testWriteMillonData) { std::string colVal("ab"); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); - SIndexTerm* q = indexTermCreate(1, ADD_VALUE, 0, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SArray* result = taosArrayInit(1, sizeof(uint64_t)); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index b78edf6cd0..9c905771d5 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -782,6 +782,11 @@ static void uvDestroyConn(uv_handle_t* handle) { tDebug("server conn %p destroy", conn); // uv_timer_stop(&conn->pTimer); transQueueDestroy(&conn->srvMsgs); + + if (conn->regArg.init == 1) { + transFreeMsg(conn->regArg.msg.pCont); + conn->regArg.init = 0; + } QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp); taosMemoryFree(conn); From 8240b10a3ee28286d4a44dd6e1faf5554aaf7692 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 6 May 2022 21:36:39 +0800 Subject: [PATCH 2/2] enh(index): support more data type --- source/libs/index/inc/indexCache.h | 1 + source/libs/index/inc/indexComm.h | 2 +- source/libs/index/src/indexCache.c | 119 +++++++++++++++++++++++++++-- source/libs/index/src/indexComm.c | 27 +++++++ source/libs/index/src/indexTfile.c | 14 ++-- 5 files changed, 149 insertions(+), 14 deletions(-) diff --git a/source/libs/index/inc/indexCache.h b/source/libs/index/inc/indexCache.h index 0daca26ec9..d474d87409 100644 --- a/source/libs/index/inc/indexCache.h +++ b/source/libs/index/inc/indexCache.h @@ -48,6 +48,7 @@ typedef struct IndexCache { } IndexCache; #define CACHE_VERSION(cache) atomic_load_32(&cache->version) + typedef struct CacheTerm { // key char* colVal; diff --git a/source/libs/index/inc/indexComm.h b/source/libs/index/inc/indexComm.h index 0d8418ba65..3b07429089 100644 --- a/source/libs/index/inc/indexComm.h +++ b/source/libs/index/inc/indexComm.h @@ -24,7 +24,7 @@ extern char JSON_COLUMN[]; extern char JSON_VALUE_DELIM; char* indexPackJsonData(SIndexTerm* itm); - +char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip); #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index ad82dd0748..b3ae7b7dbe 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -86,9 +86,18 @@ static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { __compar_fn_t func = getComparFunc(type, 0); return tDoCommpare(func, QUERY_LESS_THAN, a, b); } -static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { return MATCH; } -static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { return MATCH; } -static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { return MATCH; } +static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_LESS_EQUAL, a, b); +} +static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_GREATER_THAN, a, b); +} +static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b); +} static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual, tCompareGreaterThan, tCompareGreaterEqual}; @@ -109,10 +118,12 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr } MemTable* mem = cache; IndexCache* pCache = mem->pCache; - CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)}; - CacheTerm* pCt = &ct; - char* key = indexCacheTermGet(&ct); + CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); + pCt->colVal = term->colVal; + pCt->version = atomic_load_32(&pCache->version); + + char* key = indexCacheTermGet(pCt); SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); while (tSkipListIterNext(iter)) { @@ -133,6 +144,8 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr break; } } + + taosMemoryFree(pCt); tSkipListDestroyIter(iter); return 0; } @@ -153,6 +166,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes if (cache == NULL) { return 0; } + _cache_range_compare cmpFn = rangeCompare[type]; MemTable* mem = cache; @@ -204,6 +218,48 @@ static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTempRe } static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { + if (cache == NULL) { + return 0; + } + MemTable* mem = cache; + IndexCache* pCache = mem->pCache; + + CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); + pCt->colVal = term->colVal; + pCt->version = atomic_load_32(&pCache->version); + + char* exBuf = NULL; + if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) { + exBuf = indexPackJsonData(term); + pCt->colVal = exBuf; + } + char* key = indexCacheTermGet(pCt); + + SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + while (tSkipListIterNext(iter)) { + SSkipListNode* node = tSkipListIterGet(iter); + if (node == NULL) { + break; + } + CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); + if (0 == strcmp(c->colVal, pCt->colVal)) { + if (c->operaType == ADD_VALUE) { + INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) + // taosArrayPush(result, &c->uid); + *s = kTypeValue; + } else if (c->operaType == DEL_VALUE) { + INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) + } + } else { + break; + } + } + + taosMemoryFree(pCt); + taosMemoryFree(exBuf); + tSkipListDestroyIter(iter); + return 0; + return TSDB_CODE_SUCCESS; } static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { @@ -233,6 +289,56 @@ static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTempResu static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, RangeType type) { + if (cache == NULL) { + return 0; + } + _cache_range_compare cmpFn = rangeCompare[type]; + + MemTable* mem = cache; + IndexCache* pCache = mem->pCache; + + CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); + pCt->colVal = term->colVal; + pCt->version = atomic_load_32(&pCache->version); + + int8_t dType = INDEX_TYPE_GET_TYPE(term->colType); + int skip = 0; + char* exBuf = NULL; + + if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) { + exBuf = indexPackJsonDataPrefix(term, &skip); + pCt->colVal = exBuf; + } + char* key = indexCacheTermGet(pCt); + + SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + while (tSkipListIterNext(iter)) { + SSkipListNode* node = tSkipListIterGet(iter); + if (node == NULL) { + break; + } + CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); + + TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType); + if (cond == MATCH) { + if (c->operaType == ADD_VALUE) { + INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) + // taosArrayPush(result, &c->uid); + *s = kTypeValue; + } else if (c->operaType == DEL_VALUE) { + INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) + } + } else if (cond == CONTINUE) { + continue; + } else if (cond == BREAK) { + break; + } + } + + taosMemoryFree(pCt); + taosMemoryFree(exBuf); + tSkipListDestroyIter(iter); + return TSDB_CODE_SUCCESS; } static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) { @@ -408,6 +514,7 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { indexCacheRef(cache); cache->imm = cache->mem; cache->mem = indexInternalCacheCreate(cache->type); + cache->mem->pCache = cache; cache->occupiedMem = 0; // sched to merge // unref cache in bgwork diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index 4dea5fa011..cdd7b35675 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -46,3 +46,30 @@ char* indexPackJsonData(SIndexTerm* itm) { return buf; } +char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) { + /* + * |<-----colname---->|<-----dataType---->|<--------colVal---------->| + * |<-----string----->|<-----uint8_t----->|<----depend on dataType-->| + */ + uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType); + + int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1; + char* buf = (char*)taosMemoryCalloc(1, sz); + char* p = buf; + + memcpy(p, itm->colName, itm->nColName); + p += itm->nColName; + + memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); + p += sizeof(JSON_VALUE_DELIM); + + memcpy(p, &ty, sizeof(ty)); + p += sizeof(ty); + + memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM)); + p += sizeof(JSON_VALUE_DELIM); + + *skip = p - buf; + + return buf; +} diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 9edd868272..b5551e825f 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -308,20 +308,20 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr) } static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) { - bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); - int ret = 0; - char* p = tem->colVal; - uint64_t sz = tem->nColVal; + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); + int ret = 0; + char* p = tem->colVal; + int skip = 0; + if (hasJson) { - p = indexPackJsonData(tem); - sz = strlen(p); + p = indexPackJsonDataPrefix(tem, &skip); } SArray* offsets = taosArrayInit(16, sizeof(uint64_t)); AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS); FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx); - FstSlice h = fstSliceCreate((uint8_t*)p, sz); + FstSlice h = fstSliceCreate((uint8_t*)p, skip); fstStreamBuilderSetRange(sb, &h, type); fstSliceDestroy(&h);