Merge pull request #12481 from taosdata/fix/avoid_invalid_rw
enh(index): fix sanitizer error
This commit is contained in:
commit
7f08c25030
|
@ -398,6 +398,10 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
|||
output->status = SFLT_ACCURATE_INDEX;
|
||||
}
|
||||
|
||||
if (ctx->noExec) {
|
||||
SIF_RET(code);
|
||||
}
|
||||
|
||||
return operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output);
|
||||
_return:
|
||||
taosMemoryFree(params);
|
||||
|
|
|
@ -249,7 +249,7 @@ TEST(testCase, index_filter_varify) {
|
|||
sifMakeOpNode(&opNode, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight);
|
||||
|
||||
SIdxFltStatus st = idxGetFltStatus(opNode);
|
||||
EXPECT_EQ(st, SFLT_COARSE_INDEX);
|
||||
EXPECT_EQ(st, SFLT_ACCURATE_INDEX);
|
||||
nodesDestroyNode(res);
|
||||
}
|
||||
{
|
||||
|
@ -269,7 +269,7 @@ TEST(testCase, index_filter_varify) {
|
|||
sifMakeOpNode(&opNode, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_DOUBLE, pLeft, pRight);
|
||||
|
||||
SIdxFltStatus st = idxGetFltStatus(opNode);
|
||||
EXPECT_EQ(st, SFLT_COARSE_INDEX);
|
||||
EXPECT_EQ(st, SFLT_ACCURATE_INDEX);
|
||||
nodesDestroyNode(res);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,6 +37,8 @@ TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b);
|
|||
|
||||
_cache_range_compare indexGetCompare(RangeType ty);
|
||||
|
||||
int32_t indexConvertData(void* src, int8_t type, void** dst);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -46,9 +46,7 @@ typedef struct SIndexStat {
|
|||
} SIndexStat;
|
||||
|
||||
struct SIndex {
|
||||
#ifdef USE_LUCENE
|
||||
index_t* index;
|
||||
#endif
|
||||
int64_t refId;
|
||||
void* cache;
|
||||
void* tindex;
|
||||
SHashObj* colObj; // < field name, field id>
|
||||
|
@ -124,6 +122,11 @@ typedef struct TFileCacheKey {
|
|||
|
||||
int indexFlushCacheToTFile(SIndex* sIdx, void*);
|
||||
|
||||
int64_t indexAddRef(void* p);
|
||||
int32_t indexRemoveRef(int64_t ref);
|
||||
void indexAcquireRef(int64_t ref);
|
||||
void indexReleaseRef(int64_t ref);
|
||||
|
||||
int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
|
||||
// int32_t indexSerialKey(ICacheKey* key, char* buf);
|
||||
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
|
||||
|
|
|
@ -19,7 +19,10 @@
|
|||
#include "indexInt.h"
|
||||
#include "indexTfile.h"
|
||||
#include "indexUtil.h"
|
||||
#include "tcoding.h"
|
||||
#include "tdataformat.h"
|
||||
#include "tdef.h"
|
||||
#include "tref.h"
|
||||
#include "tsched.h"
|
||||
|
||||
#ifdef USE_LUCENE
|
||||
|
@ -27,36 +30,40 @@
|
|||
#endif
|
||||
|
||||
#define INDEX_NUM_OF_THREADS 4
|
||||
#define INDEX_QUEUE_SIZE 200
|
||||
#define INDEX_QUEUE_SIZE 200
|
||||
|
||||
void* indexQhandle = NULL;
|
||||
|
||||
#define INDEX_DATA_BOOL_NULL 0x02
|
||||
#define INDEX_DATA_TINYINT_NULL 0x80
|
||||
#define INDEX_DATA_SMALLINT_NULL 0x8000
|
||||
#define INDEX_DATA_INT_NULL 0x80000000L
|
||||
#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L
|
||||
#define INDEX_DATA_BOOL_NULL 0x02
|
||||
#define INDEX_DATA_TINYINT_NULL 0x80
|
||||
#define INDEX_DATA_SMALLINT_NULL 0x8000
|
||||
#define INDEX_DATA_INT_NULL 0x80000000L
|
||||
#define INDEX_DATA_BIGINT_NULL 0x8000000000000000L
|
||||
#define INDEX_DATA_TIMESTAMP_NULL TSDB_DATA_BIGINT_NULL
|
||||
|
||||
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
|
||||
#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
|
||||
#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF
|
||||
#define INDEX_DATA_BINARY_NULL 0xFF
|
||||
#define INDEX_DATA_JSON_NULL 0xFFFFFFFF
|
||||
#define INDEX_DATA_JSON_null 0xFFFFFFFE
|
||||
#define INDEX_DATA_FLOAT_NULL 0x7FF00000 // it is an NAN
|
||||
#define INDEX_DATA_DOUBLE_NULL 0x7FFFFF0000000000L // an NAN
|
||||
#define INDEX_DATA_NCHAR_NULL 0xFFFFFFFF
|
||||
#define INDEX_DATA_BINARY_NULL 0xFF
|
||||
#define INDEX_DATA_JSON_NULL 0xFFFFFFFF
|
||||
#define INDEX_DATA_JSON_null 0xFFFFFFFE
|
||||
#define INDEX_DATA_JSON_NOT_NULL 0x01
|
||||
|
||||
#define INDEX_DATA_UTINYINT_NULL 0xFF
|
||||
#define INDEX_DATA_UTINYINT_NULL 0xFF
|
||||
#define INDEX_DATA_USMALLINT_NULL 0xFFFF
|
||||
#define INDEX_DATA_UINT_NULL 0xFFFFFFFF
|
||||
#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL
|
||||
#define INDEX_DATA_UINT_NULL 0xFFFFFFFF
|
||||
#define INDEX_DATA_UBIGINT_NULL 0xFFFFFFFFFFFFFFFFL
|
||||
|
||||
#define INDEX_DATA_NULL_STR "NULL"
|
||||
#define INDEX_DATA_NULL_STR "NULL"
|
||||
#define INDEX_DATA_NULL_STR_L "null"
|
||||
|
||||
void* indexQhandle = NULL;
|
||||
int32_t indexRefMgt;
|
||||
|
||||
static void indexDestroy(void* sIdx);
|
||||
|
||||
void indexInit() {
|
||||
// refactor later
|
||||
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
|
||||
indexRefMgt = taosOpenRef(10, indexDestroy);
|
||||
}
|
||||
void indexCleanUp() {
|
||||
// refacto later
|
||||
|
@ -100,7 +107,12 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
|||
sIdx->cVersion = 1;
|
||||
sIdx->path = tstrdup(path);
|
||||
taosThreadMutexInit(&sIdx->mtx, NULL);
|
||||
|
||||
sIdx->refId = indexAddRef(sIdx);
|
||||
taosAcquireRef(indexRefMgt, sIdx->refId);
|
||||
|
||||
*index = sIdx;
|
||||
|
||||
return 0;
|
||||
|
||||
END:
|
||||
|
@ -112,8 +124,9 @@ END:
|
|||
return -1;
|
||||
}
|
||||
|
||||
void indexClose(SIndex* sIdx) {
|
||||
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
||||
void indexDestroy(void* handle) {
|
||||
SIndex* sIdx = handle;
|
||||
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
||||
while (iter) {
|
||||
IndexCache** pCache = iter;
|
||||
if (*pCache) {
|
||||
|
@ -128,6 +141,27 @@ void indexClose(SIndex* sIdx) {
|
|||
taosMemoryFree(sIdx);
|
||||
return;
|
||||
}
|
||||
void indexClose(SIndex* sIdx) {
|
||||
indexReleaseRef(sIdx->refId);
|
||||
indexRemoveRef(sIdx->refId);
|
||||
}
|
||||
int64_t indexAddRef(void* p) {
|
||||
// impl
|
||||
return taosAddRef(indexRefMgt, p);
|
||||
}
|
||||
int32_t indexRemoveRef(int64_t ref) {
|
||||
// impl later
|
||||
return taosRemoveRef(indexRefMgt, ref);
|
||||
}
|
||||
|
||||
void indexAcquireRef(int64_t ref) {
|
||||
// impl
|
||||
taosAcquireRef(indexRefMgt, ref);
|
||||
}
|
||||
void indexReleaseRef(int64_t ref) {
|
||||
// impl
|
||||
taosReleaseRef(indexRefMgt, ref);
|
||||
}
|
||||
|
||||
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||
// TODO(yihao): reduce the lock range
|
||||
|
@ -222,6 +256,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
|
|||
tm->operType = oper;
|
||||
tm->colType = colType;
|
||||
|
||||
#if 0
|
||||
tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
|
||||
memcpy(tm->colName, colName, nColName);
|
||||
tm->nColName = nColName;
|
||||
|
@ -229,6 +264,22 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
|
|||
tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
|
||||
memcpy(tm->colVal, colVal, nColVal);
|
||||
tm->nColVal = nColVal;
|
||||
#endif
|
||||
|
||||
#if 1
|
||||
|
||||
tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
|
||||
memcpy(tm->colName, colName, nColName);
|
||||
tm->nColName = nColName;
|
||||
|
||||
char* buf = NULL;
|
||||
int32_t len = indexConvertData((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf);
|
||||
assert(len != -1);
|
||||
|
||||
tm->colVal = buf;
|
||||
tm->nColVal = len;
|
||||
|
||||
#endif
|
||||
|
||||
return tm;
|
||||
}
|
||||
|
@ -457,6 +508,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
|
|||
} else {
|
||||
indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
|
||||
}
|
||||
indexReleaseRef(sIdx->refId);
|
||||
return ret;
|
||||
}
|
||||
void iterateValueDestroy(IterateValue* value, bool destroy) {
|
||||
|
|
|
@ -460,8 +460,11 @@ int indexCacheSchedToMerge(IndexCache* pCache) {
|
|||
schedMsg.fp = doMergeWork;
|
||||
schedMsg.ahandle = pCache;
|
||||
schedMsg.thandle = NULL;
|
||||
// schedMsg.thandle = taosMemoryCalloc(1, sizeof(int64_t));
|
||||
// memcpy((char*)(schedMsg.thandle), (char*)&(pCache->index->refId), sizeof(int64_t));
|
||||
schedMsg.msg = NULL;
|
||||
|
||||
indexAcquireRef(pCache->index->refId);
|
||||
taosScheduleTask(indexQhandle, &schedMsg);
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -16,25 +16,33 @@
|
|||
#include "indexComm.h"
|
||||
#include "index.h"
|
||||
#include "indexInt.h"
|
||||
#include "tcoding.h"
|
||||
#include "tcompare.h"
|
||||
#include "tdataformat.h"
|
||||
|
||||
char JSON_COLUMN[] = "JSON";
|
||||
char JSON_VALUE_DELIM = '&';
|
||||
|
||||
static __compar_fn_t indexGetCompar(int8_t type) {
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
return (__compar_fn_t)strcmp;
|
||||
}
|
||||
return getComparFunc(type, 0);
|
||||
}
|
||||
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
__compar_fn_t func = indexGetCompar(type);
|
||||
return tDoCommpare(func, QUERY_LESS_THAN, a, b);
|
||||
}
|
||||
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
__compar_fn_t func = indexGetCompar(type);
|
||||
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
|
||||
}
|
||||
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
__compar_fn_t func = indexGetCompar(type);
|
||||
return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
|
||||
}
|
||||
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
__compar_fn_t func = indexGetCompar(type);
|
||||
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
|
||||
}
|
||||
|
||||
|
@ -120,3 +128,101 @@ char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) {
|
|||
|
||||
return buf;
|
||||
}
|
||||
|
||||
int32_t indexConvertData(void* src, int8_t type, void** dst) {
|
||||
int tlen = -1;
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
tlen = taosEncodeFixedI64(NULL, *(int64_t*)src);
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeFixedI64(dst, *(int64_t*)src);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
tlen = taosEncodeFixedU8(NULL, *(uint8_t*)src);
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeFixedU8(dst, *(uint8_t*)src);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
tlen = taosEncodeFixedI8(NULL, *(uint8_t*)src);
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeFixedI8(dst, *(uint8_t*)src);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
tlen = taosEncodeFixedI16(NULL, *(int16_t*)src);
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeFixedI16(dst, *(int16_t*)src);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
tlen = taosEncodeFixedU16(NULL, *(uint16_t*)src);
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeFixedU16(dst, *(uint16_t*)src);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
tlen = taosEncodeFixedI32(NULL, *(int32_t*)src);
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeFixedI32(dst, *(int32_t*)src);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
tlen = taosEncodeBinary(NULL, src, sizeof(float));
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeBinary(dst, src, sizeof(float));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
tlen = taosEncodeFixedU32(NULL, *(uint32_t*)src);
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeFixedU32(dst, *(uint32_t*)src);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
tlen = taosEncodeFixedI64(NULL, *(uint32_t*)src);
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeFixedI64(dst, *(uint32_t*)src);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
tlen = taosEncodeBinary(NULL, src, sizeof(double));
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeBinary(dst, src, sizeof(double));
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
tlen = taosEncodeFixedU64(NULL, *(uint32_t*)src);
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeFixedU64(dst, *(uint32_t*)src);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src));
|
||||
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
|
||||
#if 1
|
||||
tlen = taosEncodeBinary(NULL, src, strlen(src));
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeBinary(dst, src, strlen(src));
|
||||
break;
|
||||
#endif
|
||||
}
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
#if 1
|
||||
tlen = taosEncodeBinary(NULL, src, strlen(src));
|
||||
*dst = taosMemoryCalloc(1, tlen + 1);
|
||||
tlen = taosEncodeBinary(dst, src, strlen(src));
|
||||
break;
|
||||
#endif
|
||||
default:
|
||||
TASSERT(0);
|
||||
break;
|
||||
}
|
||||
*dst = *dst - tlen;
|
||||
if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR && type != TSDB_DATA_TYPE_VARBINARY &&
|
||||
type == TSDB_DATA_TYPE_VARCHAR) {
|
||||
uint8_t* p = *dst;
|
||||
for (int i = 0; i < tlen; i++) {
|
||||
if (p[i] == 0) {
|
||||
p[i] = (uint8_t)'0';
|
||||
}
|
||||
}
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
|
|
@ -82,7 +82,10 @@ FstSlice fstSliceCreate(uint8_t* data, uint64_t len) {
|
|||
str->ref = 1;
|
||||
str->len = len;
|
||||
str->data = taosMemoryMalloc(len * sizeof(uint8_t));
|
||||
memcpy(str->data, data, len);
|
||||
|
||||
if (data != NULL) {
|
||||
memcpy(str->data, data, len);
|
||||
}
|
||||
|
||||
FstSlice s = {.str = str, .start = 0, .end = len - 1};
|
||||
return s;
|
||||
|
|
|
@ -469,13 +469,19 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
|
|||
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
|
||||
FstSlice* s = &rt->data;
|
||||
|
||||
char* ch = (char*)fstSliceData(s, NULL);
|
||||
if (0 != strncmp(ch, p, skip)) {
|
||||
int32_t sz = 0;
|
||||
char* ch = (char*)fstSliceData(s, &sz);
|
||||
char* tmp = taosMemoryCalloc(1, sz + 1);
|
||||
memcpy(tmp, ch, sz);
|
||||
|
||||
if (0 != strncmp(tmp, p, skip)) {
|
||||
swsResultDestroy(rt);
|
||||
taosMemoryFree(tmp);
|
||||
break;
|
||||
}
|
||||
|
||||
TExeCond cond = cmpFn(ch + skip, tem->colVal, tem->colType);
|
||||
TExeCond cond = cmpFn(tmp + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
|
||||
|
||||
if (MATCH == cond) {
|
||||
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
|
||||
} else if (CONTINUE == cond) {
|
||||
|
@ -483,6 +489,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR
|
|||
swsResultDestroy(rt);
|
||||
break;
|
||||
}
|
||||
taosMemoryFree(tmp);
|
||||
swsResultDestroy(rt);
|
||||
}
|
||||
streamWithStateDestroy(st);
|
||||
|
|
|
@ -17,12 +17,32 @@
|
|||
#include "tutil.h"
|
||||
|
||||
static std::string dir = "/tmp/json";
|
||||
static std::string logDir = "/tmp/log";
|
||||
|
||||
static void initLog() {
|
||||
const char* defaultLogFileNamePrefix = "taoslog";
|
||||
const int32_t maxLogFileNum = 10;
|
||||
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = 143;
|
||||
strcpy(tsLogDir, logDir.c_str());
|
||||
taosRemoveDir(tsLogDir);
|
||||
taosMkDir(tsLogDir);
|
||||
|
||||
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
|
||||
printf("failed to open log file in directory:%s\n", tsLogDir);
|
||||
}
|
||||
}
|
||||
class JsonEnv : public ::testing::Test {
|
||||
protected:
|
||||
virtual void SetUp() {
|
||||
taosRemoveDir(logDir.c_str());
|
||||
taosMkDir(logDir.c_str());
|
||||
taosRemoveDir(dir.c_str());
|
||||
taosMkDir(dir.c_str());
|
||||
printf("set up\n");
|
||||
|
||||
initLog();
|
||||
opts = indexOptsCreate();
|
||||
int ret = tIndexJsonOpen(opts, dir.c_str(), &index);
|
||||
assert(ret == 0);
|
||||
|
|
Loading…
Reference in New Issue