From 5a0e7dfa74baa6bab50b8a1049926faafe82c240 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 22 May 2022 19:58:42 +0800 Subject: [PATCH] enh: refator index/transport code --- cmake/cmake.options | 2 +- source/libs/index/inc/indexCache.h | 2 ++ source/libs/index/inc/indexInt.h | 1 + source/libs/index/src/index.c | 38 ++++++++++++++++------------ source/libs/index/src/indexCache.c | 29 ++++++++++++++++----- source/libs/index/test/indexTests.cc | 28 +++++++++++++++++++- 6 files changed, 75 insertions(+), 25 deletions(-) diff --git a/cmake/cmake.options b/cmake/cmake.options index d1feb6516a..cb6fd1400d 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -146,6 +146,6 @@ option( option( BUILD_WITH_INVERTEDINDEX "If use invertedIndex" - ON + OFF ) diff --git a/source/libs/index/inc/indexCache.h b/source/libs/index/inc/indexCache.h index 96bfb2cd7f..aff2e0e836 100644 --- a/source/libs/index/inc/indexCache.h +++ b/source/libs/index/inc/indexCache.h @@ -65,6 +65,8 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in void indexCacheForceToMerge(void* cache); void indexCacheDestroy(void* cache); +void indexCacheBroadcast(void* cache); +void indexCacheWait(void* cache); Iterate* indexCacheIteratorCreate(IndexCache* cache); void indexCacheIteratorDestroy(Iterate* iiter); diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 5cb60abe7d..4bce792dd9 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -58,6 +58,7 @@ struct SIndex { SIndexStat stat; TdThreadMutex mtx; + bool quit; }; struct SIndexOpts { diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index d1f50f1979..add59cb0b3 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -124,21 +124,6 @@ END: void indexDestroy(void* handle) { SIndex* sIdx = handle; - // indexAcquireRef(sIdx->refId); - if (sIdx->colObj != NULL) { - void* iter = taosHashIterate(sIdx->colObj, NULL); - while (iter) { - IndexCache** pCache = iter; - indexCacheForceToMerge((void*)(*pCache)); - iter = taosHashIterate(sIdx->colObj, iter); - - indexCacheUnRef(*pCache); - } - taosHashCleanup(sIdx->colObj); - sIdx->colObj = NULL; - return; - } // indexReleaseRef(sIdx->refId); - taosThreadMutexDestroy(&sIdx->mtx); indexTFileDestroy(sIdx->tindex); taosMemoryFree(sIdx->path); @@ -147,6 +132,20 @@ void indexDestroy(void* handle) { } void indexClose(SIndex* sIdx) { indexReleaseRef(sIdx->refId); + bool ref = 0; + if (sIdx->colObj != NULL) { + void* iter = taosHashIterate(sIdx->colObj, NULL); + while (iter) { + IndexCache** pCache = iter; + indexCacheForceToMerge((void*)(*pCache)); + indexCacheWait((void*)(*pCache)); + iter = taosHashIterate(sIdx->colObj, iter); + indexCacheUnRef(*pCache); + } + taosHashCleanup(sIdx->colObj); + sIdx->colObj = NULL; + } + // taosMsleep(1000 * 5); indexRemoveRef(sIdx->refId); } int64_t indexAddRef(void* p) { @@ -183,6 +182,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*)); } } + taosThreadMutexUnlock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); @@ -198,7 +198,6 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { return ret; } } - taosThreadMutexUnlock(&index->mtx); return 0; } int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { @@ -461,6 +460,9 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { indexError("%p immtable is empty, ignore merge opera", pCache); indexCacheDestroyImm(pCache); tfileReaderUnRef(pReader); + if (sIdx->quit) { + indexCacheBroadcast(pCache); + } indexReleaseRef(sIdx->refId); return 0; } @@ -507,6 +509,9 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { indexDestroyFinalResult(result); indexCacheDestroyImm(pCache); + if (sIdx->quit) { + indexCacheBroadcast(pCache); + } indexCacheIteratorDestroy(cacheIter); tfileIteratorDestroy(tfileIter); @@ -521,6 +526,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000); } indexReleaseRef(sIdx->refId); + return ret; } void iterateValueDestroy(IterateValue* value, bool destroy) { diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index ef01094b5d..232eca9304 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -23,6 +23,7 @@ #define MEM_TERM_LIMIT 10 * 10000 #define MEM_THRESHOLD 64 * 1024 +#define MEM_SIGNAL_QUIT MEM_THRESHOLD * 20 #define MEM_ESTIMATE_RADIO 1.5 static void indexMemRef(MemTable* tbl); @@ -396,17 +397,24 @@ void indexCacheDestroySkiplist(SSkipList* slt) { tSkipListDestroyIter(iter); tSkipListDestroy(slt); } +void indexCacheBroadcast(void* cache) { + IndexCache* pCache = cache; + taosThreadCondBroadcast(&pCache->finished); +} +void indexCacheWait(void* cache) { + IndexCache* pCache = cache; + taosThreadCondWait(&pCache->finished, &pCache->mtx); +} void indexCacheDestroyImm(IndexCache* cache) { if (cache == NULL) { return; } - MemTable* tbl = NULL; taosThreadMutexLock(&cache->mtx); tbl = cache->imm; cache->imm = NULL; // or throw int bg thread - taosThreadCondBroadcast(&cache->finished); + indexCacheBroadcast(cache); taosThreadMutexUnlock(&cache->mtx); @@ -460,11 +468,13 @@ void indexCacheIteratorDestroy(Iterate* iter) { taosMemoryFree(iter); } -int indexCacheSchedToMerge(IndexCache* pCache) { +int indexCacheSchedToMerge(IndexCache* pCache, bool notify) { SSchedMsg schedMsg = {0}; schedMsg.fp = doMergeWork; schedMsg.ahandle = pCache; - schedMsg.thandle = NULL; + if (notify) { + schedMsg.thandle = taosMemoryMalloc(1); + } schedMsg.msg = NULL; indexAcquireRef(pCache->index->refId); taosScheduleTask(indexQhandle, &schedMsg); @@ -477,8 +487,10 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { break; } else if (cache->imm != NULL) { // TODO: wake up by condition variable - taosThreadCondWait(&cache->finished, &cache->mtx); + indexCacheWait(cache); } else { + bool notifyQuit = cache->occupiedMem >= MEM_SIGNAL_QUIT ? true : false; + indexCacheRef(cache); cache->imm = cache->mem; cache->mem = indexInternalCacheCreate(cache->type); @@ -486,7 +498,7 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { cache->occupiedMem = 0; // sched to merge // unref cache in bgwork - indexCacheSchedToMerge(cache); + indexCacheSchedToMerge(cache, notifyQuit); } } } @@ -538,7 +550,7 @@ void indexCacheForceToMerge(void* cache) { taosThreadMutexLock(&pCache->mtx); indexInfo("%p is forced to merge into tfile", pCache); - pCache->occupiedMem += MEM_THRESHOLD * 5; + pCache->occupiedMem += MEM_SIGNAL_QUIT; indexCacheMakeRoomForWrite(pCache); taosThreadMutexUnlock(&pCache->mtx); @@ -703,6 +715,9 @@ static MemTable* indexInternalCacheCreate(int8_t type) { static void doMergeWork(SSchedMsg* msg) { IndexCache* pCache = msg->ahandle; SIndex* sidx = (SIndex*)pCache->index; + + sidx->quit = msg->thandle ? true : false; + taosMemoryFree(msg->thandle); indexFlushCacheToTFile(sidx, pCache); } static bool indexCacheIteratorNext(Iterate* itera) { diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 262bbfe55a..222bf51c5f 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -834,7 +834,10 @@ class IndexObj { class IndexEnv2 : public ::testing::Test { protected: - virtual void SetUp() { index = new IndexObj(); } + virtual void SetUp() { + initLog(); + index = new IndexObj(); + } virtual void TearDown() { delete index; } IndexObj* index; }; @@ -906,6 +909,29 @@ TEST_F(IndexEnv2, testIndexOpen) { indexMultiTermQueryDestroy(mq); } } +TEST_F(IndexEnv2, testEmptyIndexOpen) { + std::string path = "/tmp/test"; + if (index->Init(path) != 0) { + std::cout << "failed to init index" << std::endl; + exit(1); + } + + int targetSize = 1; + { + std::string colName("tag1"), colVal("Hello"); + + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < targetSize; i++) { + int tableId = i; + int ret = index->Put(terms, tableId); + assert(ret == 0); + } + indexMultiTermDestroy(terms); + } +} TEST_F(IndexEnv2, testIndex_TrigeFlush) { std::string path = "/tmp/testxxx";