diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index d711da1807..cf6341ce9c 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -47,7 +47,6 @@ typedef struct TFileHeader { typedef struct TFileCacheKey { uint64_t suid; uint8_t colType; - int32_t version; char* colName; int32_t nColName; } TFileCacheKey; diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 6ac1a893a6..9acd9f6870 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -38,11 +38,14 @@ static int tfileWriteHeader(TFileWriter* writer); static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteData(TFileWriter* write, TFileValue* tval); -static int tfileReadLoadHeader(TFileReader* reader); -static int tfileReadLoadFst(TFileReader* reader); -static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); +static int tfileReadLoadHeader(TFileReader* reader); +static int tfileReadLoadFst(TFileReader* reader); +static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); +static void tfileReadRef(TFileReader* reader); +static void tfileReadUnRef(TFileReader* reader); static int tfileGetFileList(const char* path, SArray* result); +static int tfileRmExpireFile(SArray* result); static void tfileDestroyFileName(void* elem); static int tfileCompare(const void* a, const void* b); static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version); @@ -58,6 +61,8 @@ TFileCache* tfileCacheCreate(const char* path) { SArray* files = taosArrayInit(4, sizeof(void*)); tfileGetFileList(path, files); taosArraySort(files, tfileCompare); + tfileRmExpireFile(files); + uint64_t suid; int32_t colId, version; for (size_t i = 0; i < taosArrayGetSize(files); i++) { @@ -66,29 +71,29 @@ TFileCache* tfileCacheCreate(const char* path) { indexInfo("try parse invalid file: %s, skip it", file); continue; } + WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 64); if (wc == NULL) { indexError("failed to open index: %s", file); goto End; } + TFileReader* reader = tfileReaderCreate(wc); if (0 != tfileReadLoadHeader(reader)) { tfileReaderDestroy(reader); indexError("failed to load index header, index file: %s", file); goto End; } + if (0 != tfileReadLoadFst(reader)) { tfileReaderDestroy(reader); indexError("failed to load index fst, index file: %s", file); + goto End; } + tfileReadRef(reader); // loader fst and validate it - TFileHeader* header = &reader->header; - TFileCacheKey key = {.suid = header->suid, - .version = header->version, - .colName = header->colName, - .nColName = strlen(header->colName), - .colType = header->colType}; + TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; char buf[128] = {0}; tfileSerialCacheKey(&key, buf); @@ -110,7 +115,7 @@ void tfileCacheDestroy(TFileCache* tcache) { TFileReader* p = *reader; indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType); - tfileReaderDestroy(p); + tfileReadUnRef(p); reader = taosHashIterate(tcache->tableCache, reader); } taosHashCleanup(tcache->tableCache); @@ -120,12 +125,17 @@ void tfileCacheDestroy(TFileCache* tcache) { TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) { char buf[128] = {0}; tfileSerialCacheKey(key, buf); + TFileReader* reader = taosHashGet(tcache->tableCache, buf, strlen(buf)); + tfileReadRef(reader); + return reader; } void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) { char buf[128] = {0}; tfileSerialCacheKey(key, buf); + tfileReadRef(reader); + taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); return; } @@ -147,25 +157,29 @@ void tfileReaderDestroy(TFileReader* reader) { } int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) { - SIndexTerm* term = query->term; + SIndexTerm* term = query->term; + EIndexQueryType qtype = query->qType; + int ret = -1; // refactor to callback later - if (query->qType == QUERY_TERM) { + if (qtype == QUERY_TERM) { uint64_t offset; FstSlice key = fstSliceCreate(term->colVal, term->nColVal); if (fstGet(reader->fst, &key, &offset)) { - return tfileReadLoadTableIds(reader, offset, result); + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal); + ret = tfileReadLoadTableIds(reader, offset, result); } else { - indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found in tindex", term->suid, term->colName, term->colVal); + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal); } - return 0; - } else if (query->qType == QUERY_PREFIX) { + fstSliceDestroy(&key); + } else if (qtype == QUERY_PREFIX) { // handle later // } else { // handle later } - return 0; + tfileReadUnRef(reader); + return ret; } TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { @@ -209,7 +223,6 @@ int tfileWriterPut(TFileWriter* tw, void* data) { int32_t tbsz = taosArrayGetSize(v->tableId); fstOffset += TF_TABLE_TATOAL_SIZE(tbsz); } - // check result or not tfileWriteFstOffset(tw, fstOffset); for (size_t i = 0; i < sz; i++) { @@ -237,6 +250,7 @@ int tfileWriterPut(TFileWriter* tw, void* data) { // write reversed data in buf to tindex tw->ctx->write(tw->ctx, buf, offset); } + tfree(buf); // write fst for (size_t i = 0; i < sz; i++) { @@ -244,11 +258,8 @@ int tfileWriterPut(TFileWriter* tw, void* data) { TFileValue* v = taosArrayGetP((SArray*)data, i); if (tfileWriteData(tw, v) == 0) { // - // } } - - tfree(buf); return 0; } void tfileWriterDestroy(TFileWriter* tw) { @@ -270,17 +281,21 @@ void IndexTFileDestroy(IndexTFile* tfile) { } int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { - if (tfile == NULL) { return -1; } + int ret = -1; + if (tfile == NULL) { return ret; } IndexTFile* pTfile = (IndexTFile*)tfile; SIndexTerm* term = query->term; - TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .version = 0, .colName = term->colName, .nColName = term->nColName}; + TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; + TFileReader* reader = tfileCacheGet(pTfile->cache, &key); - TFileReader* reader = tfileCacheGet(pTfile->cache, &key); - return tfileReaderSearch(reader, query, result); + ret = tfileReaderSearch(reader, query, result); + + return ret; } int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { - TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version = 1}; + // TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version = + // 1}; return 0; } @@ -382,6 +397,15 @@ static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* re free(buf); return 0; } +static void tfileReadRef(TFileReader* reader) { + int ref = T_REF_INC(reader); + UNUSED(ref); +} + +static void tfileReadUnRef(TFileReader* reader) { + int ref = T_REF_DEC(reader); + if (ref == 0) { tfileReaderDestroy(reader); } +} static int tfileGetFileList(const char* path, SArray* result) { DIR* dir = opendir(path); @@ -397,6 +421,10 @@ static int tfileGetFileList(const char* path, SArray* result) { closedir(dir); return 0; } +static int tfileRmExpireFile(SArray* result) { + // TODO(yihao): remove expire tindex after restart + return 0; +} static void tfileDestroyFileName(void* elem) { char* p = *(char**)elem; free(p); @@ -423,7 +451,5 @@ static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) { SERIALIZE_VAR_TO_BUF(buf, '_', char); SERIALIZE_MEM_TO_BUF(buf, key, colType); SERIALIZE_VAR_TO_BUF(buf, '_', char); - SERIALIZE_MEM_TO_BUF(buf, key, version); - SERIALIZE_VAR_TO_BUF(buf, '_', char); SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); }