enh: opt index mutex
This commit is contained in:
parent
20d788d703
commit
6ca5e3ae6e
|
@ -74,9 +74,10 @@ typedef struct TFileReader {
|
||||||
} TFileReader;
|
} TFileReader;
|
||||||
|
|
||||||
typedef struct IndexTFile {
|
typedef struct IndexTFile {
|
||||||
char* path;
|
char* path;
|
||||||
TFileCache* cache;
|
TFileCache* cache;
|
||||||
TFileWriter* tw;
|
TFileWriter* tw;
|
||||||
|
TdThreadMutex mtx;
|
||||||
} IndexTFile;
|
} IndexTFile;
|
||||||
|
|
||||||
typedef struct TFileWriterOpt {
|
typedef struct TFileWriterOpt {
|
||||||
|
|
|
@ -557,20 +557,18 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
|
||||||
static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) {
|
static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) {
|
||||||
ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
|
ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
|
||||||
int64_t ver = CACHE_VERSION(cache);
|
int64_t ver = CACHE_VERSION(cache);
|
||||||
taosThreadMutexLock(&sIdx->mtx);
|
|
||||||
TFileReader* trd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key);
|
TFileReader* rd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key);
|
||||||
if (trd != NULL) {
|
IndexTFile* tf = (IndexTFile*)(sIdx->tindex);
|
||||||
if (ver < trd->header.version) {
|
|
||||||
ver = trd->header.version + 1;
|
taosThreadMutexLock(&tf->mtx);
|
||||||
} else {
|
tfileCacheGet(tf->cache, &key);
|
||||||
ver += 1;
|
taosThreadMutexUnlock(&tf->mtx);
|
||||||
}
|
|
||||||
indexInfo("header: %d, ver: %" PRId64 "", trd->header.version, ver);
|
if (rd != NULL) {
|
||||||
tfileReaderUnRef(trd);
|
ver += MAX(ver, rd->header.version) + 1;
|
||||||
} else {
|
indexInfo("header: %d, ver: %" PRId64 "", rd->header.version, ver);
|
||||||
indexInfo("not found reader base %p", trd);
|
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&sIdx->mtx);
|
|
||||||
return ver;
|
return ver;
|
||||||
}
|
}
|
||||||
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||||
|
@ -597,13 +595,14 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||||
}
|
}
|
||||||
indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);
|
indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);
|
||||||
|
|
||||||
|
IndexTFile* tf = (IndexTFile*)sIdx->tindex;
|
||||||
|
|
||||||
TFileHeader* header = &reader->header;
|
TFileHeader* header = &reader->header;
|
||||||
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
|
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
|
||||||
|
|
||||||
taosThreadMutexLock(&sIdx->mtx);
|
taosThreadMutexLock(&tf->mtx);
|
||||||
IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
|
tfileCachePut(tf->cache, &key, reader);
|
||||||
tfileCachePut(ifile->cache, &key, reader);
|
taosThreadMutexUnlock(&tf->mtx);
|
||||||
taosThreadMutexUnlock(&sIdx->mtx);
|
|
||||||
return ret;
|
return ret;
|
||||||
END:
|
END:
|
||||||
if (tw != NULL) {
|
if (tw != NULL) {
|
||||||
|
|
|
@ -151,13 +151,10 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
int32_t sz = indexSerialCacheKey(key, buf);
|
int32_t sz = indexSerialCacheKey(key, buf);
|
||||||
assert(sz < sizeof(buf));
|
assert(sz < sizeof(buf));
|
||||||
indexInfo("Try to get key: %s", buf);
|
|
||||||
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
|
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
|
||||||
if (reader == NULL || *reader == NULL) {
|
if (reader == NULL || *reader == NULL) {
|
||||||
indexInfo("failed to get key: %s", buf);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
indexInfo("Get key: %s file: %s", buf, (*reader)->ctx->file.buf);
|
|
||||||
tfileReaderRef(*reader);
|
tfileReaderRef(*reader);
|
||||||
|
|
||||||
return *reader;
|
return *reader;
|
||||||
|
@ -657,7 +654,7 @@ IndexTFile* indexTFileCreate(const char* path) {
|
||||||
tfileCacheDestroy(cache);
|
tfileCacheDestroy(cache);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
taosThreadMutexInit(&tfile->mtx, NULL);
|
||||||
tfile->cache = cache;
|
tfile->cache = cache;
|
||||||
return tfile;
|
return tfile;
|
||||||
}
|
}
|
||||||
|
@ -665,6 +662,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
|
||||||
if (tfile == NULL) {
|
if (tfile == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
taosThreadMutexDestroy(&tfile->mtx);
|
||||||
tfileCacheDestroy(tfile->cache);
|
tfileCacheDestroy(tfile->cache);
|
||||||
taosMemoryFree(tfile);
|
taosMemoryFree(tfile);
|
||||||
}
|
}
|
||||||
|
@ -680,7 +678,10 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTempResult* result
|
||||||
|
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTfile->mtx);
|
||||||
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
||||||
|
taosThreadMutexUnlock(&pTfile->mtx);
|
||||||
if (reader == NULL) {
|
if (reader == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -780,8 +781,13 @@ TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
|
||||||
if (tf == NULL) {
|
if (tf == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
TFileReader* rd = NULL;
|
||||||
return tfileCacheGet(tf->cache, &key);
|
ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
||||||
|
|
||||||
|
taosThreadMutexLock(&tf->mtx);
|
||||||
|
rd = tfileCacheGet(tf->cache, &key);
|
||||||
|
taosThreadMutexUnlock(&tf->mtx);
|
||||||
|
return rd;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tfileUidCompare(const void* a, const void* b) {
|
static int tfileUidCompare(const void* a, const void* b) {
|
||||||
|
|
|
@ -794,10 +794,10 @@ class IndexObj {
|
||||||
}
|
}
|
||||||
int sz = taosArrayGetSize(result);
|
int sz = taosArrayGetSize(result);
|
||||||
indexMultiTermQueryDestroy(mq);
|
indexMultiTermQueryDestroy(mq);
|
||||||
taosArrayDestroy(result);
|
|
||||||
assert(sz == 1);
|
assert(sz == 1);
|
||||||
uint64_t* ret = (uint64_t*)taosArrayGet(result, 0);
|
uint64_t* ret = (uint64_t*)taosArrayGet(result, 0);
|
||||||
assert(val = *ret);
|
assert(val = *ret);
|
||||||
|
taosArrayDestroy(result);
|
||||||
|
|
||||||
return sz;
|
return sz;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue