From 70171a66e018cea12d1f9f46a3b059113d91e6bb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 14 May 2022 20:52:46 +0800 Subject: [PATCH] enh(index): fix sanitizer error --- source/libs/executor/src/indexoperator.c | 4 + .../executor/test/index_executor_tests.cpp | 4 +- source/libs/index/inc/indexComm.h | 2 + source/libs/index/inc/indexInt.h | 9 +- source/libs/index/src/index.c | 92 +++++++++++--- source/libs/index/src/indexCache.c | 3 + source/libs/index/src/indexComm.c | 114 +++++++++++++++++- source/libs/index/src/indexFstUtil.c | 5 +- source/libs/index/src/indexTfile.c | 13 +- source/libs/index/test/jsonUT.cc | 20 +++ 10 files changed, 233 insertions(+), 33 deletions(-) diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c index c17fcacf1f..2c204e9356 100644 --- a/source/libs/executor/src/indexoperator.c +++ b/source/libs/executor/src/indexoperator.c @@ -398,6 +398,10 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { output->status = SFLT_ACCURATE_INDEX; } + if (ctx->noExec) { + SIF_RET(code); + } + return operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output); _return: taosMemoryFree(params); diff --git a/source/libs/executor/test/index_executor_tests.cpp b/source/libs/executor/test/index_executor_tests.cpp index 5b03da034e..2449bd1da1 100644 --- a/source/libs/executor/test/index_executor_tests.cpp +++ b/source/libs/executor/test/index_executor_tests.cpp @@ -249,7 +249,7 @@ TEST(testCase, index_filter_varify) { sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight); SIdxFltStatus st = idxGetFltStatus(opNode); - EXPECT_EQ(st, SFLT_COARSE_INDEX); + EXPECT_EQ(st, SFLT_ACCURATE_INDEX); nodesDestroyNode(res); } { @@ -269,7 +269,7 @@ TEST(testCase, index_filter_varify) { sifMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight); SIdxFltStatus st = idxGetFltStatus(opNode); - EXPECT_EQ(st, SFLT_COARSE_INDEX); + EXPECT_EQ(st, SFLT_ACCURATE_INDEX); nodesDestroyNode(res); } } diff --git a/source/libs/index/inc/indexComm.h b/source/libs/index/inc/indexComm.h index 4cab71f92c..043404f48f 100644 --- a/source/libs/index/inc/indexComm.h +++ b/source/libs/index/inc/indexComm.h @@ -37,6 +37,8 @@ TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b); _cache_range_compare indexGetCompare(RangeType ty); +int32_t indexConvertData(void* src, int8_t type, void** dst); + #ifdef __cplusplus } #endif diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 7b7050d80e..27c380beaf 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -46,9 +46,7 @@ typedef struct SIndexStat { } SIndexStat; struct SIndex { -#ifdef USE_LUCENE - index_t* index; -#endif + int64_t refId; void* cache; void* tindex; SHashObj* colObj; // < field name, field id> @@ -124,6 +122,11 @@ typedef struct TFileCacheKey { int indexFlushCacheToTFile(SIndex* sIdx, void*); +int64_t indexAddRef(void* p); +int32_t indexRemoveRef(int64_t ref); +void indexAcquireRef(int64_t ref); +void indexReleaseRef(int64_t ref); + int32_t indexSerialCacheKey(ICacheKey* key, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf); // int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index d56413f840..46f2f7a93b 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -19,7 +19,10 @@ #include "indexInt.h" #include "indexTfile.h" #include "indexUtil.h" +#include "tcoding.h" +#include "tdataformat.h" #include "tdef.h" +#include "tref.h" #include "tsched.h" #ifdef USE_LUCENE @@ -27,36 +30,40 @@ #endif #define INDEX_NUM_OF_THREADS 4 -#define INDEX_QUEUE_SIZE 200 +#define INDEX_QUEUE_SIZE 200 -void* indexQhandle = NULL; - -#define INDEX_DATA_BOOL_NULL 0x02 -#define INDEX_DATA_TINYINT_NULL 0x80 -#define INDEX_DATA_SMALLINT_NULL 0x8000 -#define INDEX_DATA_INT_NULL 0x80000000L -#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L +#define INDEX_DATA_BOOL_NULL 0x02 +#define INDEX_DATA_TINYINT_NULL 0x80 +#define INDEX_DATA_SMALLINT_NULL 0x8000 +#define INDEX_DATA_INT_NULL 0x80000000L +#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L #define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL -#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN -#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN -#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF -#define INDEX_DATA_BINARY_NULL 0xFF -#define INDEX_DATA_JSON_NULL 0xFFFFFFFF -#define INDEX_DATA_JSON_null 0xFFFFFFFE +#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN +#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN +#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF +#define INDEX_DATA_BINARY_NULL 0xFF +#define INDEX_DATA_JSON_NULL 0xFFFFFFFF +#define INDEX_DATA_JSON_null 0xFFFFFFFE #define INDEX_DATA_JSON_NOT_NULL 0x01 -#define INDEX_DATA_UTINYINT_NULL 0xFF +#define INDEX_DATA_UTINYINT_NULL 0xFF #define INDEX_DATA_USMALLINT_NULL 0xFFFF -#define INDEX_DATA_UINT_NULL 0xFFFFFFFF -#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL +#define INDEX_DATA_UINT_NULL 0xFFFFFFFF +#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL -#define INDEX_DATA_NULL_STR "NULL" +#define INDEX_DATA_NULL_STR "NULL" #define INDEX_DATA_NULL_STR_L "null" +void* indexQhandle = NULL; +int32_t indexRefMgt; + +static void indexDestroy(void* sIdx); + void indexInit() { // refactor later indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); + indexRefMgt = taosOpenRef(10, indexDestroy); } void indexCleanUp() { // refacto later @@ -100,7 +107,12 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { sIdx->cVersion = 1; sIdx->path = tstrdup(path); taosThreadMutexInit(&sIdx->mtx, NULL); + + sIdx->refId = indexAddRef(sIdx); + taosAcquireRef(indexRefMgt, sIdx->refId); + *index = sIdx; + return 0; END: @@ -112,8 +124,9 @@ END: return -1; } -void indexClose(SIndex* sIdx) { - void* iter = taosHashIterate(sIdx->colObj, NULL); +void indexDestroy(void* handle) { + SIndex* sIdx = handle; + void* iter = taosHashIterate(sIdx->colObj, NULL); while (iter) { IndexCache** pCache = iter; if (*pCache) { @@ -128,6 +141,27 @@ void indexClose(SIndex* sIdx) { taosMemoryFree(sIdx); return; } +void indexClose(SIndex* sIdx) { + indexReleaseRef(sIdx->refId); + indexRemoveRef(sIdx->refId); +} +int64_t indexAddRef(void* p) { + // impl + return taosAddRef(indexRefMgt, p); +} +int32_t indexRemoveRef(int64_t ref) { + // impl later + return taosRemoveRef(indexRefMgt, ref); +} + +void indexAcquireRef(int64_t ref) { + // impl + taosAcquireRef(indexRefMgt, ref); +} +void indexReleaseRef(int64_t ref) { + // impl + taosReleaseRef(indexRefMgt, ref); +} int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { // TODO(yihao): reduce the lock range @@ -222,6 +256,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy tm->operType = oper; tm->colType = colType; +#if 0 tm->colName = (char*)taosMemoryCalloc(1, nColName + 1); memcpy(tm->colName, colName, nColName); tm->nColName = nColName; @@ -229,6 +264,22 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1); memcpy(tm->colVal, colVal, nColVal); tm->nColVal = nColVal; +#endif + +#if 1 + + tm->colName = (char*)taosMemoryCalloc(1, nColName + 1); + memcpy(tm->colName, colName, nColName); + tm->nColName = nColName; + + char* buf = NULL; + int32_t len = indexConvertData((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf); + assert(len != -1); + + tm->colVal = buf; + tm->nColVal = len; + +#endif return tm; } @@ -457,6 +508,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { } else { indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000); } + indexReleaseRef(sIdx->refId); return ret; } void iterateValueDestroy(IterateValue* value, bool destroy) { diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 5294ac8c19..d4231619ec 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -460,8 +460,11 @@ int indexCacheSchedToMerge(IndexCache* pCache) { schedMsg.fp = doMergeWork; schedMsg.ahandle = pCache; schedMsg.thandle = NULL; + // schedMsg.thandle = taosMemoryCalloc(1, sizeof(int64_t)); + // memcpy((char*)(schedMsg.thandle), (char*)&(pCache->index->refId), sizeof(int64_t)); schedMsg.msg = NULL; + indexAcquireRef(pCache->index->refId); taosScheduleTask(indexQhandle, &schedMsg); return 0; diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index 9e85a6680a..ac26ed1fab 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -16,25 +16,33 @@ #include "indexComm.h" #include "index.h" #include "indexInt.h" +#include "tcoding.h" #include "tcompare.h" +#include "tdataformat.h" char JSON_COLUMN[] = "JSON"; char JSON_VALUE_DELIM = '&'; +static __compar_fn_t indexGetCompar(int8_t type) { + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + return (__compar_fn_t)strcmp; + } + return getComparFunc(type, 0); +} static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); + __compar_fn_t func = indexGetCompar(type); return tDoCommpare(func, QUERY_LESS_THAN, a, b); } static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); + __compar_fn_t func = indexGetCompar(type); return tDoCommpare(func, QUERY_LESS_EQUAL, a, b); } static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); + __compar_fn_t func = indexGetCompar(type); return tDoCommpare(func, QUERY_GREATER_THAN, a, b); } static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); + __compar_fn_t func = indexGetCompar(type); return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b); } @@ -120,3 +128,101 @@ char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) { return buf; } + +int32_t indexConvertData(void* src, int8_t type, void** dst) { + int tlen = -1; + switch (type) { + case TSDB_DATA_TYPE_TIMESTAMP: + tlen = taosEncodeFixedI64(NULL, *(int64_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedI64(dst, *(int64_t*)src); + break; + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_UTINYINT: + tlen = taosEncodeFixedU8(NULL, *(uint8_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedU8(dst, *(uint8_t*)src); + break; + case TSDB_DATA_TYPE_TINYINT: + tlen = taosEncodeFixedI8(NULL, *(uint8_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedI8(dst, *(uint8_t*)src); + break; + case TSDB_DATA_TYPE_SMALLINT: + tlen = taosEncodeFixedI16(NULL, *(int16_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedI16(dst, *(int16_t*)src); + break; + case TSDB_DATA_TYPE_USMALLINT: + tlen = taosEncodeFixedU16(NULL, *(uint16_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedU16(dst, *(uint16_t*)src); + break; + case TSDB_DATA_TYPE_INT: + tlen = taosEncodeFixedI32(NULL, *(int32_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedI32(dst, *(int32_t*)src); + break; + case TSDB_DATA_TYPE_FLOAT: + tlen = taosEncodeBinary(NULL, src, sizeof(float)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, sizeof(float)); + break; + case TSDB_DATA_TYPE_UINT: + tlen = taosEncodeFixedU32(NULL, *(uint32_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedU32(dst, *(uint32_t*)src); + break; + case TSDB_DATA_TYPE_BIGINT: + tlen = taosEncodeFixedI64(NULL, *(uint32_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedI64(dst, *(uint32_t*)src); + break; + case TSDB_DATA_TYPE_DOUBLE: + tlen = taosEncodeBinary(NULL, src, sizeof(double)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, sizeof(double)); + break; + case TSDB_DATA_TYPE_UBIGINT: + tlen = taosEncodeFixedU64(NULL, *(uint32_t*)src); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeFixedU64(dst, *(uint32_t*)src); + break; + case TSDB_DATA_TYPE_NCHAR: { + tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src)); + + break; + } + case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY +#if 1 + tlen = taosEncodeBinary(NULL, src, strlen(src)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, strlen(src)); + break; +#endif + } + case TSDB_DATA_TYPE_VARBINARY: +#if 1 + tlen = taosEncodeBinary(NULL, src, strlen(src)); + *dst = taosMemoryCalloc(1, tlen + 1); + tlen = taosEncodeBinary(dst, src, strlen(src)); + break; +#endif + default: + TASSERT(0); + break; + } + *dst = *dst - tlen; + if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR && type != TSDB_DATA_TYPE_VARBINARY && + type == TSDB_DATA_TYPE_VARCHAR) { + uint8_t* p = *dst; + for (int i = 0; i < tlen; i++) { + if (p[i] == 0) { + p[i] = (uint8_t)'0'; + } + } + } + return tlen; +} diff --git a/source/libs/index/src/indexFstUtil.c b/source/libs/index/src/indexFstUtil.c index ec9a6943dc..a980c6b740 100644 --- a/source/libs/index/src/indexFstUtil.c +++ b/source/libs/index/src/indexFstUtil.c @@ -82,7 +82,10 @@ FstSlice fstSliceCreate(uint8_t* data, uint64_t len) { str->ref = 1; str->len = len; str->data = taosMemoryMalloc(len * sizeof(uint8_t)); - memcpy(str->data, data, len); + + if (data != NULL) { + memcpy(str->data, data, len); + } FstSlice s = {.str = str, .start = 0, .end = len - 1}; return s; diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 4cc2a4975f..b787da117d 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -469,13 +469,19 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { FstSlice* s = &rt->data; - char* ch = (char*)fstSliceData(s, NULL); - if (0 != strncmp(ch, p, skip)) { + int32_t sz = 0; + char* ch = (char*)fstSliceData(s, &sz); + char* tmp = taosMemoryCalloc(1, sz + 1); + memcpy(tmp, ch, sz); + + if (0 != strncmp(tmp, p, skip)) { swsResultDestroy(rt); + taosMemoryFree(tmp); break; } - TExeCond cond = cmpFn(ch + skip, tem->colVal, tem->colType); + TExeCond cond = cmpFn(tmp + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType)); + if (MATCH == cond) { tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); } else if (CONTINUE == cond) { @@ -483,6 +489,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR swsResultDestroy(rt); break; } + taosMemoryFree(tmp); swsResultDestroy(rt); } streamWithStateDestroy(st); diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index 08d58da07f..3de7cb66f2 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -17,12 +17,32 @@ #include "tutil.h" static std::string dir = "/tmp/json"; +static std::string logDir = "/tmp/log"; + +static void initLog() { + const char* defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10; + + tsAsyncLog = 0; + sDebugFlag = 143; + strcpy(tsLogDir, logDir.c_str()); + taosRemoveDir(tsLogDir); + taosMkDir(tsLogDir); + + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } +} class JsonEnv : public ::testing::Test { protected: virtual void SetUp() { + taosRemoveDir(logDir.c_str()); + taosMkDir(logDir.c_str()); taosRemoveDir(dir.c_str()); taosMkDir(dir.c_str()); printf("set up\n"); + + initLog(); opts = indexOptsCreate(); int ret = tIndexJsonOpen(opts, dir.c_str(), &index); assert(ret == 0);