refactor index write

This commit is contained in:
yihaoDeng 2021-12-28 14:36:44 +08:00
parent 7e3e6022db
commit 80718aee34
3 changed files with 38 additions and 20 deletions

View File

@ -39,6 +39,7 @@ typedef struct IndexCache {
int32_t nTerm; int32_t nTerm;
int8_t type; int8_t type;
pthread_mutex_t mtx;
} IndexCache; } IndexCache;
#define CACHE_VERSION(cache) atomic_load_32(&cache->version) #define CACHE_VERSION(cache) atomic_load_32(&cache->version)
@ -71,7 +72,7 @@ void indexCacheUnRef(IndexCache* cache);
void indexCacheDebug(IndexCache* cache); void indexCacheDebug(IndexCache* cache);
void indexCacheDestroySkiplist(SSkipList* slt); void indexCacheDestroyImm(IndexCache* cache);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -436,9 +436,7 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
if (ret != 0) { indexError("faile to write into tindex "); } if (ret != 0) { indexError("faile to write into tindex "); }
} }
// not free later, just put int table cache // not free later, just put int table cache
SSkipList* timm = (SSkipList*)pCache->imm; indexCacheDestroyImm(pCache);
pCache->imm = NULL; // or throw int bg thread
indexCacheDestroySkiplist(timm);
tfileWriteClose(tw); tfileWriteClose(tw);
indexCacheIteratorDestroy(cacheIter); indexCacheIteratorDestroy(cacheIter);

View File

@ -20,7 +20,7 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later #define MAX_INDEX_KEY_LEN 256 // test only, change later
#define CACH_LIMIT 1000000 #define MEM_TERM_LIMIT 1000000
// ref index_cache.h:22 // ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \ //#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType)) // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType))
@ -78,6 +78,7 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
cache->index = idx; cache->index = idx;
cache->version = 0; cache->version = 0;
pthread_mutex_init(&cache->mtx, NULL);
indexCacheRef(cache); indexCacheRef(cache);
return cache; return cache;
} }
@ -103,13 +104,21 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
} }
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
} }
void indexCacheDestroyImm(IndexCache* cache) {
pthread_mutex_lock(&cache->mtx);
SSkipList* timm = (SSkipList*)cache->imm;
cache->imm = NULL; // or throw int bg thread
pthread_mutex_unlock(&cache->mtx);
indexCacheDestroySkiplist(timm);
}
void indexCacheDestroy(void* cache) { void indexCacheDestroy(void* cache) {
IndexCache* pCache = cache; IndexCache* pCache = cache;
if (pCache == NULL) { return; } if (pCache == NULL) { return; }
tSkipListDestroy(pCache->mem); tSkipListDestroy(pCache->mem);
tSkipListDestroy(pCache->imm); tSkipListDestroy(pCache->imm);
free(pCache->colName); free(pCache->colName);
free(pCache); free(pCache);
} }
@ -170,6 +179,27 @@ int indexCacheSchedToMerge(IndexCache* pCache) {
taosScheduleTask(indexQhandle, &schedMsg); taosScheduleTask(indexQhandle, &schedMsg);
} }
static void indexCacheMakeRoomForWrite(IndexCache* cache) {
while (true) {
if (cache->nTerm < MEM_TERM_LIMIT) {
cache->nTerm += 1;
break;
} else if (cache->imm != NULL) {
// TODO: wake up by condition variable
pthread_mutex_unlock(&cache->mtx);
taosMsleep(50);
pthread_mutex_lock(&cache->mtx);
} else {
cache->imm = cache->mem;
cache->mem = indexInternalCacheCreate(cache->type);
cache->nTerm = 1;
// sched to merge
// unref cache in bgwork
indexCacheSchedToMerge(cache);
}
}
}
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
if (cache == NULL) { return -1; } if (cache == NULL) { return -1; }
@ -188,23 +218,12 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
ct->uid = uid; ct->uid = uid;
ct->operaType = term->operType; ct->operaType = term->operType;
// ugly code, refactor later
pthread_mutex_lock(&pCache->mtx);
indexCacheMakeRoomForWrite(pCache);
tSkipListPut(pCache->mem, (char*)ct); tSkipListPut(pCache->mem, (char*)ct);
pCache->nTerm += 1; pthread_mutex_unlock(&pCache->mtx);
if (pCache->nTerm >= CACH_LIMIT) {
pCache->nTerm = 0;
while (pCache->imm != NULL) {
// do nothong
}
pCache->imm = pCache->mem;
pCache->mem = indexInternalCacheCreate(pCache->type);
// sched to merge
// unref cache int bgwork
indexCacheSchedToMerge(pCache);
}
indexCacheUnRef(pCache); indexCacheUnRef(pCache);
return 0; return 0;
// encode end // encode end