diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h index db1f063af3..ba0bd52e10 100644 --- a/source/libs/index/inc/index_fst_counting_writer.h +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -31,6 +31,7 @@ typedef struct WriterCtx { int (*write)(struct WriterCtx* ctx, uint8_t* buf, int len); int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len); int (*flush)(struct WriterCtx* ctx); + int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset); WriterType type; union { struct { @@ -48,6 +49,7 @@ typedef struct WriterCtx { static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len); static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len); +static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset); static int writeCtxDoFlush(WriterCtx* ctx); WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity); diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 60d717efc6..962973bb5c 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -39,6 +39,17 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) { return nRead; } +static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) { + int nRead = 0; + if (ctx->type == TFile) { + tfLseek(ctx->file.fd, offset, 0); + nRead = tfRead(ctx->file.fd, buf, len); + } else { + // refactor later + assert(0); + } + return nRead; +} static int writeCtxDoFlush(WriterCtx* ctx) { if (ctx->type == TFile) { // tfFsync(ctx->fd); @@ -73,6 +84,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int ctx->write = writeCtxDoWrite; ctx->read = writeCtxDoRead; ctx->flush = writeCtxDoFlush; + ctx->readFrom = writeCtxDoReadFrom; ctx->offset = 0; ctx->limit = capacity; diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 39100edd8b..6ac1a893a6 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -34,18 +34,19 @@ typedef struct TFileValue { static int tfileValueCompare(const void* a, const void* b, const void* param); static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds); -static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteHeader(TFileWriter* writer); +static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteData(TFileWriter* write, TFileValue* tval); static int tfileReadLoadHeader(TFileReader* reader); +static int tfileReadLoadFst(TFileReader* reader); +static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static int tfileGetFileList(const char* path, SArray* result); static void tfileDestroyFileName(void* elem); static int tfileCompare(const void* a, const void* b); static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version); static void tfileSerialCacheKey(TFileCacheKey* key, char* buf); -// static tfileGetCompareFunc(uint8_t byte) {} TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = calloc(1, sizeof(TFileCache)); @@ -73,9 +74,13 @@ TFileCache* tfileCacheCreate(const char* path) { TFileReader* reader = tfileReaderCreate(wc); if (0 != tfileReadLoadHeader(reader)) { tfileReaderDestroy(reader); - indexError("failed to load index header, index Id: %s", file); + indexError("failed to load index header, index file: %s", file); goto End; } + if (0 != tfileReadLoadFst(reader)) { + tfileReaderDestroy(reader); + indexError("failed to load index fst, index file: %s", file); + } // loader fst and validate it TFileHeader* header = &reader->header; @@ -136,25 +141,29 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { void tfileReaderDestroy(TFileReader* reader) { if (reader == NULL) { return; } // T_REF_INC(reader); + fstDestroy(reader->fst); writerCtxDestroy(reader->ctx); free(reader); } int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) { SIndexTerm* term = query->term; + // refactor to callback later if (query->qType == QUERY_TERM) { uint64_t offset; FstSlice key = fstSliceCreate(term->colVal, term->nColVal); if (fstGet(reader->fst, &key, &offset)) { - // + return tfileReadLoadTableIds(reader, offset, result); } else { indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found in tindex", term->suid, term->colName, term->colVal); } return 0; } else if (query->qType == QUERY_PREFIX) { + // handle later // - // + } else { + // handle later } return 0; } @@ -198,13 +207,10 @@ int tfileWriterPut(TFileWriter* tw, void* data) { TFileValue* v = taosArrayGetP((SArray*)data, i); int32_t tbsz = taosArrayGetSize(v->tableId); - int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz); - fstOffset += ttsz; + fstOffset += TF_TABLE_TATOAL_SIZE(tbsz); } // check result or not tfileWriteFstOffset(tw, fstOffset); - // tw->ctx->header.fstOffset = fstOffset; - // tw->ctx->write(tw->ctx, &fstOffset, sizeof(fstOffset)); for (size_t i = 0; i < sz; i++) { TFileValue* v = taosArrayGetP((SArray*)data, i); @@ -287,11 +293,11 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) { return fn(av->colVal, bv->colVal); } -static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) { - int tbSz = taosArrayGetSize(tableIds); - SERIALIZE_VAR_TO_BUF(buf, tbSz, int32_t); - for (size_t i = 0; i < tbSz; i++) { - uint64_t* v = taosArrayGet(tableIds, i); +static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) { + int sz = taosArrayGetSize(ids); + SERIALIZE_VAR_TO_BUF(buf, sz, int32_t); + for (size_t i = 0; i < sz; i++) { + uint64_t* v = taosArrayGet(ids, i); SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t); } } @@ -328,6 +334,7 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) { } else { // handle other type later } + return 0; } static int tfileReadLoadHeader(TFileReader* reader) { // TODO simple tfile header later @@ -339,6 +346,42 @@ static int tfileReadLoadHeader(TFileReader* reader) { memcpy(&reader->header, buf, sizeof(buf)); return 0; } +static int tfileReadLoadFst(TFileReader* reader) { + // current load fst into memory, refactor it later + static int FST_MAX_SIZE = 16 * 1024; + + char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE); + if (buf == NULL) { return -1; } + + WriterCtx* ctx = reader->ctx; + int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); + // we assuse fst size less than FST_MAX_SIZE + assert(nread > 0 && nread < FST_MAX_SIZE); + + FstSlice st = fstSliceCreate((uint8_t*)buf, nread); + reader->fst = fstCreate(&st); + free(buf); + fstSliceDestroy(&st); + + return reader->fst == NULL ? 0 : -1; +} +static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) { + int32_t nid; + WriterCtx* ctx = reader->ctx; + int32_t nread = ctx->readFrom(ctx, (char*)&nid, sizeof(nid), offset); + assert(sizeof(nid) == nread); + + char* buf = calloc(1, sizeof(uint64_t) * nid); + if (buf == NULL) { return -1; } + + nread = ctx->read(ctx, buf, sizeof(uint64_t) * nid); + uint64_t* ids = (uint64_t*)buf; + for (int32_t i = 0; i < nid; i++) { + taosArrayPush(result, ids + i); + } + free(buf); + return 0; +} static int tfileGetFileList(const char* path, SArray* result) { DIR* dir = opendir(path);