Merge pull request #9351 from taosdata/feature/index_cache

fix tfile bug and add unit test
This commit is contained in:
Shengliang Guan 2021-12-24 09:38:59 +08:00 committed by GitHub
commit 1d68b746e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 156 additions and 59 deletions

View File

@ -33,11 +33,11 @@ static int tfileWriteHeader(TFileWriter* writer);
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
static int tfileWriteData(TFileWriter* write, TFileValue* tval); static int tfileWriteData(TFileWriter* write, TFileValue* tval);
static int tfileReadLoadHeader(TFileReader* reader); static int tfileReaderLoadHeader(TFileReader* reader);
static int tfileReadLoadFst(TFileReader* reader); static int tfileReaderLoadFst(TFileReader* reader);
static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
static void tfileReadRef(TFileReader* reader); static void tfileReaderRef(TFileReader* reader);
static void tfileReadUnRef(TFileReader* reader); static void tfileReaderUnRef(TFileReader* reader);
static int tfileGetFileList(const char* path, SArray* result); static int tfileGetFileList(const char* path, SArray* result);
static int tfileRmExpireFile(SArray* result); static int tfileRmExpireFile(SArray* result);
@ -70,23 +70,12 @@ TFileCache* tfileCacheCreate(const char* path) {
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 64); WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 64);
if (wc == NULL) { if (wc == NULL) {
indexError("failed to open index: %s", file); indexError("failed to open index:%s", file);
goto End; goto End;
} }
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
if (0 != tfileReadLoadHeader(reader)) { tfileReaderRef(reader);
tfileReaderDestroy(reader);
indexError("failed to load index header, index file: %s", file);
goto End;
}
if (0 != tfileReadLoadFst(reader)) {
tfileReaderDestroy(reader);
indexError("failed to load index fst, index file: %s", file);
goto End;
}
tfileReadRef(reader);
// loader fst and validate it // loader fst and validate it
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
@ -111,7 +100,7 @@ void tfileCacheDestroy(TFileCache* tcache) {
TFileReader* p = *reader; TFileReader* p = *reader;
indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType); indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType);
tfileReadUnRef(p); tfileReaderUnRef(p);
reader = taosHashIterate(tcache->tableCache, reader); reader = taosHashIterate(tcache->tableCache, reader);
} }
taosHashCleanup(tcache->tableCache); taosHashCleanup(tcache->tableCache);
@ -123,7 +112,7 @@ TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
tfileSerialCacheKey(key, buf); tfileSerialCacheKey(key, buf);
TFileReader* reader = taosHashGet(tcache->tableCache, buf, strlen(buf)); TFileReader* reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
tfileReadRef(reader); tfileReaderRef(reader);
return reader; return reader;
} }
@ -135,10 +124,10 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader)
if (*p != NULL) { if (*p != NULL) {
TFileReader* oldReader = *p; TFileReader* oldReader = *p;
taosHashRemove(tcache->tableCache, buf, strlen(buf)); taosHashRemove(tcache->tableCache, buf, strlen(buf));
tfileReadUnRef(oldReader); tfileReaderUnRef(oldReader);
} }
tfileReadRef(reader); tfileReaderRef(reader);
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
return; return;
} }
@ -149,6 +138,19 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
// T_REF_INC(reader); // T_REF_INC(reader);
reader->ctx = ctx; reader->ctx = ctx;
if (0 != tfileReaderLoadHeader(reader)) {
tfileReaderDestroy(reader);
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
return NULL;
}
if (0 != tfileReaderLoadFst(reader)) {
tfileReaderDestroy(reader);
indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
return NULL;
}
return reader; return reader;
} }
void tfileReaderDestroy(TFileReader* reader) { void tfileReaderDestroy(TFileReader* reader) {
@ -170,7 +172,7 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
FstSlice key = fstSliceCreate(term->colVal, term->nColVal); FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
if (fstGet(reader->fst, &key, &offset)) { if (fstGet(reader->fst, &key, &offset)) {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal); indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal);
ret = tfileReadLoadTableIds(reader, offset, result); ret = tfileReaderLoadTableIds(reader, offset, result);
} else { } else {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal); indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal);
} }
@ -181,7 +183,7 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
} else { } else {
// handle later // handle later
} }
tfileReadUnRef(reader); tfileReaderUnRef(reader);
return ret; return ret;
} }
@ -205,11 +207,6 @@ TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
tw->ctx = ctx; tw->ctx = ctx;
tw->header = *header; tw->header = *header;
tfileWriteHeader(tw); tfileWriteHeader(tw);
tw->fb = fstBuilderCreate(ctx, 0);
if (tw->fb == NULL) {
tfileWriterDestroy(tw);
return NULL;
}
return tw; return tw;
} }
@ -267,6 +264,11 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
} }
tfree(buf); tfree(buf);
tw->fb = fstBuilderCreate(tw->ctx, 0);
if (tw->fb == NULL) {
tfileWriterDestroy(tw);
return -1;
}
// write fst // write fst
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
// TODO, fst batch write later // TODO, fst batch write later
@ -343,6 +345,7 @@ static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
int32_t fstOffset = offset + sizeof(tw->header.fstOffset); int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
tw->header.fstOffset = fstOffset; tw->header.fstOffset = fstOffset;
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; } if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
tw->offset += sizeof(fstOffset);
return 0; return 0;
} }
static int tfileWriteHeader(TFileWriter* writer) { static int tfileWriteHeader(TFileWriter* writer) {
@ -372,16 +375,16 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
} }
return 0; return 0;
} }
static int tfileReadLoadHeader(TFileReader* reader) { static int tfileReaderLoadHeader(TFileReader* reader) {
// TODO simple tfile header later // TODO simple tfile header later
char buf[TFILE_HEADER_SIZE] = {0}; char buf[TFILE_HEADER_SIZE] = {0};
int64_t nread = reader->ctx->read(reader->ctx, buf, sizeof(buf)); int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
assert(nread == sizeof(buf)); assert(nread == sizeof(buf));
memcpy(&reader->header, buf, sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf));
return 0; return 0;
} }
static int tfileReadLoadFst(TFileReader* reader) { static int tfileReaderLoadFst(TFileReader* reader) {
// current load fst into memory, refactor it later // current load fst into memory, refactor it later
static int FST_MAX_SIZE = 16 * 1024; static int FST_MAX_SIZE = 16 * 1024;
@ -398,9 +401,9 @@ static int tfileReadLoadFst(TFileReader* reader) {
free(buf); free(buf);
fstSliceDestroy(&st); fstSliceDestroy(&st);
return reader->fst == NULL ? 0 : -1; return reader->fst != NULL ? 0 : -1;
} }
static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) { static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
int32_t nid; int32_t nid;
WriterCtx* ctx = reader->ctx; WriterCtx* ctx = reader->ctx;
@ -420,12 +423,12 @@ static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* re
free(buf); free(buf);
return 0; return 0;
} }
static void tfileReadRef(TFileReader* reader) { static void tfileReaderRef(TFileReader* reader) {
int ref = T_REF_INC(reader); int ref = T_REF_INC(reader);
UNUSED(ref); UNUSED(ref);
} }
static void tfileReadUnRef(TFileReader* reader) { static void tfileReaderUnRef(TFileReader* reader) {
int ref = T_REF_DEC(reader); int ref = T_REF_DEC(reader);
if (ref == 0) { tfileReaderDestroy(reader); } if (ref == 0) { tfileReaderDestroy(reader); }
} }

View File

@ -21,7 +21,7 @@
#include "index_fst_util.h" #include "index_fst_util.h"
#include "index_tfile.h" #include "index_tfile.h"
#include "tutil.h" #include "tutil.h"
using namespace std;
class FstWriter { class FstWriter {
public: public:
FstWriter() { FstWriter() {
@ -355,41 +355,121 @@ class IndexEnv : public ::testing::Test {
// // // //
//} //}
class TFileObj {
public:
TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage") : path_(path), colName_(colName) {
colId_ = 10;
// Do Nothing
//
}
int Put(SArray* tv) {
if (reader_ != NULL) {
tfileReaderDestroy(reader_);
reader_ = NULL;
}
if (writer_ == NULL) { InitWriter(); }
return tfileWriterPut(writer_, tv);
}
bool InitWriter() {
TFileHeader header;
header.suid = 1;
header.version = 1;
memcpy(header.colName, colName_.c_str(), colName_.size());
header.colType = TSDB_DATA_TYPE_BINARY;
std::string path(path_);
int colId = 2;
char buf[64] = {0};
sprintf(buf, "%" PRIu64 "-%d-%d.tindex", header.suid, colId_, header.version);
path.append("/").append(buf);
fileName_ = path;
WriterCtx* ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024);
writer_ = tfileWriterCreate(ctx, &header);
return writer_ != NULL ? true : false;
}
bool InitReader() {
WriterCtx* ctx = writerCtxCreate(TFile, fileName_.c_str(), true, 64 * 1024 * 1024);
reader_ = tfileReaderCreate(ctx);
return reader_ != NULL ? true : false;
}
int Get(SIndexTermQuery* query, SArray* result) {
if (writer_ != NULL) {
tfileWriterDestroy(writer_);
writer_ = NULL;
}
if (reader_ == NULL && InitReader()) {
//
//
}
return tfileReaderSearch(reader_, query, result);
}
~TFileObj() {
if (writer_) { tfileWriterDestroy(writer_); }
if (reader_) { tfileReaderDestroy(reader_); }
}
private:
std::string path_;
std::string colName_;
std::string fileName_;
TFileWriter* writer_;
TFileReader* reader_;
int colId_;
};
class IndexTFileEnv : public ::testing::Test { class IndexTFileEnv : public ::testing::Test {
protected: protected:
virtual void SetUp() { virtual void SetUp() {
taosRemoveDir(dir); taosRemoveDir(dir.c_str());
taosMkDir(dir); taosMkDir(dir.c_str());
tfInit(); tfInit();
std::string colName("voltage"); fObj = new TFileObj(dir, colName);
header.suid = 1;
header.version = 1;
memcpy(header.colName, colName.c_str(), colName.size());
header.colType = TSDB_DATA_TYPE_BINARY;
std::string path(dir); // std::string colName("voltage");
int colId = 2; // header.suid = 1;
char buf[64] = {0}; // header.version = 1;
sprintf(buf, "%" PRIu64 "-%d-%d.tindex", header.suid, colId, header.version); // memcpy(header.colName, colName.c_str(), colName.size());
path.append("/").append(buf); // header.colType = TSDB_DATA_TYPE_BINARY;
ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024); // std::string path(dir);
// int colId = 2;
// char buf[64] = {0};
// sprintf(buf, "%" PRIu64 "-%d-%d.tindex", header.suid, colId, header.version);
// path.append("/").append(buf);
twrite = tfileWriterCreate(ctx, &header); // ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024);
// twrite = tfileWriterCreate(ctx, &header);
} }
virtual void TearDown() { virtual void TearDown() {
// indexClose(index); // indexClose(index);
// indexeptsDestroy(opts); // indexeptsDestroy(opts);
delete fObj;
tfCleanup(); tfCleanup();
tfileWriterDestroy(twrite); // tfileWriterDestroy(twrite);
} }
const char* dir = "/tmp/tindex"; TFileObj* fObj;
WriterCtx* ctx = NULL; std::string dir = "/tmp/tindex";
TFileHeader header; std::string colName = "voltage";
TFileWriter* twrite = NULL;
int coldId = 2;
int version = 1;
int colType = TSDB_DATA_TYPE_BINARY;
// WriterCtx* ctx = NULL;
// TFileHeader header;
// TFileWriter* twrite = NULL;
}; };
// static TFileWriter* genTFileWriter(const char* path, TFileHeader* header) {
// char buf[128] = {0};
// WriterCtx* ctx = writerCtxCreate(TFile, path, false, )
//}
static TFileValue* genTFileValue(const char* val) { static TFileValue* genTFileValue(const char* val) {
TFileValue* tv = (TFileValue*)calloc(1, sizeof(TFileValue)); TFileValue* tv = (TFileValue*)calloc(1, sizeof(TFileValue));
int32_t vlen = strlen(val) + 1; int32_t vlen = strlen(val) + 1;
@ -413,17 +493,31 @@ static void destroyTFileValue(void* val) {
TEST_F(IndexTFileEnv, test_tfile_write) { TEST_F(IndexTFileEnv, test_tfile_write) {
TFileValue* v1 = genTFileValue("c"); TFileValue* v1 = genTFileValue("c");
TFileValue* v2 = genTFileValue("a"); TFileValue* v2 = genTFileValue("a");
TFileValue* v3 = genTFileValue("b");
TFileValue* v4 = genTFileValue("d");
SArray* data = (SArray*)taosArrayInit(4, sizeof(void*)); SArray* data = (SArray*)taosArrayInit(4, sizeof(void*));
taosArrayPush(data, &v1); taosArrayPush(data, &v1);
taosArrayPush(data, &v2); taosArrayPush(data, &v2);
taosArrayPush(data, &v3);
taosArrayPush(data, &v4);
tfileWriterPut(twrite, data); fObj->Put(data);
// tfileWriterDestroy(twrite);
for (size_t i = 0; i < taosArrayGetSize(data); i++) { for (size_t i = 0; i < taosArrayGetSize(data); i++) {
destroyTFileValue(taosArrayGetP(data, i)); destroyTFileValue(taosArrayGetP(data, i));
} }
taosArrayDestroy(data); taosArrayDestroy(data);
std::string colName("voltage");
std::string colVal("b");
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size());
SIndexTermQuery query = {.term = term, .qType = QUERY_TERM};
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
fObj->Get(&query, result);
assert(taosArrayGetSize(result) == 10);
indexTermDestroy(term);
// tfileWriterDestroy(twrite);
} }