update index TFile manage
This commit is contained in:
parent
4f52726463
commit
6fe118c516
|
@ -26,7 +26,19 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
// tfile header
|
||||||
|
// |<---suid--->|<---version--->|<--colLen-->|<-colName->|<---type-->|
|
||||||
|
// |<-uint64_t->|<---int32_t--->|<--int32_t->|<-colLen-->|<-uint8_t->|
|
||||||
|
|
||||||
|
typedef struct TFileReadHeader {
|
||||||
|
uint64_t suid;
|
||||||
|
int32_t version;
|
||||||
|
char colName[128]; //
|
||||||
|
uint8_t colType;
|
||||||
|
} TFileReadHeader;
|
||||||
|
|
||||||
|
#define TFILE_HEADER_SIZE (sizeof(TFILE_HEADER_SIZE) + sizeof(uint32_t));
|
||||||
|
#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t))
|
||||||
|
|
||||||
typedef struct TFileCacheKey {
|
typedef struct TFileCacheKey {
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
|
@ -48,13 +60,13 @@ typedef struct TFileCache {
|
||||||
|
|
||||||
typedef struct TFileWriter {
|
typedef struct TFileWriter {
|
||||||
FstBuilder *fb;
|
FstBuilder *fb;
|
||||||
WriterCtx *wc;
|
WriterCtx *ctx;
|
||||||
} TFileWriter;
|
} TFileWriter;
|
||||||
|
|
||||||
typedef struct TFileReader {
|
typedef struct TFileReader {
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
Fst *fst;
|
Fst *fst;
|
||||||
|
WriterCtx *ctx;
|
||||||
} TFileReader;
|
} TFileReader;
|
||||||
|
|
||||||
typedef struct IndexTFile {
|
typedef struct IndexTFile {
|
||||||
|
@ -78,18 +90,22 @@ typedef struct TFileReaderOpt {
|
||||||
|
|
||||||
} TFileReaderOpt;
|
} TFileReaderOpt;
|
||||||
|
|
||||||
// tfile cache
|
// tfile cache, manage tindex reader
|
||||||
TFileCache *tfileCacheCreate(const char *path);
|
TFileCache *tfileCacheCreate(const char *path);
|
||||||
void tfileCacheDestroy(TFileCache *tcache);
|
void tfileCacheDestroy(TFileCache *tcache);
|
||||||
TFileReader* tfileCacheGet(TFileCache *tcache, TFileCacheKey *key);
|
TFileReader* tfileCacheGet(TFileCache *tcache, TFileCacheKey *key);
|
||||||
void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader);
|
void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader);
|
||||||
|
|
||||||
|
TFileReader* tfileReaderCreate();
|
||||||
|
void TFileReaderDestroy(TFileReader *reader);
|
||||||
|
|
||||||
|
|
||||||
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
|
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
|
||||||
|
void tfileWriterDestroy(TFileWriter *tw);
|
||||||
|
|
||||||
|
//
|
||||||
IndexTFile *indexTFileCreate(const char *path);
|
IndexTFile *indexTFileCreate(const char *path);
|
||||||
|
|
||||||
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid);
|
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid);
|
||||||
|
|
||||||
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result);
|
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result);
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -65,6 +65,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int
|
||||||
ctx->file.fd = tfOpenReadWrite(tmpFile);
|
ctx->file.fd = tfOpenReadWrite(tmpFile);
|
||||||
}
|
}
|
||||||
if (ctx->file.fd < 0) {
|
if (ctx->file.fd < 0) {
|
||||||
|
goto END;
|
||||||
indexError("open file error %d", errno);
|
indexError("open file error %d", errno);
|
||||||
}
|
}
|
||||||
} else if (ctx->type == TMemory) {
|
} else if (ctx->type == TMemory) {
|
||||||
|
@ -79,6 +80,9 @@ WriterCtx* writerCtxCreate(WriterType type, const char *path, bool readOnly, int
|
||||||
ctx->limit = capacity;
|
ctx->limit = capacity;
|
||||||
|
|
||||||
return ctx;
|
return ctx;
|
||||||
|
END:
|
||||||
|
if (ctx->type == TMemory) { free(ctx->mem.buf); }
|
||||||
|
free(ctx);
|
||||||
}
|
}
|
||||||
void writerCtxDestroy(WriterCtx *ctx) {
|
void writerCtxDestroy(WriterCtx *ctx) {
|
||||||
if (ctx->type == TMemory) {
|
if (ctx->type == TMemory) {
|
||||||
|
|
|
@ -13,14 +13,38 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <sys/types.h>
|
//#include <sys/types.h>
|
||||||
#include <dirent.h>
|
//#include <dirent.h>
|
||||||
#include "index_tfile.h"
|
#include "index_tfile.h"
|
||||||
#include "index_fst.h"
|
#include "index_fst.h"
|
||||||
#include "index_util.h"
|
#include "index_util.h"
|
||||||
|
#include "taosdef.h"
|
||||||
|
#include "index.h"
|
||||||
|
#include "index_fst_counting_writer.h"
|
||||||
|
|
||||||
|
|
||||||
// tfile name suid-colId-version.tindex
|
static FORCE_INLINE int tfileLoadHeader(WriterCtx *ctx, TFileReadHeader *header) {
|
||||||
|
//TODO simple tfile header later
|
||||||
|
char buf[TFILE_HADER_PRE_SIZE];
|
||||||
|
char *p = buf;
|
||||||
|
int64_t nread = ctx->read(ctx, buf, TFILE_HADER_PRE_SIZE);
|
||||||
|
assert(nread == TFILE_HADER_PRE_SIZE);
|
||||||
|
|
||||||
|
memcpy(&header->suid, p, sizeof(header->suid));
|
||||||
|
p += sizeof(header->suid);
|
||||||
|
|
||||||
|
memcpy(&header->version, p, sizeof(header->version));
|
||||||
|
p += sizeof(header->version);
|
||||||
|
|
||||||
|
int32_t colLen = 0;
|
||||||
|
memcpy(&colLen, p, sizeof(colLen));
|
||||||
|
assert(colLen < sizeof(header->colName));
|
||||||
|
nread = ctx->read(ctx, header->colName, colLen);
|
||||||
|
assert(nread == colLen);
|
||||||
|
|
||||||
|
nread = ctx->read(ctx, &header->colType, sizeof(header->colType));
|
||||||
|
return 0;
|
||||||
|
};
|
||||||
static int tfileGetFileList(const char *path, SArray *result) {
|
static int tfileGetFileList(const char *path, SArray *result) {
|
||||||
DIR *dir = opendir(path);
|
DIR *dir = opendir(path);
|
||||||
if (NULL == dir) { return -1; }
|
if (NULL == dir) { return -1; }
|
||||||
|
@ -35,6 +59,10 @@ static int tfileGetFileList(const char *path, SArray *result) {
|
||||||
closedir(dir);
|
closedir(dir);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
static void tfileDestroyFileName(void *elem) {
|
||||||
|
char *p = *(char **)elem;
|
||||||
|
free(p);
|
||||||
|
}
|
||||||
static int tfileCompare(const void *a, const void *b) {
|
static int tfileCompare(const void *a, const void *b) {
|
||||||
const char *aName = *(char **)a;
|
const char *aName = *(char **)a;
|
||||||
const char *bName = *(char **)b;
|
const char *bName = *(char **)b;
|
||||||
|
@ -42,6 +70,7 @@ static int tfileCompare(const void *a, const void *b) {
|
||||||
size_t bLen = strlen(bName);
|
size_t bLen = strlen(bName);
|
||||||
return strncmp(aName, bName, aLen > bLen ? aLen : bLen);
|
return strncmp(aName, bName, aLen > bLen ? aLen : bLen);
|
||||||
}
|
}
|
||||||
|
// tfile name suid-colId-version.tindex
|
||||||
static int tfileParseFileName(const char *filename, uint64_t *suid, int *colId, int *version) {
|
static int tfileParseFileName(const char *filename, uint64_t *suid, int *colId, int *version) {
|
||||||
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
|
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
|
||||||
// read suid & colid & version success
|
// read suid & colid & version success
|
||||||
|
@ -74,14 +103,28 @@ TFileCache *tfileCacheCreate(const char *path) {
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
int colId, version;
|
int colId, version;
|
||||||
if (0 != tfileParseFileName(file, &suid, &colId, &version)) {
|
if (0 != tfileParseFileName(file, &suid, &colId, &version)) {
|
||||||
// invalid file, just skip
|
goto End;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
free((void *)file);
|
|
||||||
|
TFileReader *reader = calloc(1, sizeof(TFileReader));
|
||||||
|
reader->ctx = writerCtxCreate(TFile, file, true, 1024 * 64);
|
||||||
|
if (reader->ctx == NULL) {
|
||||||
|
TFileReaderDestroy(reader);
|
||||||
|
indexError("failed to open index: %s", file);
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
TFileReadHeader header = {0};
|
||||||
|
if (0 != tfileLoadHeader(reader->ctx, &header)) {
|
||||||
|
TFileReaderDestroy(reader);
|
||||||
|
indexError("failed to load index header, index Id: %s", file);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosArrayDestroy(files);
|
taosArrayDestroyEx(files, tfileDestroyFileName);
|
||||||
|
|
||||||
return tcache;
|
return tcache;
|
||||||
|
End:
|
||||||
|
taosArrayDestroyEx(files, tfileDestroyFileName);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
void tfileCacheDestroy(TFileCache *tcache) {
|
void tfileCacheDestroy(TFileCache *tcache) {
|
||||||
|
|
||||||
|
@ -103,13 +146,25 @@ void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TFileReader* tfileReaderCreate() {
|
||||||
|
|
||||||
|
}
|
||||||
|
void TFileReaderDestroy(TFileReader *reader) {
|
||||||
|
if (reader == NULL) { return; }
|
||||||
|
|
||||||
|
writerCtxDestroy(reader->ctx);
|
||||||
|
free(reader);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
|
||||||
|
void tfileWriterDestroy(TFileWriter *tw);
|
||||||
|
|
||||||
|
|
||||||
IndexTFile *indexTFileCreate(const char *path) {
|
IndexTFile *indexTFileCreate(const char *path) {
|
||||||
IndexTFile *tfile = calloc(1, sizeof(IndexTFile));
|
IndexTFile *tfile = calloc(1, sizeof(IndexTFile));
|
||||||
tfile->cache = tfileCacheCreate(path);
|
tfile->cache = tfileCacheCreate(path);
|
||||||
|
|
||||||
|
|
||||||
return tfile;
|
return tfile;
|
||||||
}
|
}
|
||||||
void IndexTFileDestroy(IndexTFile *tfile) {
|
void IndexTFileDestroy(IndexTFile *tfile) {
|
||||||
|
|
Loading…
Reference in New Issue