diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index a258cb834d..22bb9d1d0f 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -76,7 +76,7 @@ struct SIndexMultiTermQuery { // field and key; typedef struct SIndexTerm { - int64_t suid; + int64_t suid; SIndexOperOnColumn operType; // oper type, add/del/update uint8_t colType; // term data type, str/interger/json char *colName; diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h index ea090389bb..ac9a59fa04 100644 --- a/source/libs/index/inc/index_fst_counting_writer.h +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -34,8 +34,14 @@ typedef struct WriterCtx { int (*flush)(struct WriterCtx *ctx); WriterType type; union { - int fd; - void *mem; + struct { + int fd; + bool readOnly; + } file; + struct { + int32_t capa; + char *buf; + } mem; }; int32_t offset; int32_t limit; @@ -45,7 +51,7 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len); static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len); static int writeCtxDoFlush(WriterCtx *ctx); -WriterCtx* writerCtxCreate(WriterType type, bool readOnly); +WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int32_t capacity); void writerCtxDestroy(WriterCtx *w); typedef uint32_t CheckSummer; @@ -66,7 +72,7 @@ int fstCountingWriterFlush(FstCountingWriter *write); uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write); -FstCountingWriter *fstCountingWriterCreate(void *wtr, bool readOnly); +FstCountingWriter *fstCountingWriterCreate(void *wtr); void fstCountingWriterDestroy(FstCountingWriter *w); diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index c3f4bd25e5..979c0b0639 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -18,23 +18,81 @@ #include "index.h" #include "indexInt.h" #include "tlockfree.h" -#include "tskiplist.h" +#include "index_tfile.h" +#include "index_fst_counting_writer.h" +#include "index_fst.h" #ifdef __cplusplus extern "C" { #endif -typedef struct IndexTFile { + + +typedef struct TFileCacheKey { + uint64_t suid; + uint8_t colType; + int32_t version; + const char *colName; + int32_t nColName; +} TFileCacheKey; + + +// table cache +// refactor to LRU cache later +typedef struct TFileCache { + SHashObj *tableCache; + int16_t capacity; + // add more param +} TFileCache; + + +typedef struct TFileWriter { + FstBuilder *fb; + WriterCtx *wc; +} TFileWriter; + +typedef struct TFileReader { T_REF_DECLARE() + Fst *fst; + +} TFileReader; + +typedef struct IndexTFile { + char *path; + TFileReader *tb; + TFileWriter *tw; } IndexTFile; +typedef struct TFileWriterOpt { + uint64_t suid; + int8_t colType; + char *colName; + int32_t nColName; + int32_t version; +} TFileWriterOpt; +typedef struct TFileReaderOpt { + uint64_t suid; + char *colName; + int32_t nColName; + +} TFileReaderOpt; + +// tfile cache +TFileCache *tfileCacheCreate(); +void tfileCacheDestroy(TFileCache *tcache); +TFileReader* tfileCacheGet(TFileCache *tcache, TFileCacheKey *key); +void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader); + +TFileWriter *tfileWriterCreate(const char *suid, const char *colName); IndexTFile *indexTFileCreate(); +int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid); int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result); + #ifdef __cplusplus } diff --git a/source/libs/index/inc/index_util.h b/source/libs/index/inc/index_util.h new file mode 100644 index 0000000000..4ab517ecfa --- /dev/null +++ b/source/libs/index/inc/index_util.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef __INDEX_UTIL_H__ +#define __INDEX_UTIL_H__ + +#ifdef __cplusplus +extern "C" { +#endif + +#define SERIALIZE_MEM_TO_BUF(buf, key, mem) \ + do { \ + memcpy((void *)buf, (void *)(&key->mem), sizeof(key->mem)); \ + buf += sizeof(key->mem); \ + } while (0) + +#define SERIALIZE_STR_MEM_TO_BUF(buf, key, mem, len) \ + do { \ + memcpy((void *)buf, (void *)key->mem, len); \ + buf += len; \ + } while (0) + +#define SERIALIZE_VAR_TO_BUF(buf, var, type) \ + do { \ + type c = var; \ + assert(sizeof(var) == sizeof(type));\ + memcpy((void *)buf, (void *)&c, sizeof(c)); \ + buf += sizeof(c); \ + } while (0) + +#define SERIALIZE_STR_VAR_TO_BUF(buf, var, len) \ + do { \ + memcpy((void *)buf, (void *)var, len); \ + buf += len;\ + } while (0) + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 3c52275a4c..5813c99164 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -15,6 +15,7 @@ #include "index_cache.h" #include "tcompare.h" +#include "index_util.h" #define MAX_INDEX_KEY_LEN 256// test only, change later @@ -110,35 +111,22 @@ int indexCachePut(void *cache, SIndexTerm *term, int16_t colId, int32_t version, if (cache == NULL) { return -1;} IndexCache *pCache = cache; - // encode data int32_t total = CACHE_KEY_LEN(term); - char *buf = calloc(1, total); char *p = buf; - memcpy(p, &total, sizeof(total)); - p += sizeof(total); + SERIALIZE_VAR_TO_BUF(p, total,int32_t); + SERIALIZE_VAR_TO_BUF(p, colId, int16_t); - memcpy(p, &colId, sizeof(colId)); - p += sizeof(colId); - - memcpy(p, &term->colType, sizeof(term->colType)); - p += sizeof(term->colType); + SERIALIZE_MEM_TO_BUF(p, term, colType); + SERIALIZE_MEM_TO_BUF(p, term, nColVal); + SERIALIZE_STR_MEM_TO_BUF(p, term, colVal, term->nColVal); - memcpy(p, &term->nColVal, sizeof(term->nColVal)); - p += sizeof(term->nColVal); - memcpy(p, term->colVal, term->nColVal); - p += term->nColVal; + SERIALIZE_VAR_TO_BUF(p, version, int32_t); + SERIALIZE_VAR_TO_BUF(p, uid, uint64_t); - memcpy(p, &version, sizeof(version)); - p += sizeof(version); - - memcpy(p, &uid, sizeof(uid)); - p += sizeof(uid); - - memcpy(p, &term->operType, sizeof(term->operType)); - p += sizeof(term->operType); + SERIALIZE_MEM_TO_BUF(p, term, operType); tSkipListPut(pCache->skiplist, (void *)buf); return 0; diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 7aaa498864..0f00aacf3b 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -779,7 +779,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) { if (NULL == b) { return b; } - b->wrt = fstCountingWriterCreate(w, false); + b->wrt = fstCountingWriterCreate(w); b->unfinished = fstUnFinishedNodesCreate(); b->registry = fstRegistryCreate(10000, 2) ; b->last = fstSliceCreate(NULL, 0); diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 9ec346cebc..3497b9703d 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -23,9 +23,9 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) { } if (ctx->type == TFile) { - assert(len == tfWrite(ctx->fd, buf, len)); + assert(len == tfWrite(ctx->file.fd, buf, len)); } else { - memcpy(ctx->mem + ctx->offset, buf, len); + memcpy(ctx->mem.buf+ ctx->offset, buf, len); } ctx->offset += len; return len; @@ -33,9 +33,9 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) { static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) { int nRead = 0; if (ctx->type == TFile) { - nRead = tfRead(ctx->fd, buf, len); + nRead = tfRead(ctx->file.fd, buf, len); } else { - memcpy(buf, ctx->mem + ctx->offset, len); + memcpy(buf, ctx->mem.buf + ctx->offset, len); } ctx->offset += nRead; @@ -44,63 +44,64 @@ static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) { static int writeCtxDoFlush(WriterCtx *ctx) { if (ctx->type == TFile) { //tfFsync(ctx->fd); - //tfFlush(ctx->fd); + //tfFlush(ctx->file.fd); } else { // do nothing } return 1; } -WriterCtx* writerCtxCreate(WriterType type, bool readOnly) { +WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int32_t capacity) { WriterCtx *ctx = calloc(1, sizeof(WriterCtx)); if (ctx == NULL) { return NULL; } ctx->type = type; if (ctx->type == TFile) { - tfInit(); // ugly code, refactor later + ctx->file.readOnly = readOnly; if (readOnly == false) { - ctx->fd = tfOpenCreateWriteAppend(tmpFile); + ctx->file.fd = tfOpenCreateWriteAppend(tmpFile); } else { - ctx->fd = tfOpenReadWrite(tmpFile); + ctx->file.fd = tfOpenReadWrite(tmpFile); } - if (ctx->fd < 0) { + if (ctx->file.fd < 0) { indexError("open file error %d", errno); } } else if (ctx->type == TMemory) { - ctx->mem = calloc(1, DefaultMem * sizeof(uint8_t)); + ctx->mem.buf = calloc(1, sizeof(char) * capacity); + ctx->mem.capa = capacity; } ctx->write = writeCtxDoWrite; ctx->read = writeCtxDoRead; ctx->flush = writeCtxDoFlush; ctx->offset = 0; - ctx->limit = DefaultMem; + ctx->limit = capacity; return ctx; } void writerCtxDestroy(WriterCtx *ctx) { if (ctx->type == TMemory) { - free(ctx->mem); + free(ctx->mem.buf); } else { - tfClose(ctx->fd); - tfCleanup(); + tfClose(ctx->file.fd); } free(ctx); } -FstCountingWriter *fstCountingWriterCreate(void *wrt, bool readOnly) { +FstCountingWriter *fstCountingWriterCreate(void *wrt) { FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter)); if (cw == NULL) { return NULL; } - cw->wrt = (void *)(writerCtxCreate(TFile, readOnly)); + cw->wrt = wrt; + //(void *)(writerCtxCreate(TFile, readOnly)); return cw; } void fstCountingWriterDestroy(FstCountingWriter *cw) { // free wrt object: close fd or free mem fstCountingWriterFlush(cw); - writerCtxDestroy((WriterCtx *)(cw->wrt)); + //writerCtxDestroy((WriterCtx *)(cw->wrt)); free(cw); } @@ -124,6 +125,7 @@ int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len) } uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) { + return 0; } int fstCountingWriterFlush(FstCountingWriter *write) { diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index a1bba56391..81a7f9f443 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -14,6 +14,50 @@ */ #include "index_tfile.h" +#include "index_fst.h" +#include "index_util.h" + + + +static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) { + SERIALIZE_MEM_TO_BUF(buf, key, suid); + SERIALIZE_VAR_TO_BUF(buf, '_', char); + SERIALIZE_MEM_TO_BUF(buf, key, colType); + SERIALIZE_VAR_TO_BUF(buf, '_', char); + SERIALIZE_MEM_TO_BUF(buf, key, version); + SERIALIZE_VAR_TO_BUF(buf, '_', char); + SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); +} + +TFileCache *tfileCacheCreate() { + TFileCache *tcache = calloc(1, sizeof(TFileCache)); + if (tcache == NULL) { return NULL; } + + tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + tcache->capacity = 64; + return tcache; +} +void tfileCacheDestroy(TFileCache *tcache) { + + free(tcache); + +} + +TFileReader *tfileCacheGet(TFileCache *tcache, TFileCacheKey *key) { + char buf[128] = {0}; + tfileSerialCacheKey(key, buf); + TFileReader *reader = taosHashGet(tcache->tableCache, buf, strlen(buf)); + return reader; +} +void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader) { + char buf[128] = {0}; + tfileSerialCacheKey(key, buf); + taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void *)); + return; +} + + + IndexTFile *indexTFileCreate() { IndexTFile *tfile = calloc(1, sizeof(IndexTFile)); @@ -22,10 +66,24 @@ IndexTFile *indexTFileCreate() { void IndexTFileDestroy(IndexTFile *tfile) { free(tfile); } + + int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) { IndexTFile *ptfile = (IndexTFile *)tfile; + return 0; } +int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid) { + TFileWriterOpt wOpt = {.suid = term->suid, + .colType = term->colType, + .colName = term->colName, + .nColName= term->nColName, + .version = 1}; + + + + return 0; +} diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 9dff2e9ea0..9baabb9610 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -45,7 +45,7 @@ class FstWriter { class FstReadMemory { public: FstReadMemory(size_t size) { - _w = fstCountingWriterCreate(NULL, true); + _w = fstCountingWriterCreate(NULL); _size = size; memset((void *)&_s, 0, sizeof(_s)); }