diff --git a/source/libs/index/inc/indexTfile.h b/source/libs/index/inc/indexTfile.h index 85ed397b0a..6fca3b1bf5 100644 --- a/source/libs/index/inc/indexTfile.h +++ b/source/libs/index/inc/indexTfile.h @@ -74,9 +74,10 @@ typedef struct TFileReader { } TFileReader; typedef struct IndexTFile { - char* path; - TFileCache* cache; - TFileWriter* tw; + char* path; + TFileCache* cache; + TFileWriter* tw; + TdThreadMutex mtx; } IndexTFile; typedef struct TFileWriterOpt { diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 6add788a89..02f1682655 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -557,20 +557,18 @@ void iterateValueDestroy(IterateValue* value, bool destroy) { static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) { ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)}; int64_t ver = CACHE_VERSION(cache); - taosThreadMutexLock(&sIdx->mtx); - TFileReader* trd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key); - if (trd != NULL) { - if (ver < trd->header.version) { - ver = trd->header.version + 1; - } else { - ver += 1; - } - indexInfo("header: %d, ver: %" PRId64 "", trd->header.version, ver); - tfileReaderUnRef(trd); - } else { - indexInfo("not found reader base %p", trd); + + TFileReader* rd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key); + IndexTFile* tf = (IndexTFile*)(sIdx->tindex); + + taosThreadMutexLock(&tf->mtx); + tfileCacheGet(tf->cache, &key); + taosThreadMutexUnlock(&tf->mtx); + + if (rd != NULL) { + ver += MAX(ver, rd->header.version) + 1; + indexInfo("header: %d, ver: %" PRId64 "", rd->header.version, ver); } - taosThreadMutexUnlock(&sIdx->mtx); return ver; } static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { @@ -597,13 +595,14 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { } indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf); + IndexTFile* tf = (IndexTFile*)sIdx->tindex; + TFileHeader* header = &reader->header; ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)}; - taosThreadMutexLock(&sIdx->mtx); - IndexTFile* ifile = (IndexTFile*)sIdx->tindex; - tfileCachePut(ifile->cache, &key, reader); - taosThreadMutexUnlock(&sIdx->mtx); + taosThreadMutexLock(&tf->mtx); + tfileCachePut(tf->cache, &key, reader); + taosThreadMutexUnlock(&tf->mtx); return ret; END: if (tw != NULL) { diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 3d85646bd2..43754193ae 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -151,13 +151,10 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { char buf[128] = {0}; int32_t sz = indexSerialCacheKey(key, buf); assert(sz < sizeof(buf)); - indexInfo("Try to get key: %s", buf); TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz); if (reader == NULL || *reader == NULL) { - indexInfo("failed to get key: %s", buf); return NULL; } - indexInfo("Get key: %s file: %s", buf, (*reader)->ctx->file.buf); tfileReaderRef(*reader); return *reader; @@ -657,7 +654,7 @@ IndexTFile* indexTFileCreate(const char* path) { tfileCacheDestroy(cache); return NULL; } - + taosThreadMutexInit(&tfile->mtx, NULL); tfile->cache = cache; return tfile; } @@ -665,6 +662,7 @@ void indexTFileDestroy(IndexTFile* tfile) { if (tfile == NULL) { return; } + taosThreadMutexDestroy(&tfile->mtx); tfileCacheDestroy(tfile->cache); taosMemoryFree(tfile); } @@ -680,7 +678,10 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result SIndexTerm* term = query->term; ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; + + taosThreadMutexLock(&pTfile->mtx); TFileReader* reader = tfileCacheGet(pTfile->cache, &key); + taosThreadMutexUnlock(&pTfile->mtx); if (reader == NULL) { return 0; } @@ -780,8 +781,13 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) { if (tf == NULL) { return NULL; } - ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; - return tfileCacheGet(tf->cache, &key); + TFileReader* rd = NULL; + ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; + + taosThreadMutexLock(&tf->mtx); + rd = tfileCacheGet(tf->cache, &key); + taosThreadMutexUnlock(&tf->mtx); + return rd; } static int tfileUidCompare(const void* a, const void* b) { diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index f848cee86b..5c6fe8bf91 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -794,10 +794,10 @@ class IndexObj { } int sz = taosArrayGetSize(result); indexMultiTermQueryDestroy(mq); - taosArrayDestroy(result); assert(sz == 1); uint64_t* ret = (uint64_t*)taosArrayGet(result, 0); assert(val = *ret); + taosArrayDestroy(result); return sz; }