From 6694e4b2dac2e19a72d971705480815ac7b79ed1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Dec 2021 22:46:32 +0800 Subject: [PATCH] fix tfile bug and add unit test --- source/libs/index/src/index_tfile.c | 75 +++++++------- source/libs/index/test/indexTests.cc | 140 ++++++++++++++++++++++----- 2 files changed, 156 insertions(+), 59 deletions(-) diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index e02a3e0327..9afc29457d 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -33,11 +33,11 @@ static int tfileWriteHeader(TFileWriter* writer); static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteData(TFileWriter* write, TFileValue* tval); -static int tfileReadLoadHeader(TFileReader* reader); -static int tfileReadLoadFst(TFileReader* reader); -static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); -static void tfileReadRef(TFileReader* reader); -static void tfileReadUnRef(TFileReader* reader); +static int tfileReaderLoadHeader(TFileReader* reader); +static int tfileReaderLoadFst(TFileReader* reader); +static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); +static void tfileReaderRef(TFileReader* reader); +static void tfileReaderUnRef(TFileReader* reader); static int tfileGetFileList(const char* path, SArray* result); static int tfileRmExpireFile(SArray* result); @@ -70,23 +70,12 @@ TFileCache* tfileCacheCreate(const char* path) { WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 64); if (wc == NULL) { - indexError("failed to open index: %s", file); + indexError("failed to open index:%s", file); goto End; } TFileReader* reader = tfileReaderCreate(wc); - if (0 != tfileReadLoadHeader(reader)) { - tfileReaderDestroy(reader); - indexError("failed to load index header, index file: %s", file); - goto End; - } - - if (0 != tfileReadLoadFst(reader)) { - tfileReaderDestroy(reader); - indexError("failed to load index fst, index file: %s", file); - goto End; - } - tfileReadRef(reader); + tfileReaderRef(reader); // loader fst and validate it TFileHeader* header = &reader->header; TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; @@ -111,7 +100,7 @@ void tfileCacheDestroy(TFileCache* tcache) { TFileReader* p = *reader; indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType); - tfileReadUnRef(p); + tfileReaderUnRef(p); reader = taosHashIterate(tcache->tableCache, reader); } taosHashCleanup(tcache->tableCache); @@ -123,7 +112,7 @@ TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) { tfileSerialCacheKey(key, buf); TFileReader* reader = taosHashGet(tcache->tableCache, buf, strlen(buf)); - tfileReadRef(reader); + tfileReaderRef(reader); return reader; } @@ -135,10 +124,10 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) if (*p != NULL) { TFileReader* oldReader = *p; taosHashRemove(tcache->tableCache, buf, strlen(buf)); - tfileReadUnRef(oldReader); + tfileReaderUnRef(oldReader); } - tfileReadRef(reader); + tfileReaderRef(reader); taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); return; } @@ -149,6 +138,19 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { // T_REF_INC(reader); reader->ctx = ctx; + + if (0 != tfileReaderLoadHeader(reader)) { + tfileReaderDestroy(reader); + indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName); + return NULL; + } + + if (0 != tfileReaderLoadFst(reader)) { + tfileReaderDestroy(reader); + indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName); + return NULL; + } + return reader; } void tfileReaderDestroy(TFileReader* reader) { @@ -170,7 +172,7 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul FstSlice key = fstSliceCreate(term->colVal, term->nColVal); if (fstGet(reader->fst, &key, &offset)) { indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal); - ret = tfileReadLoadTableIds(reader, offset, result); + ret = tfileReaderLoadTableIds(reader, offset, result); } else { indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal); } @@ -181,7 +183,7 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul } else { // handle later } - tfileReadUnRef(reader); + tfileReaderUnRef(reader); return ret; } @@ -205,11 +207,6 @@ TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { tw->ctx = ctx; tw->header = *header; tfileWriteHeader(tw); - tw->fb = fstBuilderCreate(ctx, 0); - if (tw->fb == NULL) { - tfileWriterDestroy(tw); - return NULL; - } return tw; } @@ -267,6 +264,11 @@ int tfileWriterPut(TFileWriter* tw, void* data) { } tfree(buf); + tw->fb = fstBuilderCreate(tw->ctx, 0); + if (tw->fb == NULL) { + tfileWriterDestroy(tw); + return -1; + } // write fst for (size_t i = 0; i < sz; i++) { // TODO, fst batch write later @@ -343,6 +345,7 @@ static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) { int32_t fstOffset = offset + sizeof(tw->header.fstOffset); tw->header.fstOffset = fstOffset; if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; } + tw->offset += sizeof(fstOffset); return 0; } static int tfileWriteHeader(TFileWriter* writer) { @@ -372,16 +375,16 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) { } return 0; } -static int tfileReadLoadHeader(TFileReader* reader) { +static int tfileReaderLoadHeader(TFileReader* reader) { // TODO simple tfile header later char buf[TFILE_HEADER_SIZE] = {0}; - int64_t nread = reader->ctx->read(reader->ctx, buf, sizeof(buf)); + int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0); assert(nread == sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf)); return 0; } -static int tfileReadLoadFst(TFileReader* reader) { +static int tfileReaderLoadFst(TFileReader* reader) { // current load fst into memory, refactor it later static int FST_MAX_SIZE = 16 * 1024; @@ -398,9 +401,9 @@ static int tfileReadLoadFst(TFileReader* reader) { free(buf); fstSliceDestroy(&st); - return reader->fst == NULL ? 0 : -1; + return reader->fst != NULL ? 0 : -1; } -static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) { +static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) { int32_t nid; WriterCtx* ctx = reader->ctx; @@ -420,12 +423,12 @@ static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* re free(buf); return 0; } -static void tfileReadRef(TFileReader* reader) { +static void tfileReaderRef(TFileReader* reader) { int ref = T_REF_INC(reader); UNUSED(ref); } -static void tfileReadUnRef(TFileReader* reader) { +static void tfileReaderUnRef(TFileReader* reader) { int ref = T_REF_DEC(reader); if (ref == 0) { tfileReaderDestroy(reader); } } diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index f8dae787b8..6c6f45bd11 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -21,7 +21,7 @@ #include "index_fst_util.h" #include "index_tfile.h" #include "tutil.h" - +using namespace std; class FstWriter { public: FstWriter() { @@ -355,41 +355,121 @@ class IndexEnv : public ::testing::Test { // // //} +class TFileObj { + public: + TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage") : path_(path), colName_(colName) { + colId_ = 10; + // Do Nothing + // + } + int Put(SArray* tv) { + if (reader_ != NULL) { + tfileReaderDestroy(reader_); + reader_ = NULL; + } + if (writer_ == NULL) { InitWriter(); } + return tfileWriterPut(writer_, tv); + } + bool InitWriter() { + TFileHeader header; + header.suid = 1; + header.version = 1; + memcpy(header.colName, colName_.c_str(), colName_.size()); + header.colType = TSDB_DATA_TYPE_BINARY; + + std::string path(path_); + int colId = 2; + char buf[64] = {0}; + sprintf(buf, "%" PRIu64 "-%d-%d.tindex", header.suid, colId_, header.version); + path.append("/").append(buf); + + fileName_ = path; + + WriterCtx* ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024); + + writer_ = tfileWriterCreate(ctx, &header); + return writer_ != NULL ? true : false; + } + bool InitReader() { + WriterCtx* ctx = writerCtxCreate(TFile, fileName_.c_str(), true, 64 * 1024 * 1024); + reader_ = tfileReaderCreate(ctx); + return reader_ != NULL ? true : false; + } + int Get(SIndexTermQuery* query, SArray* result) { + if (writer_ != NULL) { + tfileWriterDestroy(writer_); + writer_ = NULL; + } + if (reader_ == NULL && InitReader()) { + // + // + } + return tfileReaderSearch(reader_, query, result); + } + ~TFileObj() { + if (writer_) { tfileWriterDestroy(writer_); } + if (reader_) { tfileReaderDestroy(reader_); } + } + + private: + std::string path_; + std::string colName_; + std::string fileName_; + + TFileWriter* writer_; + TFileReader* reader_; + + int colId_; +}; class IndexTFileEnv : public ::testing::Test { protected: virtual void SetUp() { - taosRemoveDir(dir); - taosMkDir(dir); + taosRemoveDir(dir.c_str()); + taosMkDir(dir.c_str()); tfInit(); - std::string colName("voltage"); - header.suid = 1; - header.version = 1; - memcpy(header.colName, colName.c_str(), colName.size()); - header.colType = TSDB_DATA_TYPE_BINARY; + fObj = new TFileObj(dir, colName); - std::string path(dir); - int colId = 2; - char buf[64] = {0}; - sprintf(buf, "%" PRIu64 "-%d-%d.tindex", header.suid, colId, header.version); - path.append("/").append(buf); + // std::string colName("voltage"); + // header.suid = 1; + // header.version = 1; + // memcpy(header.colName, colName.c_str(), colName.size()); + // header.colType = TSDB_DATA_TYPE_BINARY; - ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024); + // std::string path(dir); + // int colId = 2; + // char buf[64] = {0}; + // sprintf(buf, "%" PRIu64 "-%d-%d.tindex", header.suid, colId, header.version); + // path.append("/").append(buf); - twrite = tfileWriterCreate(ctx, &header); + // ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024); + + // twrite = tfileWriterCreate(ctx, &header); } virtual void TearDown() { // indexClose(index); // indexeptsDestroy(opts); + delete fObj; tfCleanup(); - tfileWriterDestroy(twrite); + // tfileWriterDestroy(twrite); } - const char* dir = "/tmp/tindex"; - WriterCtx* ctx = NULL; - TFileHeader header; - TFileWriter* twrite = NULL; + TFileObj* fObj; + std::string dir = "/tmp/tindex"; + std::string colName = "voltage"; + + int coldId = 2; + int version = 1; + int colType = TSDB_DATA_TYPE_BINARY; + + // WriterCtx* ctx = NULL; + // TFileHeader header; + // TFileWriter* twrite = NULL; }; +// static TFileWriter* genTFileWriter(const char* path, TFileHeader* header) { +// char buf[128] = {0}; +// WriterCtx* ctx = writerCtxCreate(TFile, path, false, ) +//} static TFileValue* genTFileValue(const char* val) { TFileValue* tv = (TFileValue*)calloc(1, sizeof(TFileValue)); int32_t vlen = strlen(val) + 1; @@ -413,17 +493,31 @@ static void destroyTFileValue(void* val) { TEST_F(IndexTFileEnv, test_tfile_write) { TFileValue* v1 = genTFileValue("c"); TFileValue* v2 = genTFileValue("a"); + TFileValue* v3 = genTFileValue("b"); + TFileValue* v4 = genTFileValue("d"); SArray* data = (SArray*)taosArrayInit(4, sizeof(void*)); taosArrayPush(data, &v1); taosArrayPush(data, &v2); + taosArrayPush(data, &v3); + taosArrayPush(data, &v4); - tfileWriterPut(twrite, data); - // tfileWriterDestroy(twrite); - + fObj->Put(data); for (size_t i = 0; i < taosArrayGetSize(data); i++) { destroyTFileValue(taosArrayGetP(data, i)); } taosArrayDestroy(data); + + std::string colName("voltage"); + std::string colVal("b"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexTermQuery query = {.term = term, .qType = QUERY_TERM}; + + SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); + fObj->Get(&query, result); + assert(taosArrayGetSize(result) == 10); + indexTermDestroy(term); + + // tfileWriterDestroy(twrite); }