enh(index): support more data type
This commit is contained in:
parent
e08dd100a4
commit
8240b10a3e
|
@ -48,6 +48,7 @@ typedef struct IndexCache {
|
|||
} IndexCache;
|
||||
|
||||
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
|
||||
|
||||
typedef struct CacheTerm {
|
||||
// key
|
||||
char* colVal;
|
||||
|
|
|
@ -24,7 +24,7 @@ extern char JSON_COLUMN[];
|
|||
extern char JSON_VALUE_DELIM;
|
||||
|
||||
char* indexPackJsonData(SIndexTerm* itm);
|
||||
|
||||
char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -86,9 +86,18 @@ static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
|
|||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_LESS_THAN, a, b);
|
||||
}
|
||||
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { return MATCH; }
|
||||
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { return MATCH; }
|
||||
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { return MATCH; }
|
||||
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b);
|
||||
}
|
||||
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_GREATER_THAN, a, b);
|
||||
}
|
||||
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
|
||||
__compar_fn_t func = getComparFunc(type, 0);
|
||||
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b);
|
||||
}
|
||||
|
||||
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
|
||||
tCompareGreaterThan, tCompareGreaterEqual};
|
||||
|
@ -109,10 +118,12 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr
|
|||
}
|
||||
MemTable* mem = cache;
|
||||
IndexCache* pCache = mem->pCache;
|
||||
CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};
|
||||
CacheTerm* pCt = &ct;
|
||||
|
||||
char* key = indexCacheTermGet(&ct);
|
||||
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
|
||||
pCt->colVal = term->colVal;
|
||||
pCt->version = atomic_load_32(&pCache->version);
|
||||
|
||||
char* key = indexCacheTermGet(pCt);
|
||||
|
||||
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
|
@ -133,6 +144,8 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pCt);
|
||||
tSkipListDestroyIter(iter);
|
||||
return 0;
|
||||
}
|
||||
|
@ -153,6 +166,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes
|
|||
if (cache == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
_cache_range_compare cmpFn = rangeCompare[type];
|
||||
|
||||
MemTable* mem = cache;
|
||||
|
@ -204,6 +218,48 @@ static int32_t cacheSearchGreaterEqual(void* cache, SIndexTerm* term, SIdxTempRe
|
|||
}
|
||||
|
||||
static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
|
||||
if (cache == NULL) {
|
||||
return 0;
|
||||
}
|
||||
MemTable* mem = cache;
|
||||
IndexCache* pCache = mem->pCache;
|
||||
|
||||
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
|
||||
pCt->colVal = term->colVal;
|
||||
pCt->version = atomic_load_32(&pCache->version);
|
||||
|
||||
char* exBuf = NULL;
|
||||
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
|
||||
exBuf = indexPackJsonData(term);
|
||||
pCt->colVal = exBuf;
|
||||
}
|
||||
char* key = indexCacheTermGet(pCt);
|
||||
|
||||
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
if (node == NULL) {
|
||||
break;
|
||||
}
|
||||
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
if (0 == strcmp(c->colVal, pCt->colVal)) {
|
||||
if (c->operaType == ADD_VALUE) {
|
||||
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
|
||||
// taosArrayPush(result, &c->uid);
|
||||
*s = kTypeValue;
|
||||
} else if (c->operaType == DEL_VALUE) {
|
||||
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pCt);
|
||||
taosMemoryFree(exBuf);
|
||||
tSkipListDestroyIter(iter);
|
||||
return 0;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t cacheSearchPrefix_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
|
||||
|
@ -233,6 +289,56 @@ static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* term, SIdxTempResu
|
|||
|
||||
static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s,
|
||||
RangeType type) {
|
||||
if (cache == NULL) {
|
||||
return 0;
|
||||
}
|
||||
_cache_range_compare cmpFn = rangeCompare[type];
|
||||
|
||||
MemTable* mem = cache;
|
||||
IndexCache* pCache = mem->pCache;
|
||||
|
||||
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
|
||||
pCt->colVal = term->colVal;
|
||||
pCt->version = atomic_load_32(&pCache->version);
|
||||
|
||||
int8_t dType = INDEX_TYPE_GET_TYPE(term->colType);
|
||||
int skip = 0;
|
||||
char* exBuf = NULL;
|
||||
|
||||
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
|
||||
exBuf = indexPackJsonDataPrefix(term, &skip);
|
||||
pCt->colVal = exBuf;
|
||||
}
|
||||
char* key = indexCacheTermGet(pCt);
|
||||
|
||||
SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
if (node == NULL) {
|
||||
break;
|
||||
}
|
||||
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
|
||||
TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType);
|
||||
if (cond == MATCH) {
|
||||
if (c->operaType == ADD_VALUE) {
|
||||
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
|
||||
// taosArrayPush(result, &c->uid);
|
||||
*s = kTypeValue;
|
||||
} else if (c->operaType == DEL_VALUE) {
|
||||
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
|
||||
}
|
||||
} else if (cond == CONTINUE) {
|
||||
continue;
|
||||
} else if (cond == BREAK) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pCt);
|
||||
taosMemoryFree(exBuf);
|
||||
tSkipListDestroyIter(iter);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t cacheSearchRange(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s) {
|
||||
|
@ -408,6 +514,7 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
|
|||
indexCacheRef(cache);
|
||||
cache->imm = cache->mem;
|
||||
cache->mem = indexInternalCacheCreate(cache->type);
|
||||
cache->mem->pCache = cache;
|
||||
cache->occupiedMem = 0;
|
||||
// sched to merge
|
||||
// unref cache in bgwork
|
||||
|
|
|
@ -46,3 +46,30 @@ char* indexPackJsonData(SIndexTerm* itm) {
|
|||
|
||||
return buf;
|
||||
}
|
||||
char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) {
|
||||
/*
|
||||
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
|
||||
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
|
||||
*/
|
||||
uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType);
|
||||
|
||||
int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1;
|
||||
char* buf = (char*)taosMemoryCalloc(1, sz);
|
||||
char* p = buf;
|
||||
|
||||
memcpy(p, itm->colName, itm->nColName);
|
||||
p += itm->nColName;
|
||||
|
||||
memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM));
|
||||
p += sizeof(JSON_VALUE_DELIM);
|
||||
|
||||
memcpy(p, &ty, sizeof(ty));
|
||||
p += sizeof(ty);
|
||||
|
||||
memcpy(p, &JSON_VALUE_DELIM, sizeof(JSON_VALUE_DELIM));
|
||||
p += sizeof(JSON_VALUE_DELIM);
|
||||
|
||||
*skip = p - buf;
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
|
|
@ -308,20 +308,20 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
|
|||
}
|
||||
|
||||
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) {
|
||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||
int ret = 0;
|
||||
char* p = tem->colVal;
|
||||
uint64_t sz = tem->nColVal;
|
||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||
int ret = 0;
|
||||
char* p = tem->colVal;
|
||||
int skip = 0;
|
||||
|
||||
if (hasJson) {
|
||||
p = indexPackJsonData(tem);
|
||||
sz = strlen(p);
|
||||
p = indexPackJsonDataPrefix(tem, &skip);
|
||||
}
|
||||
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
|
||||
|
||||
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS);
|
||||
FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
|
||||
|
||||
FstSlice h = fstSliceCreate((uint8_t*)p, sz);
|
||||
FstSlice h = fstSliceCreate((uint8_t*)p, skip);
|
||||
fstStreamBuilderSetRange(sb, &h, type);
|
||||
fstSliceDestroy(&h);
|
||||
|
||||
|
|
Loading…
Reference in New Issue