Merge remote-tracking branch 'origin/3.0' into feature/dnode3
This commit is contained in:
commit
0d64b517ee
|
@ -59,7 +59,7 @@ typedef struct TFileReader {
|
||||||
|
|
||||||
typedef struct IndexTFile {
|
typedef struct IndexTFile {
|
||||||
char *path;
|
char *path;
|
||||||
TFileReader *tb;
|
TFileCache *cache;
|
||||||
TFileWriter *tw;
|
TFileWriter *tw;
|
||||||
} IndexTFile;
|
} IndexTFile;
|
||||||
|
|
||||||
|
@ -79,14 +79,14 @@ typedef struct TFileReaderOpt {
|
||||||
} TFileReaderOpt;
|
} TFileReaderOpt;
|
||||||
|
|
||||||
// tfile cache
|
// tfile cache
|
||||||
TFileCache *tfileCacheCreate();
|
TFileCache *tfileCacheCreate(const char *path);
|
||||||
void tfileCacheDestroy(TFileCache *tcache);
|
void tfileCacheDestroy(TFileCache *tcache);
|
||||||
TFileReader* tfileCacheGet(TFileCache *tcache, TFileCacheKey *key);
|
TFileReader* tfileCacheGet(TFileCache *tcache, TFileCacheKey *key);
|
||||||
void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader);
|
void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader);
|
||||||
|
|
||||||
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
|
TFileWriter *tfileWriterCreate(const char *suid, const char *colName);
|
||||||
|
|
||||||
IndexTFile *indexTFileCreate();
|
IndexTFile *indexTFileCreate(const char *path);
|
||||||
|
|
||||||
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid);
|
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid);
|
||||||
|
|
||||||
|
|
|
@ -333,13 +333,17 @@ static int indexMergeFinalResults(SArray *interResults, EIndexOperatorType oType
|
||||||
//refactor, merge interResults into fResults by oType
|
//refactor, merge interResults into fResults by oType
|
||||||
SArray *first = taosArrayGetP(interResults, 0);
|
SArray *first = taosArrayGetP(interResults, 0);
|
||||||
taosArraySort(first, uidCompare);
|
taosArraySort(first, uidCompare);
|
||||||
|
taosArrayRemoveDuplicate(first, uidCompare, NULL);
|
||||||
if (oType == MUST) {
|
if (oType == MUST) {
|
||||||
|
// just one column index, enhance later
|
||||||
|
taosArrayAddAll(fResults, first);
|
||||||
} else if (oType == SHOULD) {
|
} else if (oType == SHOULD) {
|
||||||
|
// just one column index, enhance later
|
||||||
|
taosArrayAddAll(fResults, first);
|
||||||
// tag1 condistion || tag2 condition
|
// tag1 condistion || tag2 condition
|
||||||
} else if (oType == NOT) {
|
} else if (oType == NOT) {
|
||||||
|
// just one column index, enhance later
|
||||||
|
taosArrayAddAll(fResults, first);
|
||||||
// not use currently
|
// not use currently
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -147,7 +147,7 @@ int indexCacheSearch(void *cache, SIndexTermQuery *query, int16_t colId, int32_t
|
||||||
|
|
||||||
char *buf = calloc(1, keyLen);
|
char *buf = calloc(1, keyLen);
|
||||||
if (qtype == QUERY_TERM) {
|
if (qtype == QUERY_TERM) {
|
||||||
|
|
||||||
} else if (qtype == QUERY_PREFIX) {
|
} else if (qtype == QUERY_PREFIX) {
|
||||||
|
|
||||||
} else if (qtype == QUERY_SUFFIX) {
|
} else if (qtype == QUERY_SUFFIX) {
|
||||||
|
|
|
@ -13,12 +13,42 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <dirent.h>
|
||||||
#include "index_tfile.h"
|
#include "index_tfile.h"
|
||||||
#include "index_fst.h"
|
#include "index_fst.h"
|
||||||
#include "index_util.h"
|
#include "index_util.h"
|
||||||
|
|
||||||
|
|
||||||
|
// tfile name suid-colId-version.tindex
|
||||||
|
static int tfileGetFileList(const char *path, SArray *result) {
|
||||||
|
DIR *dir = opendir(path);
|
||||||
|
if (NULL == dir) { return -1; }
|
||||||
|
|
||||||
|
struct dirent *entry;
|
||||||
|
while ((entry = readdir(dir)) != NULL) {
|
||||||
|
size_t len = strlen(entry->d_name);
|
||||||
|
char *buf = calloc(1, len + 1);
|
||||||
|
memcpy(buf, entry->d_name, len);
|
||||||
|
taosArrayPush(result, &buf);
|
||||||
|
}
|
||||||
|
closedir(dir);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static int tfileCompare(const void *a, const void *b) {
|
||||||
|
const char *aName = *(char **)a;
|
||||||
|
const char *bName = *(char **)b;
|
||||||
|
size_t aLen = strlen(aName);
|
||||||
|
size_t bLen = strlen(bName);
|
||||||
|
return strncmp(aName, bName, aLen > bLen ? aLen : bLen);
|
||||||
|
}
|
||||||
|
static int tfileParseFileName(const char *filename, uint64_t *suid, int *colId, int *version) {
|
||||||
|
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
|
||||||
|
// read suid & colid & version success
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) {
|
static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) {
|
||||||
SERIALIZE_MEM_TO_BUF(buf, key, suid);
|
SERIALIZE_MEM_TO_BUF(buf, key, suid);
|
||||||
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||||
|
@ -29,12 +59,28 @@ static void tfileSerialCacheKey(TFileCacheKey *key, char *buf) {
|
||||||
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
|
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
|
||||||
}
|
}
|
||||||
|
|
||||||
TFileCache *tfileCacheCreate() {
|
TFileCache *tfileCacheCreate(const char *path) {
|
||||||
TFileCache *tcache = calloc(1, sizeof(TFileCache));
|
TFileCache *tcache = calloc(1, sizeof(TFileCache));
|
||||||
if (tcache == NULL) { return NULL; }
|
if (tcache == NULL) { return NULL; }
|
||||||
|
|
||||||
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
tcache->capacity = 64;
|
tcache->capacity = 64;
|
||||||
|
|
||||||
|
SArray *files = taosArrayInit(4, sizeof(void *));
|
||||||
|
tfileGetFileList(path, files);
|
||||||
|
taosArraySort(files, tfileCompare);
|
||||||
|
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
|
||||||
|
char *file = taosArrayGetP(files, i);
|
||||||
|
uint64_t suid;
|
||||||
|
int colId, version;
|
||||||
|
if (0 != tfileParseFileName(file, &suid, &colId, &version)) {
|
||||||
|
// invalid file, just skip
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
free((void *)file);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(files);
|
||||||
|
|
||||||
return tcache;
|
return tcache;
|
||||||
}
|
}
|
||||||
void tfileCacheDestroy(TFileCache *tcache) {
|
void tfileCacheDestroy(TFileCache *tcache) {
|
||||||
|
@ -59,8 +105,11 @@ void tfileCachePut(TFileCache *tcache, TFileCacheKey *key, TFileReader *reader)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
IndexTFile *indexTFileCreate() {
|
IndexTFile *indexTFileCreate(const char *path) {
|
||||||
IndexTFile *tfile = calloc(1, sizeof(IndexTFile));
|
IndexTFile *tfile = calloc(1, sizeof(IndexTFile));
|
||||||
|
tfile->cache = tfileCacheCreate(path);
|
||||||
|
|
||||||
|
|
||||||
return tfile;
|
return tfile;
|
||||||
}
|
}
|
||||||
void IndexTFileDestroy(IndexTFile *tfile) {
|
void IndexTFileDestroy(IndexTFile *tfile) {
|
||||||
|
@ -69,8 +118,15 @@ void IndexTFileDestroy(IndexTFile *tfile) {
|
||||||
|
|
||||||
|
|
||||||
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) {
|
int indexTFileSearch(void *tfile, SIndexTermQuery *query, SArray *result) {
|
||||||
IndexTFile *ptfile = (IndexTFile *)tfile;
|
IndexTFile *pTfile = (IndexTFile *)tfile;
|
||||||
|
|
||||||
|
SIndexTerm *term = query->term;
|
||||||
|
TFileCacheKey key = {.suid = term->suid,
|
||||||
|
.colType = term->colType,
|
||||||
|
.version = 0,
|
||||||
|
.colName = term->colName,
|
||||||
|
.nColName= term->nColName};
|
||||||
|
TFileReader *reader = tfileCacheGet(pTfile->cache, &key);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid) {
|
int indexTFilePut(void *tfile, SIndexTerm *term, uint64_t uid) {
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
class FstWriter {
|
class FstWriter {
|
||||||
public:
|
public:
|
||||||
FstWriter() {
|
FstWriter() {
|
||||||
|
_wc = writerCtxCreate(TFile, "/tmp/tindex", false, 0);
|
||||||
_b = fstBuilderCreate(NULL, 0);
|
_b = fstBuilderCreate(NULL, 0);
|
||||||
}
|
}
|
||||||
bool Put(const std::string &key, uint64_t val) {
|
bool Put(const std::string &key, uint64_t val) {
|
||||||
|
@ -37,15 +38,19 @@ class FstWriter {
|
||||||
~FstWriter() {
|
~FstWriter() {
|
||||||
fstBuilderFinish(_b);
|
fstBuilderFinish(_b);
|
||||||
fstBuilderDestroy(_b);
|
fstBuilderDestroy(_b);
|
||||||
|
|
||||||
|
writerCtxDestroy(_wc);
|
||||||
}
|
}
|
||||||
private:
|
private:
|
||||||
FstBuilder *_b;
|
FstBuilder *_b;
|
||||||
|
WriterCtx *_wc;
|
||||||
};
|
};
|
||||||
|
|
||||||
class FstReadMemory {
|
class FstReadMemory {
|
||||||
public:
|
public:
|
||||||
FstReadMemory(size_t size) {
|
FstReadMemory(size_t size) {
|
||||||
_w = fstCountingWriterCreate(NULL);
|
_wc = writerCtxCreate(TFile, "/tmp/tindex", true, 0);
|
||||||
|
_w = fstCountingWriterCreate(_wc);
|
||||||
_size = size;
|
_size = size;
|
||||||
memset((void *)&_s, 0, sizeof(_s));
|
memset((void *)&_s, 0, sizeof(_s));
|
||||||
}
|
}
|
||||||
|
@ -94,12 +99,14 @@ class FstReadMemory {
|
||||||
fstCountingWriterDestroy(_w);
|
fstCountingWriterDestroy(_w);
|
||||||
fstDestroy(_fst);
|
fstDestroy(_fst);
|
||||||
fstSliceDestroy(&_s);
|
fstSliceDestroy(&_s);
|
||||||
|
writerCtxDestroy(_wc);
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
FstCountingWriter *_w;
|
FstCountingWriter *_w;
|
||||||
Fst *_fst;
|
Fst *_fst;
|
||||||
FstSlice _s;
|
FstSlice _s;
|
||||||
|
WriterCtx *_wc;
|
||||||
size_t _size;
|
size_t _size;
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue