diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index e6cb3a75c4..ce974cc49f 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -26,7 +26,19 @@ extern "C" { #endif +// tfile header +// |<---suid--->|<---version--->|<--colLen-->|<-colName->|<---type-->| +// |<-uint64_t->|<---int32_t--->|<--int32_t->|<-colLen-->|<-uint8_t->| +typedef struct TFileReadHeader { + uint64_t suid; + int32_t version; + char colName[128]; // + uint8_t colType; +} TFileReadHeader; + +#define TFILE_HEADER_SIZE (sizeof(TFILE_HEADER_SIZE) + sizeof(uint32_t)); +#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t)) typedef struct TFileCacheKey { uint64_t suid; @@ -48,13 +60,13 @@ typedef struct TFileCache { typedef struct TFileWriter { FstBuilder *fb; - WriterCtx *wc; + WriterCtx *ctx; } TFileWriter; typedef struct TFileReader { T_REF_DECLARE() Fst *fst; - + WriterCtx *ctx; } TFileReader; typedef struct IndexTFile { @@ -78,18 +90,22 @@ typedef struct TFileReaderOpt { } TFileReaderOpt; -// tfile cache +// tfile cache, manage tindex reader TFileCache *tfileCacheCreate(const char *path); void tfileCacheDestroy(TFileCache *tcache); TFileReader* tfileCacheGet(TFileCache *tcache, TFileCacheKey *key); void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader); +TFileReader* tfileReaderCreate(); +void TFileReaderDestroy(TFileReader *reader); + + TFileWriter *tfileWriterCreate(const char *suid, const char *colName); +void tfileWriterDestroy(TFileWriter *tw); +// IndexTFile *indexTFileCreate(const char *path); - int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid); - int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result); diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 3497b9703d..f0a7b04407 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -65,6 +65,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int ctx->file.fd = tfOpenReadWrite(tmpFile); } if (ctx->file.fd < 0) { + goto END; indexError("open file error %d", errno); } } else if (ctx->type == TMemory) { @@ -79,6 +80,9 @@ WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int ctx->limit = capacity; return ctx; +END: + if (ctx->type == TMemory) { free(ctx->mem.buf); } + free(ctx); } void writerCtxDestroy(WriterCtx *ctx) { if (ctx->type == TMemory) { diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 9a5b3251b1..bacadba716 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -13,14 +13,38 @@ * along with this program. If not, see . */ -#include -#include +//#include +//#include #include "index_tfile.h" #include "index_fst.h" #include "index_util.h" +#include "taosdef.h" +#include "index.h" +#include "index_fst_counting_writer.h" -// tfile name suid-colId-version.tindex +static FORCE_INLINE int tfileLoadHeader(WriterCtx *ctx, TFileReadHeader *header) { + //TODO simple tfile header later + char buf[TFILE_HADER_PRE_SIZE]; + char *p = buf; + int64_t nread = ctx->read(ctx, buf, TFILE_HADER_PRE_SIZE); + assert(nread == TFILE_HADER_PRE_SIZE); + + memcpy(&header->suid, p, sizeof(header->suid)); + p += sizeof(header->suid); + + memcpy(&header->version, p, sizeof(header->version)); + p += sizeof(header->version); + + int32_t colLen = 0; + memcpy(&colLen, p, sizeof(colLen)); + assert(colLen < sizeof(header->colName)); + nread = ctx->read(ctx, header->colName, colLen); + assert(nread == colLen); + + nread = ctx->read(ctx, &header->colType, sizeof(header->colType)); + return 0; +}; static int tfileGetFileList(const char *path, SArray *result) { DIR *dir = opendir(path); if (NULL == dir) { return -1; } @@ -35,6 +59,10 @@ static int tfileGetFileList(const char *path, SArray *result) { closedir(dir); return 0; } +static void tfileDestroyFileName(void *elem) { + char *p = *(char **)elem; + free(p); +} static int tfileCompare(const void *a, const void *b) { const char *aName = *(char **)a; const char *bName = *(char **)b; @@ -42,6 +70,7 @@ static int tfileCompare(const void *a, const void *b) { size_t bLen = strlen(bName); return strncmp(aName, bName, aLen > bLen ? aLen : bLen); } +// tfile name suid-colId-version.tindex static int tfileParseFileName(const char *filename, uint64_t *suid, int *colId, int *version) { if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) { // read suid & colid & version success @@ -74,14 +103,28 @@ TFileCache *tfileCacheCreate(const char *path) { uint64_t suid; int colId, version; if (0 != tfileParseFileName(file, &suid, &colId, &version)) { - // invalid file, just skip + goto End; continue; } - free((void *)file); + + TFileReader *reader = calloc(1, sizeof(TFileReader)); + reader->ctx = writerCtxCreate(TFile, file, true, 1024 * 64); + if (reader->ctx == NULL) { + TFileReaderDestroy(reader); + indexError("failed to open index: %s", file); + goto End; + } + TFileReadHeader header = {0}; + if (0 != tfileLoadHeader(reader->ctx, &header)) { + TFileReaderDestroy(reader); + indexError("failed to load index header, index Id: %s", file); + } } - taosArrayDestroy(files); - + taosArrayDestroyEx(files, tfileDestroyFileName); return tcache; +End: + taosArrayDestroyEx(files, tfileDestroyFileName); + return NULL; } void tfileCacheDestroy(TFileCache *tcache) { @@ -103,13 +146,25 @@ void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader) } +TFileReader* tfileReaderCreate() { + +} +void TFileReaderDestroy(TFileReader *reader) { + if (reader == NULL) { return; } + + writerCtxDestroy(reader->ctx); + free(reader); +} + + +TFileWriter *tfileWriterCreate(const char *suid, const char *colName); +void tfileWriterDestroy(TFileWriter *tw); IndexTFile *indexTFileCreate(const char *path) { IndexTFile *tfile = calloc(1, sizeof(IndexTFile)); tfile->cache = tfileCacheCreate(path); - return tfile; } void IndexTFileDestroy(IndexTFile *tfile) {