refactor code
This commit is contained in:
parent
7d29af7316
commit
2e0dc58890
|
@ -40,7 +40,7 @@ typedef struct IndexCache {
|
|||
SIndex* index;
|
||||
char* colName;
|
||||
int32_t version;
|
||||
int32_t nTerm;
|
||||
int64_t occupiedMem;
|
||||
int8_t type;
|
||||
uint64_t suid;
|
||||
|
||||
|
|
|
@ -21,6 +21,8 @@
|
|||
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
||||
|
||||
#define MEM_TERM_LIMIT 10 * 10000
|
||||
#define MEM_THRESHOLD 1024 * 1024 * 2
|
||||
#define MEM_ESTIMATE_RADIO 1.5
|
||||
|
||||
static void indexMemRef(MemTable* tbl);
|
||||
static void indexMemUnRef(MemTable* tbl);
|
||||
|
@ -50,6 +52,7 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
|
|||
cache->index = idx;
|
||||
cache->version = 0;
|
||||
cache->suid = suid;
|
||||
cache->occupiedMem = 0;
|
||||
pthread_mutex_init(&cache->mtx, NULL);
|
||||
indexCacheRef(cache);
|
||||
return cache;
|
||||
|
@ -173,8 +176,7 @@ int indexCacheSchedToMerge(IndexCache* pCache) {
|
|||
}
|
||||
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
|
||||
while (true) {
|
||||
if (cache->nTerm < MEM_TERM_LIMIT) {
|
||||
cache->nTerm += 1;
|
||||
if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) {
|
||||
break;
|
||||
} else if (cache->imm != NULL) {
|
||||
// TODO: wake up by condition variable
|
||||
|
@ -185,7 +187,7 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
|
|||
indexCacheRef(cache);
|
||||
cache->imm = cache->mem;
|
||||
cache->mem = indexInternalCacheCreate(cache->type);
|
||||
cache->nTerm = 1;
|
||||
cache->occupiedMem = 0;
|
||||
// sched to merge
|
||||
// unref cache in bgwork
|
||||
indexCacheSchedToMerge(cache);
|
||||
|
@ -211,8 +213,9 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
|||
ct->operaType = term->operType;
|
||||
|
||||
// ugly code, refactor later
|
||||
int64_t estimate = sizeof(ct) + strlen(ct->colVal);
|
||||
pthread_mutex_lock(&pCache->mtx);
|
||||
|
||||
pCache->occupiedMem += estimate;
|
||||
indexCacheMakeRoomForWrite(pCache);
|
||||
MemTable* tbl = pCache->mem;
|
||||
indexMemRef(tbl);
|
||||
|
@ -333,7 +336,7 @@ static int32_t compareKey(const void* l, const void* r) {
|
|||
static MemTable* indexInternalCacheCreate(int8_t type) {
|
||||
MemTable* tbl = calloc(1, sizeof(MemTable));
|
||||
indexMemRef(tbl);
|
||||
if (type == TSDB_DATA_TYPE_BINARY) {
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
|
||||
}
|
||||
return tbl;
|
||||
|
|
|
@ -346,9 +346,6 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
|
|||
}
|
||||
static bool tfileIteratorNext(Iterate* iiter) {
|
||||
IterateValue* iv = &iiter->val;
|
||||
if (iv->colVal != NULL && iv->val != NULL) {
|
||||
// indexError("value in fst: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
|
||||
}
|
||||
iterateValueDestroy(iv, false);
|
||||
|
||||
char* colVal = NULL;
|
||||
|
|
Loading…
Reference in New Issue