fix: avoid invalid read/write

This commit is contained in:
yihaoDeng 2022-05-22 23:19:35 +08:00
parent fb4982f8ca
commit bad686568d
3 changed files with 23 additions and 8 deletions

View File

@ -31,7 +31,7 @@ if (${BUILD_WITH_INVERTEDINDEX})
endif(${BUILD_WITH_INVERTEDINDEX}) endif(${BUILD_WITH_INVERTEDINDEX})
if (${BUILD_TEST}) #if (${BUILD_TEST})
add_subdirectory(test) # add_subdirectory(test)
endif(${BUILD_TEST}) #endif(${BUILD_TEST})

View File

@ -58,6 +58,7 @@ struct SIndex {
SIndexStat stat; SIndexStat stat;
TdThreadMutex mtx; TdThreadMutex mtx;
tsem_t sem;
bool quit; bool quit;
}; };
@ -70,6 +71,7 @@ struct SIndexOpts {
int32_t cacheSize; // MB int32_t cacheSize; // MB
// add cache module later // add cache module later
#endif #endif
int32_t cacheOpt; // MB
}; };
struct SIndexMultiTermQuery { struct SIndexMultiTermQuery {

View File

@ -90,6 +90,15 @@ static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, Iterat
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf);
static void indexPost(void* idx) {
SIndex* pIdx = idx;
tsem_post(&pIdx->sem);
}
static void indexWait(void* idx) {
SIndex* pIdx = idx;
tsem_wait(&pIdx->sem);
}
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
taosThreadOnce(&isInit, indexInit); taosThreadOnce(&isInit, indexInit);
SIndex* sIdx = taosMemoryCalloc(1, sizeof(SIndex)); SIndex* sIdx = taosMemoryCalloc(1, sizeof(SIndex));
@ -107,6 +116,8 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
sIdx->cVersion = 1; sIdx->cVersion = 1;
sIdx->path = tstrdup(path); sIdx->path = tstrdup(path);
taosThreadMutexInit(&sIdx->mtx, NULL); taosThreadMutexInit(&sIdx->mtx, NULL);
tsem_init(&sIdx->sem, 0, 0);
// taosThreadCondInit(&sIdx->finished, NULL);
sIdx->refId = indexAddRef(sIdx); sIdx->refId = indexAddRef(sIdx);
indexAcquireRef(sIdx->refId); indexAcquireRef(sIdx->refId);
@ -125,6 +136,7 @@ END:
void indexDestroy(void* handle) { void indexDestroy(void* handle) {
SIndex* sIdx = handle; SIndex* sIdx = handle;
taosThreadMutexDestroy(&sIdx->mtx); taosThreadMutexDestroy(&sIdx->mtx);
tsem_destroy(&sIdx->sem);
indexTFileDestroy(sIdx->tindex); indexTFileDestroy(sIdx->tindex);
taosMemoryFree(sIdx->path); taosMemoryFree(sIdx->path);
taosMemoryFree(sIdx); taosMemoryFree(sIdx);
@ -138,7 +150,7 @@ void indexClose(SIndex* sIdx) {
while (iter) { while (iter) {
IndexCache** pCache = iter; IndexCache** pCache = iter;
indexCacheForceToMerge((void*)(*pCache)); indexCacheForceToMerge((void*)(*pCache));
indexCacheWait((void*)(*pCache)); indexWait((void*)(sIdx));
iter = taosHashIterate(sIdx->colObj, iter); iter = taosHashIterate(sIdx->colObj, iter);
indexCacheUnRef(*pCache); indexCacheUnRef(*pCache);
} }
@ -461,7 +473,8 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
indexCacheDestroyImm(pCache); indexCacheDestroyImm(pCache);
tfileReaderUnRef(pReader); tfileReaderUnRef(pReader);
if (sIdx->quit) { if (sIdx->quit) {
indexCacheBroadcast(pCache); indexPost(sIdx);
// indexCacheBroadcast(pCache);
} }
indexReleaseRef(sIdx->refId); indexReleaseRef(sIdx->refId);
return 0; return 0;
@ -509,9 +522,6 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
indexDestroyFinalResult(result); indexDestroyFinalResult(result);
indexCacheDestroyImm(pCache); indexCacheDestroyImm(pCache);
if (sIdx->quit) {
indexCacheBroadcast(pCache);
}
indexCacheIteratorDestroy(cacheIter); indexCacheIteratorDestroy(cacheIter);
tfileIteratorDestroy(tfileIter); tfileIteratorDestroy(tfileIter);
@ -525,6 +535,9 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
} else { } else {
indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000); indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
} }
if (sIdx->quit) {
indexPost(sIdx);
}
indexReleaseRef(sIdx->refId); indexReleaseRef(sIdx->refId);
return ret; return ret;