diff --git a/source/libs/index/inc/indexCache.h b/source/libs/index/inc/indexCache.h index d474d87409..96bfb2cd7f 100644 --- a/source/libs/index/inc/indexCache.h +++ b/source/libs/index/inc/indexCache.h @@ -63,6 +63,7 @@ typedef struct CacheTerm { IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type); +void indexCacheForceToMerge(void* cache); void indexCacheDestroy(void* cache); Iterate* indexCacheIteratorCreate(IndexCache* cache); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 92159dae7a..d1f50f1979 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -124,15 +124,21 @@ END: void indexDestroy(void* handle) { SIndex* sIdx = handle; - void* iter = taosHashIterate(sIdx->colObj, NULL); - while (iter) { - IndexCache** pCache = iter; - if (*pCache) { + // indexAcquireRef(sIdx->refId); + if (sIdx->colObj != NULL) { + void* iter = taosHashIterate(sIdx->colObj, NULL); + while (iter) { + IndexCache** pCache = iter; + indexCacheForceToMerge((void*)(*pCache)); + iter = taosHashIterate(sIdx->colObj, iter); + indexCacheUnRef(*pCache); } - iter = taosHashIterate(sIdx->colObj, iter); - } - taosHashCleanup(sIdx->colObj); + taosHashCleanup(sIdx->colObj); + sIdx->colObj = NULL; + return; + } // indexReleaseRef(sIdx->refId); + taosThreadMutexDestroy(&sIdx->mtx); indexTFileDestroy(sIdx->tindex); taosMemoryFree(sIdx->path); @@ -177,7 +183,6 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*)); } } - taosThreadMutexUnlock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); @@ -193,6 +198,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { return ret; } } + taosThreadMutexUnlock(&index->mtx); return 0; } int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { @@ -451,6 +457,14 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { } // handle flush Iterate* cacheIter = indexCacheIteratorCreate(pCache); + if (cacheIter == NULL) { + indexError("%p immtable is empty, ignore merge opera", pCache); + indexCacheDestroyImm(pCache); + tfileReaderUnRef(pReader); + indexReleaseRef(sIdx->refId); + return 0; + } + Iterate* tfileIter = tfileIteratorCreate(pReader); if (tfileIter == NULL) { indexWarn("empty tfile reader iterator"); diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 5f7cc10b99..ef01094b5d 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -429,11 +429,13 @@ void indexCacheDestroy(void* cache) { } Iterate* indexCacheIteratorCreate(IndexCache* cache) { + if (cache->imm == NULL) { + return NULL; + } Iterate* iiter = taosMemoryCalloc(1, sizeof(Iterate)); if (iiter == NULL) { return NULL; } - taosThreadMutexLock(&cache->mtx); indexMemRef(cache->imm); @@ -463,12 +465,9 @@ int indexCacheSchedToMerge(IndexCache* pCache) { schedMsg.fp = doMergeWork; schedMsg.ahandle = pCache; schedMsg.thandle = NULL; - // schedMsg.thandle = taosMemoryCalloc(1, sizeof(int64_t)); - // memcpy((char*)(schedMsg.thandle), (char*)&(pCache->index->refId), sizeof(int64_t)); schedMsg.msg = NULL; indexAcquireRef(pCache->index->refId); taosScheduleTask(indexQhandle, &schedMsg); - return 0; } @@ -533,6 +532,19 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { return 0; // encode end } +void indexCacheForceToMerge(void* cache) { + IndexCache* pCache = cache; + indexCacheRef(pCache); + taosThreadMutexLock(&pCache->mtx); + + indexInfo("%p is forced to merge into tfile", pCache); + pCache->occupiedMem += MEM_THRESHOLD * 5; + indexCacheMakeRoomForWrite(pCache); + + taosThreadMutexUnlock(&pCache->mtx); + indexCacheUnRef(pCache); + return; +} int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { IndexCache* pCache = cache; return 0; diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 733f1b4ed1..262bbfe55a 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -272,9 +272,26 @@ void validateFst() { } delete m; } +static std::string logDir = "/tmp/log"; + +static void initLog() { + const char* defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10; + + tsAsyncLog = 0; + sDebugFlag = 143; + strcpy(tsLogDir, logDir.c_str()); + taosRemoveDir(tsLogDir); + taosMkDir(tsLogDir); + + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } +} class IndexEnv : public ::testing::Test { protected: virtual void SetUp() { + initLog(); taosRemoveDir(path); opts = indexOptsCreate(); int ret = indexOpen(opts, path, &index); @@ -804,7 +821,7 @@ class IndexObj { } ~IndexObj() { - indexCleanUp(); + // indexCleanUp(); indexClose(idx); } @@ -884,7 +901,7 @@ TEST_F(IndexEnv2, testIndexOpen) { SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); index->Search(mq, result); std::cout << "target size: " << taosArrayGetSize(result) << std::endl; - assert(taosArrayGetSize(result) == 400); + EXPECT_EQ(400, taosArrayGetSize(result)); taosArrayDestroy(result); indexMultiTermQueryDestroy(mq); }