refactor code
This commit is contained in:
parent
4e94a21082
commit
971efd7ef4
|
@ -49,7 +49,6 @@ typedef struct IndexCache {
|
||||||
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
|
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
|
||||||
typedef struct CacheTerm {
|
typedef struct CacheTerm {
|
||||||
// key
|
// key
|
||||||
int32_t nColVal;
|
|
||||||
char* colVal;
|
char* colVal;
|
||||||
int32_t version;
|
int32_t version;
|
||||||
// value
|
// value
|
||||||
|
|
|
@ -227,14 +227,15 @@ SIndexOpts* indexOptsCreate() {
|
||||||
#endif
|
#endif
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
void indexOptsDestroy(SIndexOpts* opts){
|
void indexOptsDestroy(SIndexOpts* opts) {
|
||||||
#ifdef USE_LUCENE
|
#ifdef USE_LUCENE
|
||||||
#endif
|
#endif
|
||||||
} /*
|
return;
|
||||||
|
}
|
||||||
|
/*
|
||||||
* @param: oper
|
* @param: oper
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
|
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
|
||||||
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
|
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
@ -403,7 +404,6 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
||||||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
||||||
taosArrayAddAll(tfv->tableId, cv->val);
|
taosArrayAddAll(tfv->tableId, cv->val);
|
||||||
taosArrayPush(result, &tfv);
|
taosArrayPush(result, &tfv);
|
||||||
|
|
||||||
// copy to final Result;
|
// copy to final Result;
|
||||||
cn = cacheIter->next(cacheIter);
|
cn = cacheIter->next(cacheIter);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -23,21 +23,21 @@
|
||||||
#define MEM_TERM_LIMIT 1000000
|
#define MEM_TERM_LIMIT 1000000
|
||||||
// ref index_cache.h:22
|
// ref index_cache.h:22
|
||||||
//#define CACHE_KEY_LEN(p) \
|
//#define CACHE_KEY_LEN(p) \
|
||||||
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
|
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
|
||||||
|
// sizeof(p->operType))
|
||||||
|
|
||||||
void indexMemRef(MemTable* tbl);
|
static void indexMemRef(MemTable* tbl);
|
||||||
void indexMemUnRef(MemTable* tbl);
|
static void indexMemUnRef(MemTable* tbl);
|
||||||
|
|
||||||
void indexCacheRef(IndexCache* cache);
|
|
||||||
void indexCacheUnRef(IndexCache* cache);
|
|
||||||
|
|
||||||
static void cacheTermDestroy(CacheTerm* ct);
|
static void cacheTermDestroy(CacheTerm* ct);
|
||||||
static char* getIndexKey(const void* pData);
|
static char* getIndexKey(const void* pData);
|
||||||
static int32_t compareKey(const void* l, const void* r);
|
static int32_t compareKey(const void* l, const void* r);
|
||||||
|
|
||||||
static MemTable* indexInternalCacheCreate(int8_t type);
|
static MemTable* indexInternalCacheCreate(int8_t type);
|
||||||
|
|
||||||
static void doMergeWork(SSchedMsg* msg);
|
static void doMergeWork(SSchedMsg* msg);
|
||||||
static bool indexCacheIteratorNext(Iterate* itera);
|
static bool indexCacheIteratorNext(Iterate* itera);
|
||||||
|
|
||||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
|
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
|
||||||
|
|
||||||
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
||||||
|
@ -86,7 +86,8 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
|
||||||
while (tSkipListIterNext(iter)) {
|
while (tSkipListIterNext(iter)) {
|
||||||
SSkipListNode* node = tSkipListIterGet(iter);
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
if (ct != NULL) {}
|
if (ct != NULL) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tSkipListDestroyIter(iter);
|
tSkipListDestroyIter(iter);
|
||||||
tSkipListDestroy(slt);
|
tSkipListDestroy(slt);
|
||||||
|
@ -101,7 +102,9 @@ void indexCacheDestroyImm(IndexCache* cache) {
|
||||||
}
|
}
|
||||||
void indexCacheDestroy(void* cache) {
|
void indexCacheDestroy(void* cache) {
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
if (pCache == NULL) { return; }
|
if (pCache == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
indexMemUnRef(pCache->mem);
|
indexMemUnRef(pCache->mem);
|
||||||
indexMemUnRef(pCache->imm);
|
indexMemUnRef(pCache->imm);
|
||||||
free(pCache->colName);
|
free(pCache->colName);
|
||||||
|
@ -111,7 +114,9 @@ void indexCacheDestroy(void* cache) {
|
||||||
|
|
||||||
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
|
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
|
||||||
Iterate* iiter = calloc(1, sizeof(Iterate));
|
Iterate* iiter = calloc(1, sizeof(Iterate));
|
||||||
if (iiter == NULL) { return NULL; }
|
if (iiter == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
MemTable* tbl = cache->imm;
|
MemTable* tbl = cache->imm;
|
||||||
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
@ -122,8 +127,9 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
|
||||||
return iiter;
|
return iiter;
|
||||||
}
|
}
|
||||||
void indexCacheIteratorDestroy(Iterate* iter) {
|
void indexCacheIteratorDestroy(Iterate* iter) {
|
||||||
if (iter == NULL) { return; }
|
if (iter == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
tSkipListDestroyIter(iter->iter);
|
tSkipListDestroyIter(iter->iter);
|
||||||
iterateValueDestroy(&iter->val, true);
|
iterateValueDestroy(&iter->val, true);
|
||||||
free(iter);
|
free(iter);
|
||||||
|
@ -160,18 +166,21 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
if (cache == NULL) { return -1; }
|
if (cache == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
indexCacheRef(pCache);
|
indexCacheRef(pCache);
|
||||||
// encode data
|
// encode data
|
||||||
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
|
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
|
||||||
if (cache == NULL) { return -1; }
|
if (cache == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
// set up key
|
// set up key
|
||||||
ct->colType = term->colType;
|
ct->colType = term->colType;
|
||||||
ct->nColVal = term->nColVal;
|
ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1));
|
||||||
ct->colVal = (char*)calloc(1, sizeof(char) * (ct->nColVal + 1));
|
memcpy(ct->colVal, term->colVal, term->nColVal);
|
||||||
memcpy(ct->colVal, term->colVal, ct->nColVal);
|
|
||||||
ct->version = atomic_add_fetch_32(&pCache->version, 1);
|
ct->version = atomic_add_fetch_32(&pCache->version, 1);
|
||||||
// set value
|
// set value
|
||||||
ct->uid = uid;
|
ct->uid = uid;
|
||||||
|
@ -197,7 +206,9 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
|
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
|
||||||
if (cache == NULL) { return -1; }
|
if (cache == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
EIndexQueryType qtype = query->qType;
|
EIndexQueryType qtype = query->qType;
|
||||||
|
@ -211,10 +222,11 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
|
||||||
pthread_mutex_unlock(&pCache->mtx);
|
pthread_mutex_unlock(&pCache->mtx);
|
||||||
|
|
||||||
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
|
CacheTerm* ct = calloc(1, sizeof(CacheTerm));
|
||||||
if (ct == NULL) { return -1; }
|
if (ct == NULL) {
|
||||||
ct->nColVal = term->nColVal;
|
return -1;
|
||||||
ct->colVal = calloc(1, sizeof(char) * (ct->nColVal + 1));
|
}
|
||||||
memcpy(ct->colVal, term->colVal, ct->nColVal);
|
ct->colVal = calloc(1, sizeof(char) * (term->nColVal + 1));
|
||||||
|
memcpy(ct->colVal, term->colVal, term->nColVal);
|
||||||
ct->version = atomic_load_32(&pCache->version);
|
ct->version = atomic_load_32(&pCache->version);
|
||||||
|
|
||||||
char* key = getIndexKey(ct);
|
char* key = getIndexKey(ct);
|
||||||
|
@ -225,7 +237,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
|
||||||
if (node != NULL) {
|
if (node != NULL) {
|
||||||
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
|
if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
|
||||||
if (c->nColVal == ct->nColVal && strncmp(c->colVal, ct->colVal, c->nColVal) == 0) {
|
if (strcmp(c->colVal, ct->colVal) == 0) {
|
||||||
taosArrayPush(result, &c->uid);
|
taosArrayPush(result, &c->uid);
|
||||||
*s = kTypeValue;
|
*s = kTypeValue;
|
||||||
} else {
|
} else {
|
||||||
|
@ -257,26 +269,33 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
|
||||||
}
|
}
|
||||||
|
|
||||||
void indexCacheRef(IndexCache* cache) {
|
void indexCacheRef(IndexCache* cache) {
|
||||||
if (cache == NULL) { return; }
|
if (cache == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
int ref = T_REF_INC(cache);
|
int ref = T_REF_INC(cache);
|
||||||
UNUSED(ref);
|
UNUSED(ref);
|
||||||
}
|
}
|
||||||
void indexCacheUnRef(IndexCache* cache) {
|
void indexCacheUnRef(IndexCache* cache) {
|
||||||
if (cache == NULL) { return; }
|
if (cache == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
int ref = T_REF_DEC(cache);
|
int ref = T_REF_DEC(cache);
|
||||||
if (ref == 0) { indexCacheDestroy(cache); }
|
if (ref == 0) {
|
||||||
|
indexCacheDestroy(cache);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void indexMemRef(MemTable* tbl) {
|
void indexMemRef(MemTable* tbl) {
|
||||||
if (tbl == NULL) { return; }
|
if (tbl == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
int ref = T_REF_INC(tbl);
|
int ref = T_REF_INC(tbl);
|
||||||
UNUSED(ref);
|
UNUSED(ref);
|
||||||
}
|
}
|
||||||
void indexMemUnRef(MemTable* tbl) {
|
void indexMemUnRef(MemTable* tbl) {
|
||||||
if (tbl == NULL) { return; }
|
if (tbl == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
int ref = T_REF_DEC(tbl);
|
int ref = T_REF_DEC(tbl);
|
||||||
if (ref == 0) {
|
if (ref == 0) {
|
||||||
SSkipList* slt = tbl->mem;
|
SSkipList* slt = tbl->mem;
|
||||||
|
@ -286,8 +305,9 @@ void indexMemUnRef(MemTable* tbl) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cacheTermDestroy(CacheTerm* ct) {
|
static void cacheTermDestroy(CacheTerm* ct) {
|
||||||
if (ct == NULL) { return; }
|
if (ct == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
free(ct->colVal);
|
free(ct->colVal);
|
||||||
free(ct);
|
free(ct);
|
||||||
}
|
}
|
||||||
|
@ -301,21 +321,11 @@ static int32_t compareKey(const void* l, const void* r) {
|
||||||
CacheTerm* rt = (CacheTerm*)r;
|
CacheTerm* rt = (CacheTerm*)r;
|
||||||
|
|
||||||
// compare colVal
|
// compare colVal
|
||||||
int i, j;
|
int32_t cmp = strcmp(lt->colVal, rt->colVal);
|
||||||
for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) {
|
if (cmp == 0) {
|
||||||
if (lt->colVal[i] == rt->colVal[j]) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
return lt->colVal[i] < rt->colVal[j] ? -1 : 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (i < lt->nColVal) {
|
|
||||||
return 1;
|
|
||||||
} else if (j < rt->nColVal) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
// compare version
|
|
||||||
return rt->version - lt->version;
|
return rt->version - lt->version;
|
||||||
|
}
|
||||||
|
return cmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
static MemTable* indexInternalCacheCreate(int8_t type) {
|
static MemTable* indexInternalCacheCreate(int8_t type) {
|
||||||
|
@ -334,8 +344,9 @@ static void doMergeWork(SSchedMsg* msg) {
|
||||||
}
|
}
|
||||||
static bool indexCacheIteratorNext(Iterate* itera) {
|
static bool indexCacheIteratorNext(Iterate* itera) {
|
||||||
SSkipListIterator* iter = itera->iter;
|
SSkipListIterator* iter = itera->iter;
|
||||||
if (iter == NULL) { return false; }
|
if (iter == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
IterateValue* iv = &itera->val;
|
IterateValue* iv = &itera->val;
|
||||||
iterateValueDestroy(iv, false);
|
iterateValueDestroy(iv, false);
|
||||||
|
|
||||||
|
@ -349,10 +360,7 @@ static bool indexCacheIteratorNext(Iterate* itera) {
|
||||||
|
|
||||||
taosArrayPush(iv->val, &ct->uid);
|
taosArrayPush(iv->val, &ct->uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
return next;
|
return next;
|
||||||
}
|
}
|
||||||
|
|
||||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
|
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { return &iter->val; }
|
||||||
return &iter->val;
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue