enh: refactor idx module
This commit is contained in:
parent
daa2e86887
commit
f057cd1943
|
@ -133,24 +133,24 @@ typedef struct TFileCacheKey {
|
|||
} ICacheKey;
|
||||
int idxFlushCacheToTFile(SIndex* sIdx, void*, bool quit);
|
||||
|
||||
int64_t indexAddRef(void* p);
|
||||
int32_t indexRemoveRef(int64_t ref);
|
||||
void indexAcquireRef(int64_t ref);
|
||||
void indexReleaseRef(int64_t ref);
|
||||
int64_t idxAddRef(void* p);
|
||||
int32_t idxRemoveRef(int64_t ref);
|
||||
void idxAcquireRef(int64_t ref);
|
||||
void idxReleaseRef(int64_t ref);
|
||||
|
||||
int32_t idxSerialCacheKey(ICacheKey* key, char* buf);
|
||||
// int32_t indexSerialKey(ICacheKey* key, char* buf);
|
||||
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
|
||||
|
||||
#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
|
||||
#define IDX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
|
||||
|
||||
#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F)
|
||||
#define IDX_TYPE_GET_TYPE(ty) (ty & 0x0F)
|
||||
|
||||
#define INDEX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
|
||||
do { \
|
||||
uint8_t oldTy = ty; \
|
||||
ty = (ty >> 4) | exTy; \
|
||||
ty = (ty << 4) | oldTy; \
|
||||
#define IDX_TYPE_ADD_EXTERN_TYPE(ty, exTy) \
|
||||
do { \
|
||||
uint8_t oldTy = ty; \
|
||||
ty = (ty >> 4) | exTy; \
|
||||
ty = (ty << 4) | oldTy; \
|
||||
} while (0)
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -90,7 +90,7 @@ static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateV
|
|||
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
|
||||
// int32_t indexSerialKey(ICacheKey* key, char* buf);
|
||||
|
||||
static void indexPost(void* idx) {
|
||||
static void idxPost(void* idx) {
|
||||
SIndex* pIdx = idx;
|
||||
tsem_post(&pIdx->sem);
|
||||
}
|
||||
|
@ -118,8 +118,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
|||
taosThreadMutexInit(&sIdx->mtx, NULL);
|
||||
tsem_init(&sIdx->sem, 0, 0);
|
||||
|
||||
sIdx->refId = indexAddRef(sIdx);
|
||||
indexAcquireRef(sIdx->refId);
|
||||
sIdx->refId = idxAddRef(sIdx);
|
||||
idxAcquireRef(sIdx->refId);
|
||||
|
||||
*index = sIdx;
|
||||
return 0;
|
||||
|
@ -157,23 +157,23 @@ void indexClose(SIndex* sIdx) {
|
|||
taosHashCleanup(sIdx->colObj);
|
||||
sIdx->colObj = NULL;
|
||||
}
|
||||
indexReleaseRef(sIdx->refId);
|
||||
indexRemoveRef(sIdx->refId);
|
||||
idxReleaseRef(sIdx->refId);
|
||||
idxRemoveRef(sIdx->refId);
|
||||
}
|
||||
int64_t indexAddRef(void* p) {
|
||||
int64_t idxAddRef(void* p) {
|
||||
// impl
|
||||
return taosAddRef(indexRefMgt, p);
|
||||
}
|
||||
int32_t indexRemoveRef(int64_t ref) {
|
||||
int32_t idxRemoveRef(int64_t ref) {
|
||||
// impl later
|
||||
return taosRemoveRef(indexRefMgt, ref);
|
||||
}
|
||||
|
||||
void indexAcquireRef(int64_t ref) {
|
||||
void idxAcquireRef(int64_t ref) {
|
||||
// impl
|
||||
taosAcquireRef(indexRefMgt, ref);
|
||||
}
|
||||
void indexReleaseRef(int64_t ref) {
|
||||
void idxReleaseRef(int64_t ref) {
|
||||
// impl
|
||||
taosReleaseRef(indexRefMgt, ref);
|
||||
}
|
||||
|
@ -289,7 +289,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
|
|||
tm->nColName = nColName;
|
||||
|
||||
char* buf = NULL;
|
||||
int32_t len = idxConvertDataToStr((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf);
|
||||
int32_t len = idxConvertDataToStr((void*)colVal, IDX_TYPE_GET_TYPE(colType), (void**)&buf);
|
||||
assert(len != -1);
|
||||
|
||||
tm->colVal = buf;
|
||||
|
@ -479,9 +479,9 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
|
|||
tfileReaderUnRef(pReader);
|
||||
atomic_store_32(&pCache->merging, 0);
|
||||
if (quit) {
|
||||
indexPost(sIdx);
|
||||
idxPost(sIdx);
|
||||
}
|
||||
indexReleaseRef(sIdx->refId);
|
||||
idxReleaseRef(sIdx->refId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -542,9 +542,9 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
|
|||
}
|
||||
atomic_store_32(&pCache->merging, 0);
|
||||
if (quit) {
|
||||
indexPost(sIdx);
|
||||
idxPost(sIdx);
|
||||
}
|
||||
indexReleaseRef(sIdx->refId);
|
||||
idxReleaseRef(sIdx->refId);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -621,7 +621,7 @@ END:
|
|||
}
|
||||
|
||||
int32_t idxSerialCacheKey(ICacheKey* key, char* buf) {
|
||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);
|
||||
bool hasJson = IDX_TYPE_CONTAIN_EXTERN_TYPE(key->colType, TSDB_DATA_TYPE_JSON);
|
||||
|
||||
char* p = buf;
|
||||
char tbuf[65] = {0};
|
||||
|
|
|
@ -187,7 +187,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
|
|||
pCt->version = atomic_load_64(&pCache->version);
|
||||
|
||||
char* exBuf = NULL;
|
||||
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
|
||||
if (IDX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
|
||||
exBuf = idxPackJsonData(term);
|
||||
pCt->colVal = exBuf;
|
||||
}
|
||||
|
@ -266,7 +266,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR
|
|||
pCt->colVal = term->colVal;
|
||||
pCt->version = atomic_load_64(&pCache->version);
|
||||
|
||||
int8_t dType = INDEX_TYPE_GET_TYPE(term->colType);
|
||||
int8_t dType = IDX_TYPE_GET_TYPE(term->colType);
|
||||
int skip = 0;
|
||||
char* exBuf = NULL;
|
||||
if (type == CONTAINS) {
|
||||
|
@ -342,7 +342,7 @@ IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8
|
|||
|
||||
cache->mem = idxInternalCacheCreate(type);
|
||||
cache->mem->pCache = cache;
|
||||
cache->colName = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
|
||||
cache->colName = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? tstrdup(JSON_COLUMN) : tstrdup(colName);
|
||||
cache->type = type;
|
||||
cache->index = idx;
|
||||
cache->version = 0;
|
||||
|
@ -354,7 +354,7 @@ IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8
|
|||
|
||||
idxCacheRef(cache);
|
||||
if (idx != NULL) {
|
||||
indexAcquireRef(idx->refId);
|
||||
idxAcquireRef(idx->refId);
|
||||
}
|
||||
return cache;
|
||||
}
|
||||
|
@ -455,7 +455,7 @@ void idxCacheDestroy(void* cache) {
|
|||
taosThreadMutexDestroy(&pCache->mtx);
|
||||
taosThreadCondDestroy(&pCache->finished);
|
||||
if (pCache->index != NULL) {
|
||||
indexReleaseRef(((SIndex*)pCache->index)->refId);
|
||||
idxReleaseRef(((SIndex*)pCache->index)->refId);
|
||||
}
|
||||
taosMemoryFree(pCache);
|
||||
}
|
||||
|
@ -500,7 +500,7 @@ int idxCacheSchedToMerge(IndexCache* pCache, bool notify) {
|
|||
schedMsg.thandle = taosMemoryMalloc(1);
|
||||
}
|
||||
schedMsg.msg = NULL;
|
||||
indexAcquireRef(pCache->index->refId);
|
||||
idxAcquireRef(pCache->index->refId);
|
||||
taosScheduleTask(indexQhandle, &schedMsg);
|
||||
return 0;
|
||||
}
|
||||
|
@ -533,7 +533,7 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
|||
if (cache == NULL) {
|
||||
return -1;
|
||||
}
|
||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
|
||||
bool hasJson = IDX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON);
|
||||
|
||||
IndexCache* pCache = cache;
|
||||
idxCacheRef(pCache);
|
||||
|
@ -597,7 +597,7 @@ static int32_t idxQueryMem(MemTable* mem, SIndexTermQuery* query, SIdxTRslt* tr,
|
|||
SIndexTerm* term = query->term;
|
||||
EIndexQueryType qtype = query->qType;
|
||||
|
||||
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
|
||||
if (IDX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
|
||||
return cacheSearch[1][qtype](mem, term, tr, s);
|
||||
} else {
|
||||
return cacheSearch[0][qtype](mem, term, tr, s);
|
||||
|
@ -730,9 +730,9 @@ static int32_t idxCacheJsonTermCompare(const void* l, const void* r) {
|
|||
return cmp;
|
||||
}
|
||||
static MemTable* idxInternalCacheCreate(int8_t type) {
|
||||
int ttype = INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : TSDB_DATA_TYPE_BINARY;
|
||||
int ttype = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? TSDB_DATA_TYPE_BINARY : TSDB_DATA_TYPE_BINARY;
|
||||
int32_t (*cmpFn)(const void* l, const void* r) =
|
||||
INDEX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? idxCacheJsonTermCompare : idxCacheTermCompare;
|
||||
IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? idxCacheJsonTermCompare : idxCacheTermCompare;
|
||||
|
||||
MemTable* tbl = taosMemoryCalloc(1, sizeof(MemTable));
|
||||
idxMemRef(tbl);
|
||||
|
|
|
@ -212,7 +212,7 @@ char* idxPackJsonData(SIndexTerm* itm) {
|
|||
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
|
||||
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
|
||||
*/
|
||||
uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType);
|
||||
uint8_t ty = IDX_TYPE_GET_TYPE(itm->colType);
|
||||
|
||||
int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1;
|
||||
char* buf = (char*)taosMemoryCalloc(1, sz);
|
||||
|
@ -240,7 +240,7 @@ char* idxPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) {
|
|||
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
|
||||
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
|
||||
*/
|
||||
uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType);
|
||||
uint8_t ty = IDX_TYPE_GET_TYPE(itm->colType);
|
||||
|
||||
int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1;
|
||||
char* buf = (char*)taosMemoryCalloc(1, sz);
|
||||
|
@ -267,7 +267,7 @@ char* idxPackJsonDataPrefixNoType(SIndexTerm* itm, int32_t* skip) {
|
|||
* |<-----colname---->|<-----dataType---->|<--------colVal---------->|
|
||||
* |<-----string----->|<-----uint8_t----->|<----depend on dataType-->|
|
||||
*/
|
||||
uint8_t ty = INDEX_TYPE_GET_TYPE(itm->colType);
|
||||
uint8_t ty = IDX_TYPE_GET_TYPE(itm->colType);
|
||||
|
||||
int32_t sz = itm->nColName + itm->nColVal + sizeof(uint8_t) + sizeof(JSON_VALUE_DELIM) * 2 + 1;
|
||||
char* buf = (char*)taosMemoryCalloc(1, sz);
|
||||
|
|
|
@ -30,7 +30,7 @@ int indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
|
|||
} else {
|
||||
p->colType = TSDB_DATA_TYPE_DOUBLE;
|
||||
}
|
||||
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
|
||||
IDX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
|
||||
}
|
||||
// handle put
|
||||
return indexPut(index, terms, uid);
|
||||
|
@ -48,7 +48,7 @@ int indexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *res
|
|||
} else {
|
||||
p->colType = TSDB_DATA_TYPE_DOUBLE;
|
||||
}
|
||||
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
|
||||
IDX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
|
||||
}
|
||||
// handle search
|
||||
return indexSearch(index, tq, result);
|
||||
|
|
|
@ -281,7 +281,7 @@ static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
|||
return 0;
|
||||
}
|
||||
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
||||
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||
bool hasJson = IDX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
|
||||
|
||||
int ret = 0;
|
||||
char* p = tem->colVal;
|
||||
|
@ -457,7 +457,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt
|
|||
} else if (0 != strncmp(ch, p, skip)) {
|
||||
continue;
|
||||
}
|
||||
cond = cmpFn(ch + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType));
|
||||
cond = cmpFn(ch + skip, tem->colVal, IDX_TYPE_GET_TYPE(tem->colType));
|
||||
}
|
||||
if (MATCH == cond) {
|
||||
tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total);
|
||||
|
@ -476,7 +476,7 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr
|
|||
SIndexTerm* term = query->term;
|
||||
EIndexQueryType qtype = query->qType;
|
||||
int ret = 0;
|
||||
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
|
||||
if (IDX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
|
||||
ret = tfSearch[1][qtype](reader, term, tr);
|
||||
} else {
|
||||
ret = tfSearch[0][qtype](reader, term, tr);
|
||||
|
@ -536,7 +536,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
|||
__compar_fn_t fn;
|
||||
|
||||
int8_t colType = tw->header.colType;
|
||||
colType = INDEX_TYPE_GET_TYPE(colType);
|
||||
colType = IDX_TYPE_GET_TYPE(colType);
|
||||
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
|
||||
fn = tfileStrCompare;
|
||||
} else {
|
||||
|
@ -845,7 +845,7 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
|
|||
TFileHeader* header = &write->header;
|
||||
uint8_t colType = header->colType;
|
||||
|
||||
colType = INDEX_TYPE_GET_TYPE(colType);
|
||||
colType = IDX_TYPE_GET_TYPE(colType);
|
||||
FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
|
||||
if (fstBuilderInsert(write->fb, key, tval->offset)) {
|
||||
fstSliceDestroy(&key);
|
||||
|
|
Loading…
Reference in New Issue