Merge pull request #9579 from taosdata/feature/index_cache
Feature/index cache
This commit is contained in:
commit
cbfcc58664
|
@ -74,16 +74,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
||||||
// sIdx->cache = (void*)indexCacheCreate(sIdx);
|
// sIdx->cache = (void*)indexCacheCreate(sIdx);
|
||||||
sIdx->tindex = indexTFileCreate(path);
|
sIdx->tindex = indexTFileCreate(path);
|
||||||
if (sIdx->tindex == NULL) { goto END; }
|
if (sIdx->tindex == NULL) { goto END; }
|
||||||
|
|
||||||
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
sIdx->cVersion = 1;
|
sIdx->cVersion = 1;
|
||||||
sIdx->path = calloc(1, strlen(path) + 1);
|
sIdx->path = tstrdup(path);
|
||||||
memcpy(sIdx->path, path, strlen(path));
|
|
||||||
pthread_mutex_init(&sIdx->mtx, NULL);
|
pthread_mutex_init(&sIdx->mtx, NULL);
|
||||||
|
|
||||||
*index = sIdx;
|
*index = sIdx;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
END:
|
END:
|
||||||
if (sIdx != NULL) { indexClose(sIdx); }
|
if (sIdx != NULL) { indexClose(sIdx); }
|
||||||
|
|
||||||
|
@ -310,18 +309,14 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
||||||
|
|
||||||
// Get col info
|
// Get col info
|
||||||
IndexCache* cache = NULL;
|
IndexCache* cache = NULL;
|
||||||
pthread_mutex_lock(&sIdx->mtx);
|
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)};
|
ICacheKey key = {.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName)};
|
||||||
int32_t sz = indexSerialCacheKey(&key, buf);
|
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&sIdx->mtx);
|
||||||
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
|
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
|
||||||
if (pCache == NULL) {
|
cache = (pCache == NULL) ? NULL : *pCache;
|
||||||
pthread_mutex_unlock(&sIdx->mtx);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
cache = *pCache;
|
|
||||||
pthread_mutex_unlock(&sIdx->mtx);
|
pthread_mutex_unlock(&sIdx->mtx);
|
||||||
|
|
||||||
*result = taosArrayInit(4, sizeof(uint64_t));
|
*result = taosArrayInit(4, sizeof(uint64_t));
|
||||||
|
@ -329,7 +324,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
||||||
STermValueType s = kTypeValue;
|
STermValueType s = kTypeValue;
|
||||||
if (0 == indexCacheSearch(cache, query, *result, &s)) {
|
if (0 == indexCacheSearch(cache, query, *result, &s)) {
|
||||||
if (s == kTypeDeletion) {
|
if (s == kTypeDeletion) {
|
||||||
indexInfo("col: %s already drop by other opera", term->colName);
|
indexInfo("col: %s already drop by", term->colName);
|
||||||
// coloum already drop by other oper, no need to query tindex
|
// coloum already drop by other oper, no need to query tindex
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -402,7 +397,7 @@ static void indexDestroyTempResult(SArray* result) {
|
||||||
}
|
}
|
||||||
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
||||||
if (sIdx == NULL) { return -1; }
|
if (sIdx == NULL) { return -1; }
|
||||||
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
||||||
|
|
||||||
IndexCache* pCache = (IndexCache*)cache;
|
IndexCache* pCache = (IndexCache*)cache;
|
||||||
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
|
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
|
||||||
|
@ -504,17 +499,15 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||||
tfileWriterClose(tw);
|
tfileWriterClose(tw);
|
||||||
|
|
||||||
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
|
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
|
||||||
|
if (reader == NULL) { goto END; }
|
||||||
|
|
||||||
char buf[128] = {0};
|
|
||||||
TFileHeader* header = &reader->header;
|
TFileHeader* header = &reader->header;
|
||||||
ICacheKey key = {
|
ICacheKey key = {
|
||||||
.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
|
.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
|
||||||
|
|
||||||
pthread_mutex_lock(&sIdx->mtx);
|
pthread_mutex_lock(&sIdx->mtx);
|
||||||
|
|
||||||
IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
|
IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
|
||||||
tfileCachePut(ifile->cache, &key, reader);
|
tfileCachePut(ifile->cache, &key, reader);
|
||||||
|
|
||||||
pthread_mutex_unlock(&sIdx->mtx);
|
pthread_mutex_unlock(&sIdx->mtx);
|
||||||
return ret;
|
return ret;
|
||||||
END:
|
END:
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
||||||
|
|
||||||
#define MEM_TERM_LIMIT 5 * 10000
|
#define MEM_TERM_LIMIT 10 * 10000
|
||||||
// ref index_cache.h:22
|
// ref index_cache.h:22
|
||||||
//#define CACHE_KEY_LEN(p) \
|
//#define CACHE_KEY_LEN(p) \
|
||||||
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
|
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
|
||||||
|
@ -261,7 +261,7 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
|
int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) {
|
||||||
if (cache == NULL) { return -1; }
|
if (cache == NULL) { return 0; }
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
|
|
||||||
MemTable *mem = NULL, *imm = NULL;
|
MemTable *mem = NULL, *imm = NULL;
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
|
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len) {
|
||||||
if (ctx->offset + len > ctx->limit) { return -1; }
|
// if (ctx->offset + len > ctx->limit) { return -1; }
|
||||||
|
|
||||||
if (ctx->type == TFile) {
|
if (ctx->type == TFile) {
|
||||||
assert(len == tfWrite(ctx->file.fd, buf, len));
|
assert(len == tfWrite(ctx->file.fd, buf, len));
|
||||||
|
@ -111,8 +111,8 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
|
||||||
if (ctx->type == TMemory) {
|
if (ctx->type == TMemory) {
|
||||||
free(ctx->mem.buf);
|
free(ctx->mem.buf);
|
||||||
} else {
|
} else {
|
||||||
|
// ctx->flush(ctx);
|
||||||
tfClose(ctx->file.fd);
|
tfClose(ctx->file.fd);
|
||||||
ctx->flush(ctx);
|
|
||||||
if (remove) { unlink(ctx->file.buf); }
|
if (remove) { unlink(ctx->file.buf); }
|
||||||
}
|
}
|
||||||
free(ctx);
|
free(ctx);
|
||||||
|
|
|
@ -67,29 +67,18 @@ TFileCache* tfileCacheCreate(const char* path) {
|
||||||
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
|
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
|
||||||
char* file = taosArrayGetP(files, i);
|
char* file = taosArrayGetP(files, i);
|
||||||
|
|
||||||
// refactor later, use colname and version info
|
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
|
||||||
char colName[256] = {0};
|
|
||||||
if (0 != tfileParseFileName(file, &suid, colName, (int*)&version)) {
|
|
||||||
indexInfo("try parse invalid file: %s, skip it", file);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
char fullName[256] = {0};
|
|
||||||
sprintf(fullName, "%s/%s", path, file);
|
|
||||||
|
|
||||||
WriterCtx* wc = writerCtxCreate(TFile, fullName, true, 1024 * 1024 * 64);
|
|
||||||
if (wc == NULL) {
|
if (wc == NULL) {
|
||||||
indexError("failed to open index:%s", file);
|
indexError("failed to open index:%s", file);
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[128] = {0};
|
|
||||||
TFileReader* reader = tfileReaderCreate(wc);
|
TFileReader* reader = tfileReaderCreate(wc);
|
||||||
|
if (reader == NULL) { goto End; }
|
||||||
TFileHeader* header = &reader->header;
|
TFileHeader* header = &reader->header;
|
||||||
ICacheKey key = {.suid = header->suid,
|
|
||||||
.colName = header->colName,
|
char buf[128] = {0};
|
||||||
.nColName = strlen(header->colName),
|
ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)};
|
||||||
.colType = header->colType};
|
|
||||||
|
|
||||||
int32_t sz = indexSerialCacheKey(&key, buf);
|
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||||
assert(sz < sizeof(buf));
|
assert(sz < sizeof(buf));
|
||||||
|
@ -256,7 +245,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
||||||
// sort by coltype and write to tindex
|
// sort by coltype and write to tindex
|
||||||
if (order == false) {
|
if (order == false) {
|
||||||
__compar_fn_t fn;
|
__compar_fn_t fn;
|
||||||
int8_t colType = tw->header.colType;
|
|
||||||
|
int8_t colType = tw->header.colType;
|
||||||
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
|
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
fn = tfileStrCompare;
|
fn = tfileStrCompare;
|
||||||
} else {
|
} else {
|
||||||
|
@ -274,7 +264,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
for (size_t i = 0; i < sz; i++) {
|
for (size_t i = 0; i < sz; i++) {
|
||||||
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
||||||
// taosArrayRemoveDuplicate(v->tablId, tfileUidCompare, NULL);
|
taosArraySort(v->tableId, tfileUidCompare);
|
||||||
|
taosArrayRemoveDuplicate(v->tableId, tfileUidCompare, NULL);
|
||||||
int32_t tbsz = taosArrayGetSize(v->tableId);
|
int32_t tbsz = taosArrayGetSize(v->tableId);
|
||||||
fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
|
fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
|
||||||
}
|
}
|
||||||
|
@ -351,10 +342,16 @@ void tfileWriterDestroy(TFileWriter* tw) {
|
||||||
}
|
}
|
||||||
|
|
||||||
IndexTFile* indexTFileCreate(const char* path) {
|
IndexTFile* indexTFileCreate(const char* path) {
|
||||||
IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
|
TFileCache* cache = tfileCacheCreate(path);
|
||||||
if (tfile == NULL) { return NULL; }
|
if (cache == NULL) { return NULL; }
|
||||||
|
|
||||||
tfile->cache = tfileCacheCreate(path);
|
IndexTFile* tfile = calloc(1, sizeof(IndexTFile));
|
||||||
|
if (tfile == NULL) {
|
||||||
|
tfileCacheDestroy(cache);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tfile->cache = cache;
|
||||||
return tfile;
|
return tfile;
|
||||||
}
|
}
|
||||||
void indexTFileDestroy(IndexTFile* tfile) {
|
void indexTFileDestroy(IndexTFile* tfile) {
|
||||||
|
@ -366,6 +363,7 @@ void indexTFileDestroy(IndexTFile* tfile) {
|
||||||
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
||||||
int ret = -1;
|
int ret = -1;
|
||||||
if (tfile == NULL) { return ret; }
|
if (tfile == NULL) { return ret; }
|
||||||
|
|
||||||
IndexTFile* pTfile = (IndexTFile*)tfile;
|
IndexTFile* pTfile = (IndexTFile*)tfile;
|
||||||
|
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
|
@ -545,12 +543,11 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
|
||||||
|
|
||||||
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
|
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
|
||||||
if (nread == -1) {
|
if (nread == -1) {
|
||||||
//
|
|
||||||
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
|
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
|
||||||
errno, reader->ctx->file.fd, reader->ctx->file.buf);
|
errno, reader->ctx->file.fd, reader->ctx->file.buf);
|
||||||
} else {
|
} else {
|
||||||
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
|
indexInfo("actual Read: %d, to read: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
|
||||||
errno, reader->ctx->file.fd, reader->ctx->file.buf);
|
reader->ctx->file.fd, reader->ctx->file.buf);
|
||||||
}
|
}
|
||||||
// assert(nread == sizeof(buf));
|
// assert(nread == sizeof(buf));
|
||||||
memcpy(&reader->header, buf, sizeof(buf));
|
memcpy(&reader->header, buf, sizeof(buf));
|
||||||
|
@ -566,7 +563,8 @@ static int tfileReaderLoadFst(TFileReader* reader) {
|
||||||
|
|
||||||
WriterCtx* ctx = reader->ctx;
|
WriterCtx* ctx = reader->ctx;
|
||||||
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
|
int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset);
|
||||||
indexError("nread = %d, and fst offset=%d, filename: %s ", nread, reader->header.fstOffset, ctx->file.buf);
|
indexInfo("nread = %d, and fst offset=%d, filename: %s, size: %d ", nread, reader->header.fstOffset, ctx->file.buf,
|
||||||
|
ctx->file.size);
|
||||||
// we assuse fst size less than FST_MAX_SIZE
|
// we assuse fst size less than FST_MAX_SIZE
|
||||||
assert(nread > 0 && nread < FST_MAX_SIZE);
|
assert(nread > 0 && nread < FST_MAX_SIZE);
|
||||||
|
|
||||||
|
@ -613,15 +611,20 @@ void tfileReaderUnRef(TFileReader* reader) {
|
||||||
static SArray* tfileGetFileList(const char* path) {
|
static SArray* tfileGetFileList(const char* path) {
|
||||||
SArray* files = taosArrayInit(4, sizeof(void*));
|
SArray* files = taosArrayInit(4, sizeof(void*));
|
||||||
|
|
||||||
|
char buf[128] = {0};
|
||||||
|
uint64_t suid;
|
||||||
|
uint32_t version;
|
||||||
|
|
||||||
DIR* dir = opendir(path);
|
DIR* dir = opendir(path);
|
||||||
if (NULL == dir) { return NULL; }
|
if (NULL == dir) { return NULL; }
|
||||||
|
|
||||||
struct dirent* entry;
|
struct dirent* entry;
|
||||||
while ((entry = readdir(dir)) != NULL) {
|
while ((entry = readdir(dir)) != NULL) {
|
||||||
if (entry->d_type && DT_DIR) { continue; }
|
char* file = entry->d_name;
|
||||||
size_t len = strlen(entry->d_name);
|
if (0 != tfileParseFileName(file, &suid, buf, &version)) { continue; }
|
||||||
char* buf = calloc(1, len + 1);
|
|
||||||
memcpy(buf, entry->d_name, len);
|
size_t len = strlen(path) + 1 + strlen(file) + 1;
|
||||||
|
char* buf = calloc(1, len);
|
||||||
|
sprintf(buf, "%s/%s", path, file);
|
||||||
taosArrayPush(files, &buf);
|
taosArrayPush(files, &buf);
|
||||||
}
|
}
|
||||||
closedir(dir);
|
closedir(dir);
|
||||||
|
|
|
@ -701,7 +701,8 @@ class IndexObj {
|
||||||
int64_t s = taosGetTimestampUs();
|
int64_t s = taosGetTimestampUs();
|
||||||
if (Search(mq, result) == 0) {
|
if (Search(mq, result) == 0) {
|
||||||
int64_t e = taosGetTimestampUs();
|
int64_t e = taosGetTimestampUs();
|
||||||
std::cout << "search one successfully and time cost:" << e - s << std::endl;
|
std::cout << "search one successfully and time cost:" << e - s << "\tquery col:" << colName
|
||||||
|
<< "\t val: " << colVal << "\t size:" << taosArrayGetSize(result) << std::endl;
|
||||||
} else {
|
} else {
|
||||||
}
|
}
|
||||||
int sz = taosArrayGetSize(result);
|
int sz = taosArrayGetSize(result);
|
||||||
|
@ -834,11 +835,8 @@ static void write_and_search(IndexObj* idx) {
|
||||||
std::string colName("tag1"), colVal("Hello");
|
std::string colName("tag1"), colVal("Hello");
|
||||||
|
|
||||||
int target = idx->SearchOne("tag1", "Hello");
|
int target = idx->SearchOne("tag1", "Hello");
|
||||||
std::cout << "search: " << target << std::endl;
|
|
||||||
target = idx->SearchOne("tag2", "Test");
|
target = idx->SearchOne("tag2", "Test");
|
||||||
std::cout << "search: " << target << std::endl;
|
// idx->PutOne(colName, colVal);
|
||||||
|
|
||||||
idx->PutOne(colName, colVal);
|
|
||||||
}
|
}
|
||||||
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
|
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
|
||||||
std::string path = "/tmp/cache_and_tfile";
|
std::string path = "/tmp/cache_and_tfile";
|
||||||
|
@ -847,8 +845,8 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
|
||||||
}
|
}
|
||||||
index->PutOne("tag1", "Hello");
|
index->PutOne("tag1", "Hello");
|
||||||
index->PutOne("tag2", "Test");
|
index->PutOne("tag2", "Test");
|
||||||
index->WriteMultiMillonData("tag1", "Hello", 50 * 10000);
|
index->WriteMultiMillonData("tag1", "Hello", 100 * 10000);
|
||||||
index->WriteMultiMillonData("tag2", "Test", 50 * 10000);
|
index->WriteMultiMillonData("tag2", "Test", 100 * 10000);
|
||||||
std::thread threads[NUM_OF_THREAD];
|
std::thread threads[NUM_OF_THREAD];
|
||||||
|
|
||||||
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||||
|
|
Loading…
Reference in New Issue