Merge pull request #9233 from taosdata/feature/index_cache
update index TFile manage
This commit is contained in:
commit
84d102b5e7
|
@ -26,7 +26,19 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
// tfile header
|
||||
// |<---suid--->|<---version--->|<--colLen-->|<-colName->|<---type-->|
|
||||
// |<-uint64_t->|<---int32_t--->|<--int32_t->|<-colLen-->|<-uint8_t->|
|
||||
|
||||
typedef struct TFileReadHeader {
|
||||
uint64_t suid;
|
||||
int32_t version;
|
||||
char colName[128]; //
|
||||
uint8_t colType;
|
||||
} TFileReadHeader;
|
||||
|
||||
#define TFILE_HEADER_SIZE (sizeof(TFILE_HEADER_SIZE) + sizeof(uint32_t));
|
||||
#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t))
|
||||
|
||||
typedef struct TFileCacheKey {
|
||||
uint64_t suid;
|
||||
|
@ -48,13 +60,13 @@ typedef struct TFileCache {
|
|||
|
||||
typedef struct TFileWriter {
|
||||
FstBuilder *fb;
|
||||
WriterCtx *wc;
|
||||
WriterCtx *ctx;
|
||||
} TFileWriter;
|
||||
|
||||
typedef struct TFileReader {
|
||||
T_REF_DECLARE()
|
||||
Fst *fst;
|
||||
|
||||
WriterCtx *ctx;
|
||||
} TFileReader;
|
||||
|
||||
typedef struct IndexTFile {
|
||||
|
@ -78,18 +90,22 @@ typedef struct TFileReaderOpt {
|
|||
|
||||
} TFileReaderOpt;
|
||||
|
||||
// tfile cache
|
||||
// tfile cache, manage tindex reader
|
||||
TFileCache *tfileCacheCreate(const char *path);
|
||||
void tfileCacheDestroy(TFileCache *tcache);
|
||||
TFileReader* tfileCacheGet(TFileCache *tcache, TFileCacheKey *key);
|
||||
void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader);
|
||||
|
||||
TFileReader* tfileReaderCreate();
|
||||
void TFileReaderDestroy(TFileReader *reader);
|
||||
|
||||
|
||||
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
|
||||
void tfileWriterDestroy(TFileWriter *tw);
|
||||
|
||||
//
|
||||
IndexTFile *indexTFileCreate(const char *path);
|
||||
|
||||
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid);
|
||||
|
||||
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result);
|
||||
|
||||
|
||||
|
|
|
@ -65,6 +65,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int
|
|||
ctx->file.fd = tfOpenReadWrite(tmpFile);
|
||||
}
|
||||
if (ctx->file.fd < 0) {
|
||||
goto END;
|
||||
indexError("open file error %d", errno);
|
||||
}
|
||||
} else if (ctx->type == TMemory) {
|
||||
|
@ -79,6 +80,9 @@ WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int
|
|||
ctx->limit = capacity;
|
||||
|
||||
return ctx;
|
||||
END:
|
||||
if (ctx->type == TMemory) { free(ctx->mem.buf); }
|
||||
free(ctx);
|
||||
}
|
||||
void writerCtxDestroy(WriterCtx *ctx) {
|
||||
if (ctx->type == TMemory) {
|
||||
|
|
|
@ -13,14 +13,38 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <sys/types.h>
|
||||
#include <dirent.h>
|
||||
//#include <sys/types.h>
|
||||
//#include <dirent.h>
|
||||
#include "index_tfile.h"
|
||||
#include "index_fst.h"
|
||||
#include "index_util.h"
|
||||
#include "taosdef.h"
|
||||
#include "index.h"
|
||||
#include "index_fst_counting_writer.h"
|
||||
|
||||
|
||||
// tfile name suid-colId-version.tindex
|
||||
static FORCE_INLINE int tfileLoadHeader(WriterCtx *ctx, TFileReadHeader *header) {
|
||||
//TODO simple tfile header later
|
||||
char buf[TFILE_HADER_PRE_SIZE];
|
||||
char *p = buf;
|
||||
int64_t nread = ctx->read(ctx, buf, TFILE_HADER_PRE_SIZE);
|
||||
assert(nread == TFILE_HADER_PRE_SIZE);
|
||||
|
||||
memcpy(&header->suid, p, sizeof(header->suid));
|
||||
p += sizeof(header->suid);
|
||||
|
||||
memcpy(&header->version, p, sizeof(header->version));
|
||||
p += sizeof(header->version);
|
||||
|
||||
int32_t colLen = 0;
|
||||
memcpy(&colLen, p, sizeof(colLen));
|
||||
assert(colLen < sizeof(header->colName));
|
||||
nread = ctx->read(ctx, header->colName, colLen);
|
||||
assert(nread == colLen);
|
||||
|
||||
nread = ctx->read(ctx, &header->colType, sizeof(header->colType));
|
||||
return 0;
|
||||
};
|
||||
static int tfileGetFileList(const char *path, SArray *result) {
|
||||
DIR *dir = opendir(path);
|
||||
if (NULL == dir) { return -1; }
|
||||
|
@ -35,6 +59,10 @@ static int tfileGetFileList(const char *path, SArray *result) {
|
|||
closedir(dir);
|
||||
return 0;
|
||||
}
|
||||
static void tfileDestroyFileName(void *elem) {
|
||||
char *p = *(char **)elem;
|
||||
free(p);
|
||||
}
|
||||
static int tfileCompare(const void *a, const void *b) {
|
||||
const char *aName = *(char **)a;
|
||||
const char *bName = *(char **)b;
|
||||
|
@ -42,6 +70,7 @@ static int tfileCompare(const void *a, const void *b) {
|
|||
size_t bLen = strlen(bName);
|
||||
return strncmp(aName, bName, aLen > bLen ? aLen : bLen);
|
||||
}
|
||||
// tfile name suid-colId-version.tindex
|
||||
static int tfileParseFileName(const char *filename, uint64_t *suid, int *colId, int *version) {
|
||||
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
|
||||
// read suid & colid & version success
|
||||
|
@ -74,14 +103,28 @@ TFileCache *tfileCacheCreate(const char *path) {
|
|||
uint64_t suid;
|
||||
int colId, version;
|
||||
if (0 != tfileParseFileName(file, &suid, &colId, &version)) {
|
||||
// invalid file, just skip
|
||||
goto End;
|
||||
continue;
|
||||
}
|
||||
free((void *)file);
|
||||
|
||||
TFileReader *reader = calloc(1, sizeof(TFileReader));
|
||||
reader->ctx = writerCtxCreate(TFile, file, true, 1024 * 64);
|
||||
if (reader->ctx == NULL) {
|
||||
TFileReaderDestroy(reader);
|
||||
indexError("failed to open index: %s", file);
|
||||
goto End;
|
||||
}
|
||||
TFileReadHeader header = {0};
|
||||
if (0 != tfileLoadHeader(reader->ctx, &header)) {
|
||||
TFileReaderDestroy(reader);
|
||||
indexError("failed to load index header, index Id: %s", file);
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(files);
|
||||
|
||||
taosArrayDestroyEx(files, tfileDestroyFileName);
|
||||
return tcache;
|
||||
End:
|
||||
taosArrayDestroyEx(files, tfileDestroyFileName);
|
||||
return NULL;
|
||||
}
|
||||
void tfileCacheDestroy(TFileCache *tcache) {
|
||||
|
||||
|
@ -103,13 +146,25 @@ void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader)
|
|||
}
|
||||
|
||||
|
||||
TFileReader* tfileReaderCreate() {
|
||||
|
||||
}
|
||||
void TFileReaderDestroy(TFileReader *reader) {
|
||||
if (reader == NULL) { return; }
|
||||
|
||||
writerCtxDestroy(reader->ctx);
|
||||
free(reader);
|
||||
}
|
||||
|
||||
|
||||
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
|
||||
void tfileWriterDestroy(TFileWriter *tw);
|
||||
|
||||
|
||||
IndexTFile *indexTFileCreate(const char *path) {
|
||||
IndexTFile *tfile = calloc(1, sizeof(IndexTFile));
|
||||
tfile->cache = tfileCacheCreate(path);
|
||||
|
||||
|
||||
return tfile;
|
||||
}
|
||||
void IndexTFileDestroy(IndexTFile *tfile) {
|
||||
|
|
Loading…
Reference in New Issue