From 2a17fa3948d906656766c68602cbb7bb2afccd08 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 8 Jan 2022 18:56:54 +0800 Subject: [PATCH 1/3] add tfile analysis tool --- source/libs/index/src/index_tfile.c | 9 +++++--- source/libs/index/test/fstTest.cc | 34 +++++++++++++++++++++++++--- source/libs/index/test/indexTests.cc | 20 ++++++++++------ 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 495c4d4477..6b7d8a4082 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -527,9 +527,12 @@ static int tfileReaderLoadFst(TFileReader* reader) { if (buf == NULL) { return -1; } WriterCtx* ctx = reader->ctx; - int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); - indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d ", nread, reader->header.fstOffset, ctx->file.buf, - ctx->file.size); + + int64_t ts = taosGetTimestampUs(); + int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); + int64_t cost = taosGetTimestampUs() - ts; + indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d, time cost: %" PRId64 "us", nread, + reader->header.fstOffset, ctx->file.buf, ctx->file.size, cost); // we assuse fst size less than FST_MAX_SIZE assert(nread > 0 && nread < FST_MAX_SIZE); diff --git a/source/libs/index/test/fstTest.cc b/source/libs/index/test/fstTest.cc index 70671a5f3e..a2c0046f9a 100644 --- a/source/libs/index/test/fstTest.cc +++ b/source/libs/index/test/fstTest.cc @@ -1,4 +1,5 @@ +#include #include #include #include @@ -12,7 +13,6 @@ #include "index_tfile.h" #include "tskiplist.h" #include "tutil.h" - void* callback(void* s) { return s; } static std::string fileName = "/tmp/tindex.tindex"; @@ -293,7 +293,7 @@ void validateTFile(char* arg) { std::thread threads[NUM_OF_THREAD]; // std::vector threads; - TFileReader* reader = tfileReaderOpen(arg, 0, 999992, "tag1"); + TFileReader* reader = tfileReaderOpen(arg, 0, 20000000, "tag1"); for (int i = 0; i < NUM_OF_THREAD; i++) { threads[i] = std::thread(fst_get, reader->fst); @@ -306,13 +306,41 @@ void validateTFile(char* arg) { } tfCleanup(); } + +void iterTFileReader(char* path, char* ver) { + tfInit(); + + int version = atoi(ver); + TFileReader* reader = tfileReaderOpen(path, 0, version, "tag1"); + Iterate* iter = tfileIteratorCreate(reader); + bool tn = iter ? iter->next(iter) : false; + int count = 0; + int termCount = 0; + while (tn == true) { + count++; + IterateValue* cv = iter->getValue(iter); + termCount += (int)taosArrayGetSize(cv->val); + printf("col val: %s, size: %d\n", cv->colVal, (int)taosArrayGetSize(cv->val)); + tn = iter->next(iter); + } + printf("total size: %d\n term count: %d\n", count, termCount); + + tfileIteratorDestroy(iter); + tfCleanup(); +} + int main(int argc, char* argv[]) { // tool to check all kind of fst test // if (argc > 1) { validateTFile(argv[1]); } + if (argc > 2) { + // opt + iterTFileReader(argv[1], argv[2]); + } // checkFstCheckIterator(); // checkFstLongTerm(); // checkFstPrefixSearch(); - checkMillonWriteAndReadOfFst(); + // checkMillonWriteAndReadOfFst(); + return 1; } diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 5438f88b76..4f3330b7b3 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -665,14 +665,19 @@ class IndexObj { size_t numOfTable = 100 * 10000) { std::string tColVal = colVal; size_t colValSize = tColVal.size(); + int skip = 100; + numOfTable /= skip; for (int i = 0; i < numOfTable; i++) { - tColVal[i % colValSize] = 'a' + i % 26; + for (int k = 0; k < 10 && k < colVal.size(); k++) { + // opt + tColVal[rand() % colValSize] = 'a' + k % 26; + } SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), tColVal.c_str(), tColVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); - for (size_t i = 0; i < 10; i++) { - int ret = Put(terms, i); + for (size_t j = 0; j < skip; j++) { + int ret = Put(terms, j); assert(ret == 0); } indexMultiTermDestroy(terms); @@ -939,10 +944,11 @@ TEST_F(IndexEnv2, testIndex_read_performance) { TEST_F(IndexEnv2, testIndexMultiTag) { std::string path = "/tmp/multi_tag"; if (index->Init(path) != 0) {} - index->WriteMultiMillonData("tag1", "Hello", 100 * 10000); - index->WriteMultiMillonData("tag2", "Test", 100 * 10000); - index->WriteMultiMillonData("tag3", "Test", 100 * 10000); - index->WriteMultiMillonData("tag4", "Test", 100 * 10000); + int64_t st = taosGetTimestampUs(); + int32_t num = 1000 * 10000; + index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num); + std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl; + // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000); } TEST_F(IndexEnv2, testLongComVal) { std::string path = "/tmp/long_colVal"; From 15c0275333ca305320feb6d3325b97193820fba2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 8 Jan 2022 23:58:51 +0800 Subject: [PATCH 2/3] validate tfile --- source/libs/index/inc/indexInt.h | 1 + .../index/inc/index_fst_counting_writer.h | 1 + source/libs/index/src/index.c | 5 +- source/libs/index/src/index_cache.c | 2 +- source/libs/index/src/index_fst.c | 5 +- .../index/src/index_fst_counting_writer.c | 15 +++++- source/libs/index/src/index_tfile.c | 48 ++++++++++++++++++- 7 files changed, 71 insertions(+), 6 deletions(-) diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 378af4c1d1..90ad1e15f4 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -19,6 +19,7 @@ #include "index.h" #include "index_fst.h" #include "taos.h" +#include "tchecksum.h" #include "thash.h" #include "tlog.h" diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h index d7363f2f4c..1e0a88e17f 100644 --- a/source/libs/index/inc/index_fst_counting_writer.h +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -34,6 +34,7 @@ typedef struct WriterCtx { int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len); int (*flush)(struct WriterCtx* ctx); int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset); + int (*size)(struct WriterCtx* ctx); WriterType type; union { struct { diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 04dac57d8f..19e9375491 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -34,7 +34,10 @@ void indexInit() { // refactor later indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); } -void indexCleanUp() { taosCleanUpScheduler(indexQhandle); } +void indexCleanUp() { + // refacto later + taosCleanUpScheduler(indexQhandle); +} static int uidCompare(const void* a, const void* b) { // add more version compare diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 8bc3776ed9..294c8192e8 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -21,7 +21,7 @@ #define MAX_INDEX_KEY_LEN 256 // test only, change later #define MEM_TERM_LIMIT 10 * 10000 -#define MEM_THRESHOLD 1024 * 1024 * 2 +#define MEM_THRESHOLD 1024 * 1024 #define MEM_ESTIMATE_RADIO 1.5 static void indexMemRef(MemTable* tbl); diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index bfaeeaaa33..4f782cef26 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -935,7 +935,10 @@ Fst* fstCreate(FstSlice* slice) { uint32_t checkSum = 0; len -= sizeof(checkSum); taosDecodeFixedU32(buf + len, &checkSum); - + if (taosCheckChecksum(buf, len, checkSum)) { + // verify fst + return NULL; + } CompiledAddr rootAddr; len -= sizeof(rootAddr); taosDecodeFixedU64(buf + len, &rootAddr); diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 0763aae857..6db5555aa6 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -59,6 +59,13 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off } return nRead; } +static int writeCtxGetSize(WriterCtx* ctx) { + if (ctx->type == TFile && ctx->file.readOnly) { + // refactor later + return ctx->file.size; + } + return 0; +} static int writeCtxDoFlush(WriterCtx* ctx) { if (ctx->type == TFile) { // taosFsyncFile(ctx->file.fd); @@ -109,6 +116,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int ctx->read = writeCtxDoRead; ctx->flush = writeCtxDoFlush; ctx->readFrom = writeCtxDoReadFrom; + ctx->size = writeCtxGetSize; ctx->offset = 0; ctx->limit = capacity; @@ -159,6 +167,8 @@ int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len) int nWrite = ctx->write(ctx, buf, len); assert(nWrite == len); write->count += len; + + write->summer = taosCalcChecksum(write->summer, buf, len); return len; } int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) { @@ -169,7 +179,10 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) return nRead; } -uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; } +uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { + // opt + return write->summer; +} int fstCountingWriterFlush(FstCountingWriter* write) { WriterCtx* ctx = write->wrt; diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 6b7d8a4082..6d1dac8e35 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -21,8 +21,11 @@ p * #include "index_fst_counting_writer.h" #include "index_util.h" #include "taosdef.h" +#include "tcoding.h" #include "tcompare.h" +const static uint64_t tfileMagicNumber = 0xdb4775248b80fb57ull; + typedef struct TFileFstIter { FstStreamBuilder* fb; StreamWithState* st; @@ -40,9 +43,12 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds); static int tfileWriteHeader(TFileWriter* writer); static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteData(TFileWriter* write, TFileValue* tval); +static int tfileWriteFooter(TFileWriter* write); +// handle file corrupt later static int tfileReaderLoadHeader(TFileReader* reader); static int tfileReaderLoadFst(TFileReader* reader); +static int tfileReaderVerify(TFileReader* reader); static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static SArray* tfileGetFileList(const char* path); @@ -138,8 +144,15 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { TFileReader* reader = calloc(1, sizeof(TFileReader)); if (reader == NULL) { return NULL; } - // T_REF_INC(reader); reader->ctx = ctx; + + if (0 != tfileReaderVerify(reader)) { + tfileReaderDestroy(reader); + indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, + reader->header.colName); + return NULL; + } + // T_REF_INC(reader); if (0 != tfileReaderLoadHeader(reader)) { tfileReaderDestroy(reader); indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, @@ -296,6 +309,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { fstBuilderFinish(tw->fb); fstBuilderDestroy(tw->fb); tw->fb = NULL; + + tfileWriteFooter(tw); return 0; } void tfileWriterClose(TFileWriter* tw) { @@ -502,6 +517,14 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) { } return 0; } +static int tfileWriteFooter(TFileWriter* write) { + char buf[sizeof(tfileMagicNumber) + 1] = {0}; + void* pBuf = (void*)buf; + taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber); + int nwrite = write->ctx->write(write->ctx, buf, strlen(buf)); + assert(nwrite == sizeof(tfileMagicNumber)); + return nwrite; +} static int tfileReaderLoadHeader(TFileReader* reader) { // TODO simple tfile header later char buf[TFILE_HEADER_SIZE] = {0}; @@ -527,9 +550,11 @@ static int tfileReaderLoadFst(TFileReader* reader) { if (buf == NULL) { return -1; } WriterCtx* ctx = reader->ctx; + int size = ctx->size(ctx); int64_t ts = taosGetTimestampUs(); - int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); + int32_t nread = + ctx->readFrom(ctx, buf, size - reader->header.fstOffset - sizeof(tfileMagicNumber), reader->header.fstOffset); int64_t cost = taosGetTimestampUs() - ts; indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d, time cost: %" PRId64 "us", nread, reader->header.fstOffset, ctx->file.buf, ctx->file.size, cost); @@ -561,6 +586,25 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* free(buf); return 0; } +static int tfileReaderVerify(TFileReader* reader) { + // just validate header and Footer, file corrupted also shuild be verified later + WriterCtx* ctx = reader->ctx; + + uint64_t tMagicNumber = 0; + + char buf[sizeof(tMagicNumber) + 1] = {0}; + int size = ctx->size(ctx); + + if (size < sizeof(tMagicNumber) || size <= sizeof(reader->header)) { + return -1; + } else if (ctx->readFrom(ctx, buf, sizeof(tMagicNumber), size - sizeof(tMagicNumber)) != sizeof(tMagicNumber)) { + return -1; + } + + taosDecodeFixedU64(buf, &tMagicNumber); + return tMagicNumber == tfileMagicNumber ? 0 : -1; +} + void tfileReaderRef(TFileReader* reader) { if (reader == NULL) { return; } int ref = T_REF_INC(reader); From cdaf97cb0e1cb4fa0a8b4be66ffa39ff7049873d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 9 Jan 2022 00:06:03 +0800 Subject: [PATCH 3/3] validate tfile --- source/libs/index/src/index_tfile.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 6d1dac8e35..4b76402560 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -148,8 +148,7 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { if (0 != tfileReaderVerify(reader)) { tfileReaderDestroy(reader); - indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, - reader->header.colName); + indexError("invalid tfile, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName); return NULL; } // T_REF_INC(reader);