Merge pull request #13332 from taosdata/enh/indexTest

fix: close cause flush result error
This commit is contained in:
Yihao Deng 2022-05-31 22:38:05 +08:00 committed by GitHub
commit 5422db24b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 13 additions and 4 deletions

View File

@ -36,6 +36,7 @@ typedef struct MemTable {
typedef struct IndexCache { typedef struct IndexCache {
T_REF_DECLARE() T_REF_DECLARE()
MemTable *mem, *imm; MemTable *mem, *imm;
int32_t merging;
SIndex* index; SIndex* index;
char* colName; char* colName;
int64_t version; int64_t version;

View File

@ -463,6 +463,9 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
IndexCache* pCache = (IndexCache*)cache; IndexCache* pCache = (IndexCache*)cache;
while (sIdx->quit && atomic_load_32(&pCache->merging) == 1) {
}
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
if (pReader == NULL) { if (pReader == NULL) {
indexWarn("empty tfile reader found"); indexWarn("empty tfile reader found");
@ -475,9 +478,9 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
tfileReaderUnRef(pReader); tfileReaderUnRef(pReader);
if (sIdx->quit) { if (sIdx->quit) {
indexPost(sIdx); indexPost(sIdx);
// indexCacheBroadcast(pCache);
} }
indexReleaseRef(sIdx->refId); indexReleaseRef(sIdx->refId);
atomic_store_32(&pCache->merging, 0);
return 0; return 0;
} }
@ -539,6 +542,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
if (sIdx->quit) { if (sIdx->quit) {
indexPost(sIdx); indexPost(sIdx);
} }
atomic_store_32(&pCache->merging, 0);
indexReleaseRef(sIdx->refId); indexReleaseRef(sIdx->refId);
return ret; return ret;
@ -605,6 +609,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
taosThreadMutexLock(&tf->mtx); taosThreadMutexLock(&tf->mtx);
tfileCachePut(tf->cache, &key, reader); tfileCachePut(tf->cache, &key, reader);
taosThreadMutexUnlock(&tf->mtx); taosThreadMutexUnlock(&tf->mtx);
return ret; return ret;
END: END:
if (tw != NULL) { if (tw != NULL) {

View File

@ -494,16 +494,19 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
// TODO: wake up by condition variable // TODO: wake up by condition variable
indexCacheWait(cache); indexCacheWait(cache);
} else { } else {
bool notifyQuit = cache->occupiedMem >= MEM_SIGNAL_QUIT ? true : false; bool quit = cache->occupiedMem >= MEM_SIGNAL_QUIT ? true : false;
indexCacheRef(cache); indexCacheRef(cache);
cache->imm = cache->mem; cache->imm = cache->mem;
cache->mem = indexInternalCacheCreate(cache->type); cache->mem = indexInternalCacheCreate(cache->type);
cache->mem->pCache = cache; cache->mem->pCache = cache;
cache->occupiedMem = 0; cache->occupiedMem = 0;
if (quit == false) {
atomic_store_32(&cache->merging, 1);
}
// sched to merge // sched to merge
// unref cache in bgwork // unref cache in bgwork
indexCacheSchedToMerge(cache, notifyQuit); indexCacheSchedToMerge(cache, quit);
} }
} }
} }