validate tfile
This commit is contained in:
parent
2a17fa3948
commit
15c0275333
|
@ -19,6 +19,7 @@
|
|||
#include "index.h"
|
||||
#include "index_fst.h"
|
||||
#include "taos.h"
|
||||
#include "tchecksum.h"
|
||||
#include "thash.h"
|
||||
#include "tlog.h"
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ typedef struct WriterCtx {
|
|||
int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len);
|
||||
int (*flush)(struct WriterCtx* ctx);
|
||||
int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
|
||||
int (*size)(struct WriterCtx* ctx);
|
||||
WriterType type;
|
||||
union {
|
||||
struct {
|
||||
|
|
|
@ -34,7 +34,10 @@ void indexInit() {
|
|||
// refactor later
|
||||
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
|
||||
}
|
||||
void indexCleanUp() { taosCleanUpScheduler(indexQhandle); }
|
||||
void indexCleanUp() {
|
||||
// refacto later
|
||||
taosCleanUpScheduler(indexQhandle);
|
||||
}
|
||||
|
||||
static int uidCompare(const void* a, const void* b) {
|
||||
// add more version compare
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
||||
|
||||
#define MEM_TERM_LIMIT 10 * 10000
|
||||
#define MEM_THRESHOLD 1024 * 1024 * 2
|
||||
#define MEM_THRESHOLD 1024 * 1024
|
||||
#define MEM_ESTIMATE_RADIO 1.5
|
||||
|
||||
static void indexMemRef(MemTable* tbl);
|
||||
|
|
|
@ -935,7 +935,10 @@ Fst* fstCreate(FstSlice* slice) {
|
|||
uint32_t checkSum = 0;
|
||||
len -= sizeof(checkSum);
|
||||
taosDecodeFixedU32(buf + len, &checkSum);
|
||||
|
||||
if (taosCheckChecksum(buf, len, checkSum)) {
|
||||
// verify fst
|
||||
return NULL;
|
||||
}
|
||||
CompiledAddr rootAddr;
|
||||
len -= sizeof(rootAddr);
|
||||
taosDecodeFixedU64(buf + len, &rootAddr);
|
||||
|
|
|
@ -59,6 +59,13 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
|
|||
}
|
||||
return nRead;
|
||||
}
|
||||
static int writeCtxGetSize(WriterCtx* ctx) {
|
||||
if (ctx->type == TFile && ctx->file.readOnly) {
|
||||
// refactor later
|
||||
return ctx->file.size;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
static int writeCtxDoFlush(WriterCtx* ctx) {
|
||||
if (ctx->type == TFile) {
|
||||
// taosFsyncFile(ctx->file.fd);
|
||||
|
@ -109,6 +116,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
|
|||
ctx->read = writeCtxDoRead;
|
||||
ctx->flush = writeCtxDoFlush;
|
||||
ctx->readFrom = writeCtxDoReadFrom;
|
||||
ctx->size = writeCtxGetSize;
|
||||
|
||||
ctx->offset = 0;
|
||||
ctx->limit = capacity;
|
||||
|
@ -159,6 +167,8 @@ int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len)
|
|||
int nWrite = ctx->write(ctx, buf, len);
|
||||
assert(nWrite == len);
|
||||
write->count += len;
|
||||
|
||||
write->summer = taosCalcChecksum(write->summer, buf, len);
|
||||
return len;
|
||||
}
|
||||
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
|
||||
|
@ -169,7 +179,10 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len)
|
|||
return nRead;
|
||||
}
|
||||
|
||||
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; }
|
||||
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) {
|
||||
// opt
|
||||
return write->summer;
|
||||
}
|
||||
|
||||
int fstCountingWriterFlush(FstCountingWriter* write) {
|
||||
WriterCtx* ctx = write->wrt;
|
||||
|
|
|
@ -21,8 +21,11 @@ p *
|
|||
#include "index_fst_counting_writer.h"
|
||||
#include "index_util.h"
|
||||
#include "taosdef.h"
|
||||
#include "tcoding.h"
|
||||
#include "tcompare.h"
|
||||
|
||||
const static uint64_t tfileMagicNumber = 0xdb4775248b80fb57ull;
|
||||
|
||||
typedef struct TFileFstIter {
|
||||
FstStreamBuilder* fb;
|
||||
StreamWithState* st;
|
||||
|
@ -40,9 +43,12 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
|
|||
static int tfileWriteHeader(TFileWriter* writer);
|
||||
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
|
||||
static int tfileWriteData(TFileWriter* write, TFileValue* tval);
|
||||
static int tfileWriteFooter(TFileWriter* write);
|
||||
|
||||
// handle file corrupt later
|
||||
static int tfileReaderLoadHeader(TFileReader* reader);
|
||||
static int tfileReaderLoadFst(TFileReader* reader);
|
||||
static int tfileReaderVerify(TFileReader* reader);
|
||||
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
|
||||
|
||||
static SArray* tfileGetFileList(const char* path);
|
||||
|
@ -138,8 +144,15 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
|||
TFileReader* reader = calloc(1, sizeof(TFileReader));
|
||||
if (reader == NULL) { return NULL; }
|
||||
|
||||
// T_REF_INC(reader);
|
||||
reader->ctx = ctx;
|
||||
|
||||
if (0 != tfileReaderVerify(reader)) {
|
||||
tfileReaderDestroy(reader);
|
||||
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
||||
reader->header.colName);
|
||||
return NULL;
|
||||
}
|
||||
// T_REF_INC(reader);
|
||||
if (0 != tfileReaderLoadHeader(reader)) {
|
||||
tfileReaderDestroy(reader);
|
||||
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
||||
|
@ -296,6 +309,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
|||
fstBuilderFinish(tw->fb);
|
||||
fstBuilderDestroy(tw->fb);
|
||||
tw->fb = NULL;
|
||||
|
||||
tfileWriteFooter(tw);
|
||||
return 0;
|
||||
}
|
||||
void tfileWriterClose(TFileWriter* tw) {
|
||||
|
@ -502,6 +517,14 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
static int tfileWriteFooter(TFileWriter* write) {
|
||||
char buf[sizeof(tfileMagicNumber) + 1] = {0};
|
||||
void* pBuf = (void*)buf;
|
||||
taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber);
|
||||
int nwrite = write->ctx->write(write->ctx, buf, strlen(buf));
|
||||
assert(nwrite == sizeof(tfileMagicNumber));
|
||||
return nwrite;
|
||||
}
|
||||
static int tfileReaderLoadHeader(TFileReader* reader) {
|
||||
// TODO simple tfile header later
|
||||
char buf[TFILE_HEADER_SIZE] = {0};
|
||||
|
@ -527,9 +550,11 @@ static int tfileReaderLoadFst(TFileReader* reader) {
|
|||
if (buf == NULL) { return -1; }
|
||||
|
||||
WriterCtx* ctx = reader->ctx;
|
||||
int size = ctx->size(ctx);
|
||||
|
||||
int64_t ts = taosGetTimestampUs();
|
||||
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
|
||||
int32_t nread =
|
||||
ctx->readFrom(ctx, buf, size - reader->header.fstOffset - sizeof(tfileMagicNumber), reader->header.fstOffset);
|
||||
int64_t cost = taosGetTimestampUs() - ts;
|
||||
indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d, time cost: %" PRId64 "us", nread,
|
||||
reader->header.fstOffset, ctx->file.buf, ctx->file.size, cost);
|
||||
|
@ -561,6 +586,25 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
|
|||
free(buf);
|
||||
return 0;
|
||||
}
|
||||
static int tfileReaderVerify(TFileReader* reader) {
|
||||
// just validate header and Footer, file corrupted also shuild be verified later
|
||||
WriterCtx* ctx = reader->ctx;
|
||||
|
||||
uint64_t tMagicNumber = 0;
|
||||
|
||||
char buf[sizeof(tMagicNumber) + 1] = {0};
|
||||
int size = ctx->size(ctx);
|
||||
|
||||
if (size < sizeof(tMagicNumber) || size <= sizeof(reader->header)) {
|
||||
return -1;
|
||||
} else if (ctx->readFrom(ctx, buf, sizeof(tMagicNumber), size - sizeof(tMagicNumber)) != sizeof(tMagicNumber)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosDecodeFixedU64(buf, &tMagicNumber);
|
||||
return tMagicNumber == tfileMagicNumber ? 0 : -1;
|
||||
}
|
||||
|
||||
void tfileReaderRef(TFileReader* reader) {
|
||||
if (reader == NULL) { return; }
|
||||
int ref = T_REF_INC(reader);
|
||||
|
|
Loading…
Reference in New Issue