Merge pull request #9281 from taosdata/feature/index_cache
update tindex write
This commit is contained in:
commit
cd8e91c96c
|
@ -31,6 +31,7 @@ typedef struct WriterCtx {
|
||||||
int (*write)(struct WriterCtx* ctx, uint8_t* buf, int len);
|
int (*write)(struct WriterCtx* ctx, uint8_t* buf, int len);
|
||||||
int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len);
|
int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len);
|
||||||
int (*flush)(struct WriterCtx* ctx);
|
int (*flush)(struct WriterCtx* ctx);
|
||||||
|
int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
|
||||||
WriterType type;
|
WriterType type;
|
||||||
union {
|
union {
|
||||||
struct {
|
struct {
|
||||||
|
@ -48,6 +49,7 @@ typedef struct WriterCtx {
|
||||||
|
|
||||||
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len);
|
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len);
|
||||||
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len);
|
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len);
|
||||||
|
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
|
||||||
static int writeCtxDoFlush(WriterCtx* ctx);
|
static int writeCtxDoFlush(WriterCtx* ctx);
|
||||||
|
|
||||||
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity);
|
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity);
|
||||||
|
|
|
@ -39,6 +39,17 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
|
||||||
|
|
||||||
return nRead;
|
return nRead;
|
||||||
}
|
}
|
||||||
|
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
|
||||||
|
int nRead = 0;
|
||||||
|
if (ctx->type == TFile) {
|
||||||
|
tfLseek(ctx->file.fd, offset, 0);
|
||||||
|
nRead = tfRead(ctx->file.fd, buf, len);
|
||||||
|
} else {
|
||||||
|
// refactor later
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
return nRead;
|
||||||
|
}
|
||||||
static int writeCtxDoFlush(WriterCtx* ctx) {
|
static int writeCtxDoFlush(WriterCtx* ctx) {
|
||||||
if (ctx->type == TFile) {
|
if (ctx->type == TFile) {
|
||||||
// tfFsync(ctx->fd);
|
// tfFsync(ctx->fd);
|
||||||
|
@ -73,6 +84,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
|
||||||
ctx->write = writeCtxDoWrite;
|
ctx->write = writeCtxDoWrite;
|
||||||
ctx->read = writeCtxDoRead;
|
ctx->read = writeCtxDoRead;
|
||||||
ctx->flush = writeCtxDoFlush;
|
ctx->flush = writeCtxDoFlush;
|
||||||
|
ctx->readFrom = writeCtxDoReadFrom;
|
||||||
|
|
||||||
ctx->offset = 0;
|
ctx->offset = 0;
|
||||||
ctx->limit = capacity;
|
ctx->limit = capacity;
|
||||||
|
|
|
@ -34,18 +34,19 @@ typedef struct TFileValue {
|
||||||
static int tfileValueCompare(const void* a, const void* b, const void* param);
|
static int tfileValueCompare(const void* a, const void* b, const void* param);
|
||||||
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
|
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
|
||||||
|
|
||||||
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
|
|
||||||
static int tfileWriteHeader(TFileWriter* writer);
|
static int tfileWriteHeader(TFileWriter* writer);
|
||||||
|
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
|
||||||
static int tfileWriteData(TFileWriter* write, TFileValue* tval);
|
static int tfileWriteData(TFileWriter* write, TFileValue* tval);
|
||||||
|
|
||||||
static int tfileReadLoadHeader(TFileReader* reader);
|
static int tfileReadLoadHeader(TFileReader* reader);
|
||||||
|
static int tfileReadLoadFst(TFileReader* reader);
|
||||||
|
static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
|
||||||
|
|
||||||
static int tfileGetFileList(const char* path, SArray* result);
|
static int tfileGetFileList(const char* path, SArray* result);
|
||||||
static void tfileDestroyFileName(void* elem);
|
static void tfileDestroyFileName(void* elem);
|
||||||
static int tfileCompare(const void* a, const void* b);
|
static int tfileCompare(const void* a, const void* b);
|
||||||
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
|
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
|
||||||
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
|
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
|
||||||
// static tfileGetCompareFunc(uint8_t byte) {}
|
|
||||||
|
|
||||||
TFileCache* tfileCacheCreate(const char* path) {
|
TFileCache* tfileCacheCreate(const char* path) {
|
||||||
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
||||||
|
@ -73,9 +74,13 @@ TFileCache* tfileCacheCreate(const char* path) {
|
||||||
TFileReader* reader = tfileReaderCreate(wc);
|
TFileReader* reader = tfileReaderCreate(wc);
|
||||||
if (0 != tfileReadLoadHeader(reader)) {
|
if (0 != tfileReadLoadHeader(reader)) {
|
||||||
tfileReaderDestroy(reader);
|
tfileReaderDestroy(reader);
|
||||||
indexError("failed to load index header, index Id: %s", file);
|
indexError("failed to load index header, index file: %s", file);
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
if (0 != tfileReadLoadFst(reader)) {
|
||||||
|
tfileReaderDestroy(reader);
|
||||||
|
indexError("failed to load index fst, index file: %s", file);
|
||||||
|
}
|
||||||
// loader fst and validate it
|
// loader fst and validate it
|
||||||
|
|
||||||
TFileHeader* header = &reader->header;
|
TFileHeader* header = &reader->header;
|
||||||
|
@ -136,25 +141,29 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
||||||
void tfileReaderDestroy(TFileReader* reader) {
|
void tfileReaderDestroy(TFileReader* reader) {
|
||||||
if (reader == NULL) { return; }
|
if (reader == NULL) { return; }
|
||||||
// T_REF_INC(reader);
|
// T_REF_INC(reader);
|
||||||
|
fstDestroy(reader->fst);
|
||||||
writerCtxDestroy(reader->ctx);
|
writerCtxDestroy(reader->ctx);
|
||||||
free(reader);
|
free(reader);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) {
|
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) {
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
|
|
||||||
// refactor to callback later
|
// refactor to callback later
|
||||||
if (query->qType == QUERY_TERM) {
|
if (query->qType == QUERY_TERM) {
|
||||||
uint64_t offset;
|
uint64_t offset;
|
||||||
FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
|
FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
|
||||||
if (fstGet(reader->fst, &key, &offset)) {
|
if (fstGet(reader->fst, &key, &offset)) {
|
||||||
//
|
return tfileReadLoadTableIds(reader, offset, result);
|
||||||
} else {
|
} else {
|
||||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found in tindex", term->suid, term->colName, term->colVal);
|
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found in tindex", term->suid, term->colName, term->colVal);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
} else if (query->qType == QUERY_PREFIX) {
|
} else if (query->qType == QUERY_PREFIX) {
|
||||||
|
// handle later
|
||||||
//
|
//
|
||||||
//
|
} else {
|
||||||
|
// handle later
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -198,13 +207,10 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
|
||||||
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
||||||
|
|
||||||
int32_t tbsz = taosArrayGetSize(v->tableId);
|
int32_t tbsz = taosArrayGetSize(v->tableId);
|
||||||
int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz);
|
fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
|
||||||
fstOffset += ttsz;
|
|
||||||
}
|
}
|
||||||
// check result or not
|
// check result or not
|
||||||
tfileWriteFstOffset(tw, fstOffset);
|
tfileWriteFstOffset(tw, fstOffset);
|
||||||
// tw->ctx->header.fstOffset = fstOffset;
|
|
||||||
// tw->ctx->write(tw->ctx, &fstOffset, sizeof(fstOffset));
|
|
||||||
|
|
||||||
for (size_t i = 0; i < sz; i++) {
|
for (size_t i = 0; i < sz; i++) {
|
||||||
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
||||||
|
@ -287,11 +293,11 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
|
||||||
|
|
||||||
return fn(av->colVal, bv->colVal);
|
return fn(av->colVal, bv->colVal);
|
||||||
}
|
}
|
||||||
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) {
|
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
|
||||||
int tbSz = taosArrayGetSize(tableIds);
|
int sz = taosArrayGetSize(ids);
|
||||||
SERIALIZE_VAR_TO_BUF(buf, tbSz, int32_t);
|
SERIALIZE_VAR_TO_BUF(buf, sz, int32_t);
|
||||||
for (size_t i = 0; i < tbSz; i++) {
|
for (size_t i = 0; i < sz; i++) {
|
||||||
uint64_t* v = taosArrayGet(tableIds, i);
|
uint64_t* v = taosArrayGet(ids, i);
|
||||||
SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t);
|
SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -328,6 +334,7 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
|
||||||
} else {
|
} else {
|
||||||
// handle other type later
|
// handle other type later
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
static int tfileReadLoadHeader(TFileReader* reader) {
|
static int tfileReadLoadHeader(TFileReader* reader) {
|
||||||
// TODO simple tfile header later
|
// TODO simple tfile header later
|
||||||
|
@ -339,6 +346,42 @@ static int tfileReadLoadHeader(TFileReader* reader) {
|
||||||
memcpy(&reader->header, buf, sizeof(buf));
|
memcpy(&reader->header, buf, sizeof(buf));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
static int tfileReadLoadFst(TFileReader* reader) {
|
||||||
|
// current load fst into memory, refactor it later
|
||||||
|
static int FST_MAX_SIZE = 16 * 1024;
|
||||||
|
|
||||||
|
char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE);
|
||||||
|
if (buf == NULL) { return -1; }
|
||||||
|
|
||||||
|
WriterCtx* ctx = reader->ctx;
|
||||||
|
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
|
||||||
|
// we assuse fst size less than FST_MAX_SIZE
|
||||||
|
assert(nread > 0 && nread < FST_MAX_SIZE);
|
||||||
|
|
||||||
|
FstSlice st = fstSliceCreate((uint8_t*)buf, nread);
|
||||||
|
reader->fst = fstCreate(&st);
|
||||||
|
free(buf);
|
||||||
|
fstSliceDestroy(&st);
|
||||||
|
|
||||||
|
return reader->fst == NULL ? 0 : -1;
|
||||||
|
}
|
||||||
|
static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
|
||||||
|
int32_t nid;
|
||||||
|
WriterCtx* ctx = reader->ctx;
|
||||||
|
int32_t nread = ctx->readFrom(ctx, (char*)&nid, sizeof(nid), offset);
|
||||||
|
assert(sizeof(nid) == nread);
|
||||||
|
|
||||||
|
char* buf = calloc(1, sizeof(uint64_t) * nid);
|
||||||
|
if (buf == NULL) { return -1; }
|
||||||
|
|
||||||
|
nread = ctx->read(ctx, buf, sizeof(uint64_t) * nid);
|
||||||
|
uint64_t* ids = (uint64_t*)buf;
|
||||||
|
for (int32_t i = 0; i < nid; i++) {
|
||||||
|
taosArrayPush(result, ids + i);
|
||||||
|
}
|
||||||
|
free(buf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int tfileGetFileList(const char* path, SArray* result) {
|
static int tfileGetFileList(const char* path, SArray* result) {
|
||||||
DIR* dir = opendir(path);
|
DIR* dir = opendir(path);
|
||||||
|
|
Loading…
Reference in New Issue