Merge pull request #9691 from taosdata/feature/index_complete
add tfile analysis tool
This commit is contained in:
commit
d861c05468
|
@ -19,6 +19,7 @@
|
|||
#include "index.h"
|
||||
#include "index_fst.h"
|
||||
#include "taos.h"
|
||||
#include "tchecksum.h"
|
||||
#include "thash.h"
|
||||
#include "tlog.h"
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ typedef struct WriterCtx {
|
|||
int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len);
|
||||
int (*flush)(struct WriterCtx* ctx);
|
||||
int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
|
||||
int (*size)(struct WriterCtx* ctx);
|
||||
WriterType type;
|
||||
union {
|
||||
struct {
|
||||
|
|
|
@ -34,7 +34,10 @@ void indexInit() {
|
|||
// refactor later
|
||||
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
|
||||
}
|
||||
void indexCleanUp() { taosCleanUpScheduler(indexQhandle); }
|
||||
void indexCleanUp() {
|
||||
// refacto later
|
||||
taosCleanUpScheduler(indexQhandle);
|
||||
}
|
||||
|
||||
static int uidCompare(const void* a, const void* b) {
|
||||
// add more version compare
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
||||
|
||||
#define MEM_TERM_LIMIT 10 * 10000
|
||||
#define MEM_THRESHOLD 1024 * 1024 * 2
|
||||
#define MEM_THRESHOLD 1024 * 1024
|
||||
#define MEM_ESTIMATE_RADIO 1.5
|
||||
|
||||
static void indexMemRef(MemTable* tbl);
|
||||
|
|
|
@ -935,7 +935,10 @@ Fst* fstCreate(FstSlice* slice) {
|
|||
uint32_t checkSum = 0;
|
||||
len -= sizeof(checkSum);
|
||||
taosDecodeFixedU32(buf + len, &checkSum);
|
||||
|
||||
if (taosCheckChecksum(buf, len, checkSum)) {
|
||||
// verify fst
|
||||
return NULL;
|
||||
}
|
||||
CompiledAddr rootAddr;
|
||||
len -= sizeof(rootAddr);
|
||||
taosDecodeFixedU64(buf + len, &rootAddr);
|
||||
|
|
|
@ -59,6 +59,13 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
|
|||
}
|
||||
return nRead;
|
||||
}
|
||||
/*
 * Report the size recorded for a writer context.
 *
 * Only a file-backed context (type == TFile) opened read-only yields
 * ctx->file.size; every other context reports 0.
 */
static int writeCtxGetSize(WriterCtx* ctx) {
  // refactor later
  const bool readOnlyFile = (ctx->type == TFile) && ctx->file.readOnly;
  return readOnlyFile ? ctx->file.size : 0;
}
|
||||
static int writeCtxDoFlush(WriterCtx* ctx) {
|
||||
if (ctx->type == TFile) {
|
||||
// taosFsyncFile(ctx->file.fd);
|
||||
|
@ -109,6 +116,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
|
|||
ctx->read = writeCtxDoRead;
|
||||
ctx->flush = writeCtxDoFlush;
|
||||
ctx->readFrom = writeCtxDoReadFrom;
|
||||
ctx->size = writeCtxGetSize;
|
||||
|
||||
ctx->offset = 0;
|
||||
ctx->limit = capacity;
|
||||
|
@ -159,6 +167,8 @@ int fstCountingWriterWrite(FstCountingWriter* write, uint8_t* buf, uint32_t len)
|
|||
int nWrite = ctx->write(ctx, buf, len);
|
||||
assert(nWrite == len);
|
||||
write->count += len;
|
||||
|
||||
write->summer = taosCalcChecksum(write->summer, buf, len);
|
||||
return len;
|
||||
}
|
||||
int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len) {
|
||||
|
@ -169,7 +179,10 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len)
|
|||
return nRead;
|
||||
}
|
||||
|
||||
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; }
|
||||
/*
 * Return the running checksum kept in write->summer, which
 * fstCountingWriterWrite() updates on every write.
 * NOTE(review): despite the name, no masking is applied here — confirm
 * whether that is intended.
 */
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) {
  // opt
  uint32_t checkSum = write->summer;
  return checkSum;
}
|
||||
|
||||
int fstCountingWriterFlush(FstCountingWriter* write) {
|
||||
WriterCtx* ctx = write->wrt;
|
||||
|
|
|
@ -21,8 +21,11 @@ p *
|
|||
#include "index_fst_counting_writer.h"
|
||||
#include "index_util.h"
|
||||
#include "taosdef.h"
|
||||
#include "tcoding.h"
|
||||
#include "tcompare.h"
|
||||
|
||||
const static uint64_t tfileMagicNumber = 0xdb4775248b80fb57ull;
|
||||
|
||||
typedef struct TFileFstIter {
|
||||
FstStreamBuilder* fb;
|
||||
StreamWithState* st;
|
||||
|
@ -40,9 +43,12 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
|
|||
static int tfileWriteHeader(TFileWriter* writer);
|
||||
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
|
||||
static int tfileWriteData(TFileWriter* write, TFileValue* tval);
|
||||
static int tfileWriteFooter(TFileWriter* write);
|
||||
|
||||
// handle file corruption later
|
||||
static int tfileReaderLoadHeader(TFileReader* reader);
|
||||
static int tfileReaderLoadFst(TFileReader* reader);
|
||||
static int tfileReaderVerify(TFileReader* reader);
|
||||
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
|
||||
|
||||
static SArray* tfileGetFileList(const char* path);
|
||||
|
@ -138,8 +144,14 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
|||
TFileReader* reader = calloc(1, sizeof(TFileReader));
|
||||
if (reader == NULL) { return NULL; }
|
||||
|
||||
// T_REF_INC(reader);
|
||||
reader->ctx = ctx;
|
||||
|
||||
if (0 != tfileReaderVerify(reader)) {
|
||||
tfileReaderDestroy(reader);
|
||||
indexError("invalid tfile, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
|
||||
return NULL;
|
||||
}
|
||||
// T_REF_INC(reader);
|
||||
if (0 != tfileReaderLoadHeader(reader)) {
|
||||
tfileReaderDestroy(reader);
|
||||
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
||||
|
@ -296,6 +308,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
|||
fstBuilderFinish(tw->fb);
|
||||
fstBuilderDestroy(tw->fb);
|
||||
tw->fb = NULL;
|
||||
|
||||
tfileWriteFooter(tw);
|
||||
return 0;
|
||||
}
|
||||
void tfileWriterClose(TFileWriter* tw) {
|
||||
|
@ -502,6 +516,14 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
/*
 * Append the tfile footer: the fixed-width encoding of tfileMagicNumber.
 *
 * Returns the number of bytes written through write->ctx; the assert
 * checks that the whole magic number was written.
 */
static int tfileWriteFooter(TFileWriter* write) {
  char  buf[sizeof(tfileMagicNumber) + 1] = {0};
  void* pBuf = (void*)buf;
  taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber);

  // The encoded magic number is raw bytes, not a C string: strlen() would
  // stop at the first zero byte, so write the fixed encoded width instead.
  int nwrite = write->ctx->write(write->ctx, buf, sizeof(tfileMagicNumber));
  assert(nwrite == sizeof(tfileMagicNumber));
  return nwrite;
}
|
||||
static int tfileReaderLoadHeader(TFileReader* reader) {
|
||||
// TODO simple tfile header later
|
||||
char buf[TFILE_HEADER_SIZE] = {0};
|
||||
|
@ -527,9 +549,14 @@ static int tfileReaderLoadFst(TFileReader* reader) {
|
|||
if (buf == NULL) { return -1; }
|
||||
|
||||
WriterCtx* ctx = reader->ctx;
|
||||
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
|
||||
indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d ", nread, reader->header.fstOffset, ctx->file.buf,
|
||||
ctx->file.size);
|
||||
int size = ctx->size(ctx);
|
||||
|
||||
int64_t ts = taosGetTimestampUs();
|
||||
int32_t nread =
|
||||
ctx->readFrom(ctx, buf, size - reader->header.fstOffset - sizeof(tfileMagicNumber), reader->header.fstOffset);
|
||||
int64_t cost = taosGetTimestampUs() - ts;
|
||||
indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d, time cost: %" PRId64 "us", nread,
|
||||
reader->header.fstOffset, ctx->file.buf, ctx->file.size, cost);
|
||||
// we assume the fst size is less than FST_MAX_SIZE
|
||||
assert(nread > 0 && nread < FST_MAX_SIZE);
|
||||
|
||||
|
@ -558,6 +585,25 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
|
|||
free(buf);
|
||||
return 0;
|
||||
}
|
||||
/*
 * Sanity-check a tfile by validating its footer.
 *
 * Reads the trailing sizeof(uint64_t) bytes through reader->ctx and compares
 * the decoded value with tfileMagicNumber. Only the size and the footer are
 * checked here; corruption elsewhere in the file should be verified later.
 *
 * Returns 0 when the footer matches; -1 when the file is too small, the
 * footer cannot be read in full, or the magic number differs.
 */
static int tfileReaderVerify(TFileReader* reader) {
  WriterCtx* ctx = reader->ctx;

  uint64_t tMagicNumber = 0;
  char     buf[sizeof(tMagicNumber) + 1] = {0};

  const int footerLen = (int)sizeof(tMagicNumber);
  const int headerLen = (int)sizeof(reader->header);
  int       size = ctx->size(ctx);

  // Compare as signed ints: comparing `size` directly against sizeof would
  // convert it to size_t, so a negative size would slip past both guards
  // and yield a negative read offset.
  if (size < footerLen || size <= headerLen) {
    return -1;
  }
  if (ctx->readFrom(ctx, buf, footerLen, size - footerLen) != footerLen) {
    return -1;
  }

  taosDecodeFixedU64(buf, &tMagicNumber);
  return tMagicNumber == tfileMagicNumber ? 0 : -1;
}
|
||||
|
||||
void tfileReaderRef(TFileReader* reader) {
|
||||
if (reader == NULL) { return; }
|
||||
int ref = T_REF_INC(reader);
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
@ -12,7 +13,6 @@
|
|||
#include "index_tfile.h"
|
||||
#include "tskiplist.h"
|
||||
#include "tutil.h"
|
||||
|
||||
void* callback(void* s) { return s; }
|
||||
|
||||
static std::string fileName = "/tmp/tindex.tindex";
|
||||
|
@ -293,7 +293,7 @@ void validateTFile(char* arg) {
|
|||
|
||||
std::thread threads[NUM_OF_THREAD];
|
||||
// std::vector<std::thread> threads;
|
||||
TFileReader* reader = tfileReaderOpen(arg, 0, 999992, "tag1");
|
||||
TFileReader* reader = tfileReaderOpen(arg, 0, 20000000, "tag1");
|
||||
|
||||
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||
threads[i] = std::thread(fst_get, reader->fst);
|
||||
|
@ -306,13 +306,41 @@ void validateTFile(char* arg) {
|
|||
}
|
||||
tfCleanup();
|
||||
}
|
||||
|
||||
void iterTFileReader(char* path, char* ver) {
|
||||
tfInit();
|
||||
|
||||
int version = atoi(ver);
|
||||
TFileReader* reader = tfileReaderOpen(path, 0, version, "tag1");
|
||||
Iterate* iter = tfileIteratorCreate(reader);
|
||||
bool tn = iter ? iter->next(iter) : false;
|
||||
int count = 0;
|
||||
int termCount = 0;
|
||||
while (tn == true) {
|
||||
count++;
|
||||
IterateValue* cv = iter->getValue(iter);
|
||||
termCount += (int)taosArrayGetSize(cv->val);
|
||||
printf("col val: %s, size: %d\n", cv->colVal, (int)taosArrayGetSize(cv->val));
|
||||
tn = iter->next(iter);
|
||||
}
|
||||
printf("total size: %d\n term count: %d\n", count, termCount);
|
||||
|
||||
tfileIteratorDestroy(iter);
|
||||
tfCleanup();
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
// tool to check all kind of fst test
|
||||
// if (argc > 1) { validateTFile(argv[1]); }
|
||||
if (argc > 2) {
|
||||
// opt
|
||||
iterTFileReader(argv[1], argv[2]);
|
||||
}
|
||||
// checkFstCheckIterator();
|
||||
// checkFstLongTerm();
|
||||
// checkFstPrefixSearch();
|
||||
|
||||
checkMillonWriteAndReadOfFst();
|
||||
// checkMillonWriteAndReadOfFst();
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
|
|
@ -665,14 +665,19 @@ class IndexObj {
|
|||
size_t numOfTable = 100 * 10000) {
|
||||
std::string tColVal = colVal;
|
||||
size_t colValSize = tColVal.size();
|
||||
int skip = 100;
|
||||
numOfTable /= skip;
|
||||
for (int i = 0; i < numOfTable; i++) {
|
||||
tColVal[i % colValSize] = 'a' + i % 26;
|
||||
for (int k = 0; k < 10 && k < colVal.size(); k++) {
|
||||
// opt
|
||||
tColVal[rand() % colValSize] = 'a' + k % 26;
|
||||
}
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
tColVal.c_str(), tColVal.size());
|
||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||
indexMultiTermAdd(terms, term);
|
||||
for (size_t i = 0; i < 10; i++) {
|
||||
int ret = Put(terms, i);
|
||||
for (size_t j = 0; j < skip; j++) {
|
||||
int ret = Put(terms, j);
|
||||
assert(ret == 0);
|
||||
}
|
||||
indexMultiTermDestroy(terms);
|
||||
|
@ -939,10 +944,11 @@ TEST_F(IndexEnv2, testIndex_read_performance) {
|
|||
TEST_F(IndexEnv2, testIndexMultiTag) {
|
||||
std::string path = "/tmp/multi_tag";
|
||||
if (index->Init(path) != 0) {}
|
||||
index->WriteMultiMillonData("tag1", "Hello", 100 * 10000);
|
||||
index->WriteMultiMillonData("tag2", "Test", 100 * 10000);
|
||||
index->WriteMultiMillonData("tag3", "Test", 100 * 10000);
|
||||
index->WriteMultiMillonData("tag4", "Test", 100 * 10000);
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int32_t num = 1000 * 10000;
|
||||
index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
|
||||
std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
|
||||
// index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
|
||||
}
|
||||
TEST_F(IndexEnv2, testLongComVal) {
|
||||
std::string path = "/tmp/long_colVal";
|
||||
|
|
Loading…
Reference in New Issue