From 15c0275333ca305320feb6d3325b97193820fba2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 8 Jan 2022 23:58:51 +0800 Subject: [PATCH] validate tfile --- source/libs/index/inc/indexInt.h | 1 + .../index/inc/index_fst_counting_writer.h | 1 + source/libs/index/src/index.c | 5 +- source/libs/index/src/index_cache.c | 2 +- source/libs/index/src/index_fst.c | 5 +- .../index/src/index_fst_counting_writer.c | 15 +++++- source/libs/index/src/index_tfile.c | 48 ++++++++++++++++++- 7 files changed, 71 insertions(+), 6 deletions(-) diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 378af4c1d1..90ad1e15f4 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -19,6 +19,7 @@ #include "index.h" #include "index_fst.h" #include "taos.h" +#include "tchecksum.h" #include "thash.h" #include "tlog.h" diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h index d7363f2f4c..1e0a88e17f 100644 --- a/source/libs/index/inc/index_fst_counting_writer.h +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -34,6 +34,7 @@ typedef struct WriterCtx { int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len); int (*flush)(struct WriterCtx* ctx); int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset); + int (*size)(struct WriterCtx* ctx); WriterType type; union { struct { diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 04dac57d8f..19e9375491 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -34,7 +34,10 @@ void indexInit() { // refactor later indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); } -void indexCleanUp() { taosCleanUpScheduler(indexQhandle); } +void indexCleanUp() { + // refacto later + taosCleanUpScheduler(indexQhandle); +} static int uidCompare(const void* a, const void* b) { // add more version compare diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 8bc3776ed9..294c8192e8 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -21,7 +21,7 @@ #define MAX_INDEX_KEY_LEN 256 // test only, change later #define MEM_TERM_LIMIT 10 * 10000 -#define MEM_THRESHOLD 1024 * 1024 * 2 +#define MEM_THRESHOLD 1024 * 1024 #define MEM_ESTIMATE_RADIO 1.5 static void indexMemRef(MemTable* tbl); diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index bfaeeaaa33..4f782cef26 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -935,7 +935,10 @@ Fst* fstCreate(FstSlice* slice) { uint32_t checkSum = 0; len -= sizeof(checkSum); taosDecodeFixedU32(buf + len, &checkSum); - + if (taosCheckChecksum(buf, len, checkSum)) { + // verify fst + return NULL; + } CompiledAddr rootAddr; len -= sizeof(rootAddr); taosDecodeFixedU64(buf + len, &rootAddr); diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 0763aae857..6db5555aa6 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -59,6 +59,13 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off } return nRead; } +static int writeCtxGetSize(WriterCtx* ctx) { + if (ctx->type == TFile && ctx->file.readOnly) { + // refactor later + return ctx->file.size; + } + return 0; +} static int writeCtxDoFlush(WriterCtx* ctx) { if (ctx->type == TFile) { // taosFsyncFile(ctx->file.fd); @@ -109,6 +116,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int ctx->read = writeCtxDoRead; ctx->flush = writeCtxDoFlush; ctx->readFrom = writeCtxDoReadFrom; + ctx->size = writeCtxGetSize; ctx->offset = 0; ctx->limit = capacity; @@ -159,6 +167,8 @@ int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len) int nWrite = ctx->write(ctx, buf, len); assert(nWrite == len); write->count += len; + + write->summer = taosCalcChecksum(write->summer, buf, len); return len; } int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) { @@ -169,7 +179,10 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) return nRead; } -uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; } +uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { + // opt + return write->summer; +} int fstCountingWriterFlush(FstCountingWriter* write) { WriterCtx* ctx = write->wrt; diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 6b7d8a4082..6d1dac8e35 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -21,8 +21,11 @@ p * #include "index_fst_counting_writer.h" #include "index_util.h" #include "taosdef.h" +#include "tcoding.h" #include "tcompare.h" +const static uint64_t tfileMagicNumber = 0xdb4775248b80fb57ull; + typedef struct TFileFstIter { FstStreamBuilder* fb; StreamWithState* st; @@ -40,9 +43,12 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds); static int tfileWriteHeader(TFileWriter* writer); static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteData(TFileWriter* write, TFileValue* tval); +static int tfileWriteFooter(TFileWriter* write); +// handle file corrupt later static int tfileReaderLoadHeader(TFileReader* reader); static int tfileReaderLoadFst(TFileReader* reader); +static int tfileReaderVerify(TFileReader* reader); static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static SArray* tfileGetFileList(const char* path); @@ -138,8 +144,15 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { TFileReader* reader = calloc(1, sizeof(TFileReader)); if (reader == NULL) { return NULL; } - // T_REF_INC(reader); reader->ctx = ctx; + + if (0 != tfileReaderVerify(reader)) { + tfileReaderDestroy(reader); + indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, + reader->header.colName); + return NULL; + } + // T_REF_INC(reader); if (0 != tfileReaderLoadHeader(reader)) { tfileReaderDestroy(reader); indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, @@ -296,6 +309,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { fstBuilderFinish(tw->fb); fstBuilderDestroy(tw->fb); tw->fb = NULL; + + tfileWriteFooter(tw); return 0; } void tfileWriterClose(TFileWriter* tw) { @@ -502,6 +517,14 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) { } return 0; } +static int tfileWriteFooter(TFileWriter* write) { + char buf[sizeof(tfileMagicNumber) + 1] = {0}; + void* pBuf = (void*)buf; + taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber); + int nwrite = write->ctx->write(write->ctx, buf, strlen(buf)); + assert(nwrite == sizeof(tfileMagicNumber)); + return nwrite; +} static int tfileReaderLoadHeader(TFileReader* reader) { // TODO simple tfile header later char buf[TFILE_HEADER_SIZE] = {0}; @@ -527,9 +550,11 @@ static int tfileReaderLoadFst(TFileReader* reader) { if (buf == NULL) { return -1; } WriterCtx* ctx = reader->ctx; + int size = ctx->size(ctx); int64_t ts = taosGetTimestampUs(); - int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); + int32_t nread = + ctx->readFrom(ctx, buf, size - reader->header.fstOffset - sizeof(tfileMagicNumber), reader->header.fstOffset); int64_t cost = taosGetTimestampUs() - ts; indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d, time cost: %" PRId64 "us", nread, reader->header.fstOffset, ctx->file.buf, ctx->file.size, cost); @@ -561,6 +586,25 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* free(buf); return 0; } +static int tfileReaderVerify(TFileReader* reader) { + // just validate header and Footer, file corrupted also shuild be verified later + WriterCtx* ctx = reader->ctx; + + uint64_t tMagicNumber = 0; + + char buf[sizeof(tMagicNumber) + 1] = {0}; + int size = ctx->size(ctx); + + if (size < sizeof(tMagicNumber) || size <= sizeof(reader->header)) { + return -1; + } else if (ctx->readFrom(ctx, buf, sizeof(tMagicNumber), size - sizeof(tMagicNumber)) != sizeof(tMagicNumber)) { + return -1; + } + + taosDecodeFixedU64(buf, &tMagicNumber); + return tMagicNumber == tfileMagicNumber ? 0 : -1; +} + void tfileReaderRef(TFileReader* reader) { if (reader == NULL) { return; } int ref = T_REF_INC(reader);