Merge pull request #12237 from taosdata/enh/supportIdxFilter
Enh/support idx filter
This commit is contained in:
commit
5fae014c6a
|
@ -30,6 +30,7 @@ static void indexMemUnRef(MemTable* tbl);
|
||||||
|
|
||||||
static void indexCacheTermDestroy(CacheTerm* ct);
|
static void indexCacheTermDestroy(CacheTerm* ct);
|
||||||
static int32_t indexCacheTermCompare(const void* l, const void* r);
|
static int32_t indexCacheTermCompare(const void* l, const void* r);
|
||||||
|
static int32_t indexCacheJsonTermCompare(const void* l, const void* r);
|
||||||
static char* indexCacheTermGet(const void* pData);
|
static char* indexCacheTermGet(const void* pData);
|
||||||
|
|
||||||
static MemTable* indexInternalCacheCreate(int8_t type);
|
static MemTable* indexInternalCacheCreate(int8_t type);
|
||||||
|
@ -63,6 +64,7 @@ typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
|
||||||
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
|
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
|
||||||
|
|
||||||
static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
|
static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) {
|
||||||
|
// optime later
|
||||||
int32_t ret = func(a, b);
|
int32_t ret = func(a, b);
|
||||||
switch (comType) {
|
switch (comType) {
|
||||||
case QUERY_LESS_THAN: {
|
case QUERY_LESS_THAN: {
|
||||||
|
@ -242,6 +244,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResul
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
|
|
||||||
if (0 == strcmp(c->colVal, pCt->colVal)) {
|
if (0 == strcmp(c->colVal, pCt->colVal)) {
|
||||||
if (c->operaType == ADD_VALUE) {
|
if (c->operaType == ADD_VALUE) {
|
||||||
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
|
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
|
||||||
|
@ -311,6 +314,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
|
||||||
}
|
}
|
||||||
char* key = indexCacheTermGet(pCt);
|
char* key = indexCacheTermGet(pCt);
|
||||||
|
|
||||||
|
// SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
|
||||||
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||||
while (tSkipListIterNext(iter)) {
|
while (tSkipListIterNext(iter)) {
|
||||||
SSkipListNode* node = tSkipListIterGet(iter);
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
|
@ -318,6 +322,10 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
|
printf("json val: %s\n", c->colVal);
|
||||||
|
if (0 != strncmp(c->colVal, term->colName, term->nColName)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType);
|
TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType);
|
||||||
if (cond == MATCH) {
|
if (cond == MATCH) {
|
||||||
|
@ -598,24 +606,11 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SIdxTempResult* result
|
||||||
indexMemRef(imm);
|
indexMemRef(imm);
|
||||||
taosThreadMutexUnlock(&pCache->mtx);
|
taosThreadMutexUnlock(&pCache->mtx);
|
||||||
|
|
||||||
// SIndexTerm* term = query->term;
|
|
||||||
// EIndexQueryType qtype = query->qType;
|
|
||||||
|
|
||||||
// bool isJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
|
|
||||||
// char* p = term->colVal;
|
|
||||||
// if (isJson) {
|
|
||||||
// p = indexPackJsonData(term);
|
|
||||||
//}
|
|
||||||
// CacheTerm ct = {.colVal = p, .version = atomic_load_32(&pCache->version)};
|
|
||||||
|
|
||||||
int ret = indexQueryMem(mem, query, result, s);
|
int ret = indexQueryMem(mem, query, result, s);
|
||||||
if (ret == 0 && *s != kTypeDeletion) {
|
if (ret == 0 && *s != kTypeDeletion) {
|
||||||
// continue search in imm
|
// continue search in imm
|
||||||
ret = indexQueryMem(imm, query, result, s);
|
ret = indexQueryMem(imm, query, result, s);
|
||||||
}
|
}
|
||||||
// if (isJson) {
|
|
||||||
// taosMemoryFreeClear(p);
|
|
||||||
//}
|
|
||||||
|
|
||||||
indexMemUnRef(mem);
|
indexMemUnRef(mem);
|
||||||
indexMemUnRef(imm);
|
indexMemUnRef(imm);
|
||||||
|
@ -682,14 +677,52 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) {
|
||||||
return cmp;
|
return cmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int indexFindCh(char* a, char c) {
|
||||||
|
char* p = a;
|
||||||
|
while (*p != 0 && *p++ != c) {
|
||||||
|
}
|
||||||
|
return p - a;
|
||||||
|
}
|
||||||
|
static int indexCacheJsonTermCompareImpl(char* a, char* b) {
|
||||||
|
int alen = indexFindCh(a, '&');
|
||||||
|
int blen = indexFindCh(b, '&');
|
||||||
|
|
||||||
|
int cmp = strncmp(a, b, MIN(alen, blen));
|
||||||
|
if (cmp == 0) {
|
||||||
|
cmp = alen - blen;
|
||||||
|
if (cmp != 0) {
|
||||||
|
return cmp;
|
||||||
|
}
|
||||||
|
cmp = *(a + alen) - *(b + blen);
|
||||||
|
if (cmp != 0) {
|
||||||
|
return cmp;
|
||||||
|
}
|
||||||
|
alen += 2;
|
||||||
|
blen += 2;
|
||||||
|
cmp = strcmp(a + alen, b + blen);
|
||||||
|
}
|
||||||
|
return cmp;
|
||||||
|
}
|
||||||
|
static int32_t indexCacheJsonTermCompare(const void* l, const void* r) {
|
||||||
|
CacheTerm* lt = (CacheTerm*)l;
|
||||||
|
CacheTerm* rt = (CacheTerm*)r;
|
||||||
|
// compare colVal
|
||||||
|
int cmp = indexCacheJsonTermCompareImpl(lt->colVal, rt->colVal);
|
||||||
|
if (cmp == 0) {
|
||||||
|
return rt->version - lt->version;
|
||||||
|
}
|
||||||
|
return cmp;
|
||||||
|
}
|
||||||
static MemTable* indexInternalCacheCreate(int8_t type) {
|
static MemTable* indexInternalCacheCreate(int8_t type) {
|
||||||
type = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type;
|
int ttype = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : type;
|
||||||
|
int32_t (*cmpFn)(const void* l, const void* r) =
|
||||||
|
INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? indexCacheJsonTermCompare : indexCacheTermCompare;
|
||||||
|
|
||||||
MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
|
MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
|
||||||
indexMemRef(tbl);
|
indexMemRef(tbl);
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
if (ttype == TSDB_DATA_TYPE_BINARY || ttype == TSDB_DATA_TYPE_NCHAR) {
|
||||||
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, indexCacheTermCompare, SL_ALLOW_DUP_KEY,
|
tbl->mem =
|
||||||
indexCacheTermGet);
|
tSkipListCreate(MAX_SKIP_LIST_LEVEL, ttype, MAX_INDEX_KEY_LEN, cmpFn, SL_ALLOW_DUP_KEY, indexCacheTermGet);
|
||||||
}
|
}
|
||||||
return tbl;
|
return tbl;
|
||||||
}
|
}
|
||||||
|
|
|
@ -129,7 +129,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
|
||||||
|
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
for (size_t i = 0; i < 1000000; i++) {
|
for (size_t i = 0; i < 1000; i++) {
|
||||||
tIndexJsonPut(index, terms, i);
|
tIndexJsonPut(index, terms, i);
|
||||||
}
|
}
|
||||||
indexMultiTermDestroy(terms);
|
indexMultiTermDestroy(terms);
|
||||||
|
@ -148,4 +148,36 @@ TEST_F(JsonEnv, testWriteMillonData) {
|
||||||
assert(100 == taosArrayGetSize(result));
|
assert(100 == taosArrayGetSize(result));
|
||||||
indexMultiTermQueryDestroy(mq);
|
indexMultiTermQueryDestroy(mq);
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::string colName("test");
|
||||||
|
std::string colVal("ab");
|
||||||
|
|
||||||
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
|
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
|
||||||
|
tIndexJsonSearch(index, mq, result);
|
||||||
|
assert(0 == taosArrayGetSize(result));
|
||||||
|
indexMultiTermQueryDestroy(mq);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
{
|
||||||
|
std::string colName("test");
|
||||||
|
std::string colVal("ab");
|
||||||
|
|
||||||
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
|
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
|
||||||
|
tIndexJsonSearch(index, mq, result);
|
||||||
|
assert(100 == taosArrayGetSize(result));
|
||||||
|
indexMultiTermQueryDestroy(mq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue