update tindex reader

This commit is contained in:
yihaoDeng 2021-12-22 23:12:11 +08:00
parent 199eb90e0f
commit 556788b634
2 changed files with 54 additions and 29 deletions

View File

@ -47,7 +47,6 @@ typedef struct TFileHeader {
typedef struct TFileCacheKey {
uint64_t suid;
uint8_t colType;
int32_t version;
char* colName;
int32_t nColName;
} TFileCacheKey;

View File

@ -41,8 +41,11 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval);
static int tfileReadLoadHeader(TFileReader* reader);
static int tfileReadLoadFst(TFileReader* reader);
static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
static void tfileReadRef(TFileReader* reader);
static void tfileReadUnRef(TFileReader* reader);
static int tfileGetFileList(const char* path, SArray* result);
static int tfileRmExpireFile(SArray* result);
static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
@ -58,6 +61,8 @@ TFileCache* tfileCacheCreate(const char* path) {
SArray* files = taosArrayInit(4, sizeof(void*));
tfileGetFileList(path, files);
taosArraySort(files, tfileCompare);
tfileRmExpireFile(files);
uint64_t suid;
int32_t colId, version;
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
@ -66,29 +71,29 @@ TFileCache* tfileCacheCreate(const char* path) {
indexInfo("try parse invalid file: %s, skip it", file);
continue;
}
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 64);
if (wc == NULL) {
indexError("failed to open index: %s", file);
goto End;
}
TFileReader* reader = tfileReaderCreate(wc);
if (0 != tfileReadLoadHeader(reader)) {
tfileReaderDestroy(reader);
indexError("failed to load index header, index file: %s", file);
goto End;
}
if (0 != tfileReadLoadFst(reader)) {
tfileReaderDestroy(reader);
indexError("failed to load index fst, index file: %s", file);
goto End;
}
tfileReadRef(reader);
// loader fst and validate it
TFileHeader* header = &reader->header;
TFileCacheKey key = {.suid = header->suid,
.version = header->version,
.colName = header->colName,
.nColName = strlen(header->colName),
.colType = header->colType};
TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
char buf[128] = {0};
tfileSerialCacheKey(&key, buf);
@ -110,7 +115,7 @@ void tfileCacheDestroy(TFileCache* tcache) {
TFileReader* p = *reader;
indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType);
tfileReaderDestroy(p);
tfileReadUnRef(p);
reader = taosHashIterate(tcache->tableCache, reader);
}
taosHashCleanup(tcache->tableCache);
@ -120,12 +125,17 @@ void tfileCacheDestroy(TFileCache* tcache) {
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
char buf[128] = {0};
tfileSerialCacheKey(key, buf);
TFileReader* reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
tfileReadRef(reader);
return reader;
}
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) {
char buf[128] = {0};
tfileSerialCacheKey(key, buf);
tfileReadRef(reader);
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
return;
}
@ -148,24 +158,28 @@ void tfileReaderDestroy(TFileReader* reader) {
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) {
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
int ret = -1;
// refactor to callback later
if (query->qType == QUERY_TERM) {
if (qtype == QUERY_TERM) {
uint64_t offset;
FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
if (fstGet(reader->fst, &key, &offset)) {
return tfileReadLoadTableIds(reader, offset, result);
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal);
ret = tfileReadLoadTableIds(reader, offset, result);
} else {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found in tindex", term->suid, term->colName, term->colVal);
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal);
}
return 0;
} else if (query->qType == QUERY_PREFIX) {
fstSliceDestroy(&key);
} else if (qtype == QUERY_PREFIX) {
// handle later
//
} else {
// handle later
}
return 0;
tfileReadUnRef(reader);
return ret;
}
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
@ -209,7 +223,6 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
int32_t tbsz = taosArrayGetSize(v->tableId);
fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
}
// check result or not
tfileWriteFstOffset(tw, fstOffset);
for (size_t i = 0; i < sz; i++) {
@ -237,6 +250,7 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
// write reversed data in buf to tindex
tw->ctx->write(tw->ctx, buf, offset);
}
tfree(buf);
// write fst
for (size_t i = 0; i < sz; i++) {
@ -244,11 +258,8 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
TFileValue* v = taosArrayGetP((SArray*)data, i);
if (tfileWriteData(tw, v) == 0) {
//
//
}
}
tfree(buf);
return 0;
}
void tfileWriterDestroy(TFileWriter* tw) {
@ -270,17 +281,21 @@ void IndexTFileDestroy(IndexTFile* tfile) {
}
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
if (tfile == NULL) { return -1; }
int ret = -1;
if (tfile == NULL) { return ret; }
IndexTFile* pTfile = (IndexTFile*)tfile;
SIndexTerm* term = query->term;
TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .version = 0, .colName = term->colName, .nColName = term->nColName};
TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
return tfileReaderSearch(reader, query, result);
ret = tfileReaderSearch(reader, query, result);
return ret;
}
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version = 1};
// TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version =
// 1};
return 0;
}
@ -382,6 +397,15 @@ static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* re
free(buf);
return 0;
}
static void tfileReadRef(TFileReader* reader) {
int ref = T_REF_INC(reader);
UNUSED(ref);
}
static void tfileReadUnRef(TFileReader* reader) {
int ref = T_REF_DEC(reader);
if (ref == 0) { tfileReaderDestroy(reader); }
}
static int tfileGetFileList(const char* path, SArray* result) {
DIR* dir = opendir(path);
@ -397,6 +421,10 @@ static int tfileGetFileList(const char* path, SArray* result) {
closedir(dir);
return 0;
}
static int tfileRmExpireFile(SArray* result) {
// TODO(yihao): remove expire tindex after restart
return 0;
}
static void tfileDestroyFileName(void* elem) {
char* p = *(char**)elem;
free(p);
@ -423,7 +451,5 @@ static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_MEM_TO_BUF(buf, key, colType);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_MEM_TO_BUF(buf, key, version);
SERIALIZE_VAR_TO_BUF(buf, '_', char);
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
}