diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 27f5a6fb20..0e7405869a 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -39,6 +39,7 @@ typedef struct IndexCache { int32_t nTerm; int8_t type; + pthread_mutex_t mtx; } IndexCache; #define CACHE_VERSION(cache) atomic_load_32(&cache->version) @@ -71,7 +72,7 @@ void indexCacheUnRef(IndexCache* cache); void indexCacheDebug(IndexCache* cache); -void indexCacheDestroySkiplist(SSkipList* slt); +void indexCacheDestroyImm(IndexCache* cache); #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 23df5f9f9a..06e7e8ba44 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -436,9 +436,7 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { if (ret != 0) { indexError("faile to write into tindex "); } } // not free later, just put int table cache - SSkipList* timm = (SSkipList*)pCache->imm; - pCache->imm = NULL; // or throw int bg thread - indexCacheDestroySkiplist(timm); + indexCacheDestroyImm(pCache); tfileWriteClose(tw); indexCacheIteratorDestroy(cacheIter); diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 54aee8858b..3f99d04bc9 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -20,7 +20,7 @@ #define MAX_INDEX_KEY_LEN 256 // test only, change later -#define CACH_LIMIT 1000000 +#define MEM_TERM_LIMIT 1000000 // ref index_cache.h:22 //#define CACHE_KEY_LEN(p) \ // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType)) @@ -78,6 +78,7 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { cache->index = idx; cache->version = 0; + pthread_mutex_init(&cache->mtx, NULL); indexCacheRef(cache); return cache; } @@ -103,13 +104,21 @@ void indexCacheDestroySkiplist(SSkipList* slt) { } tSkipListDestroyIter(iter); } +void indexCacheDestroyImm(IndexCache* cache) { + pthread_mutex_lock(&cache->mtx); + SSkipList* timm = (SSkipList*)cache->imm; + cache->imm = NULL; // or throw int bg thread + pthread_mutex_unlock(&cache->mtx); + indexCacheDestroySkiplist(timm); +} void indexCacheDestroy(void* cache) { IndexCache* pCache = cache; if (pCache == NULL) { return; } tSkipListDestroy(pCache->mem); tSkipListDestroy(pCache->imm); free(pCache->colName); + free(pCache); } @@ -170,6 +179,27 @@ int indexCacheSchedToMerge(IndexCache* pCache) { taosScheduleTask(indexQhandle, &schedMsg); } +static void indexCacheMakeRoomForWrite(IndexCache* cache) { + while (true) { + if (cache->nTerm < MEM_TERM_LIMIT) { + cache->nTerm += 1; + break; + } else if (cache->imm != NULL) { + // TODO: wake up by condition variable + pthread_mutex_unlock(&cache->mtx); + taosMsleep(50); + pthread_mutex_lock(&cache->mtx); + } else { + cache->imm = cache->mem; + cache->mem = indexInternalCacheCreate(cache->type); + cache->nTerm = 1; + // sched to merge + // unref cache in bgwork + indexCacheSchedToMerge(cache); + } + } +} + int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { if (cache == NULL) { return -1; } @@ -188,23 +218,12 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { ct->uid = uid; ct->operaType = term->operType; + // ugly code, refactor later + pthread_mutex_lock(&pCache->mtx); + indexCacheMakeRoomForWrite(pCache); tSkipListPut(pCache->mem, (char*)ct); - pCache->nTerm += 1; + pthread_mutex_unlock(&pCache->mtx); - if (pCache->nTerm >= CACH_LIMIT) { - pCache->nTerm = 0; - - while (pCache->imm != NULL) { - // do nothong - } - - pCache->imm = pCache->mem; - pCache->mem = indexInternalCacheCreate(pCache->type); - - // sched to merge - // unref cache int bgwork - indexCacheSchedToMerge(pCache); - } indexCacheUnRef(pCache); return 0; // encode end