Merge remote-tracking branch 'origin/3.0' into feature/dnode3
This commit is contained in:
commit
37b6ea6233
|
@ -20,7 +20,7 @@
|
|||
|
||||
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
||||
|
||||
#define MEM_TERM_LIMIT 100
|
||||
#define MEM_TERM_LIMIT 200
|
||||
// ref index_cache.h:22
|
||||
//#define CACHE_KEY_LEN(p) \
|
||||
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
|
||||
|
@ -66,6 +66,7 @@ void indexCacheDebug(IndexCache* cache) {
|
|||
indexMemRef(tbl);
|
||||
pthread_mutex_unlock(&cache->mtx);
|
||||
|
||||
{
|
||||
SSkipList* slt = tbl->mem;
|
||||
SSkipListIterator* iter = tSkipListCreateIter(slt);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
|
@ -81,6 +82,29 @@ void indexCacheDebug(IndexCache* cache) {
|
|||
indexMemUnRef(tbl);
|
||||
}
|
||||
|
||||
{
|
||||
pthread_mutex_lock(&cache->mtx);
|
||||
tbl = cache->imm;
|
||||
indexMemRef(tbl);
|
||||
pthread_mutex_unlock(&cache->mtx);
|
||||
if (tbl != NULL) {
|
||||
SSkipList* slt = tbl->mem;
|
||||
SSkipListIterator* iter = tSkipListCreateIter(slt);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
if (ct != NULL) {
|
||||
// TODO, add more debug info
|
||||
indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
|
||||
}
|
||||
}
|
||||
tSkipListDestroyIter(iter);
|
||||
}
|
||||
|
||||
indexMemUnRef(tbl);
|
||||
}
|
||||
}
|
||||
|
||||
void indexCacheDestroySkiplist(SSkipList* slt) {
|
||||
SSkipListIterator* iter = tSkipListCreateIter(slt);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
|
@ -247,6 +271,7 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
|
|||
SIndexTerm* term = query->term;
|
||||
EIndexQueryType qtype = query->qType;
|
||||
CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};
|
||||
indexCacheDebug(pCache);
|
||||
|
||||
int ret = indexQueryMem(mem, &ct, qtype, result, s);
|
||||
if (ret == 0 && *s != kTypeDeletion) {
|
||||
|
|
|
@ -52,7 +52,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
|
|||
}
|
||||
static int writeCtxDoFlush(WriterCtx* ctx) {
|
||||
if (ctx->type == TFile) {
|
||||
// tfFsync(ctx->fd);
|
||||
tfFsync(ctx->file.fd);
|
||||
// tfFlush(ctx->file.fd);
|
||||
} else {
|
||||
// do nothing
|
||||
|
@ -101,7 +101,10 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
|
|||
free(ctx->mem.buf);
|
||||
} else {
|
||||
tfClose(ctx->file.fd);
|
||||
if (remove) unlink(ctx->file.buf);
|
||||
if (remove) {
|
||||
indexError("rm file %s", ctx->file.buf);
|
||||
unlink(ctx->file.buf);
|
||||
}
|
||||
}
|
||||
free(ctx);
|
||||
}
|
||||
|
|
|
@ -81,7 +81,7 @@ TFileCache* tfileCacheCreate(const char* path) {
|
|||
continue;
|
||||
}
|
||||
|
||||
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 64);
|
||||
WriterCtx* wc = writerCtxCreate(TFile, file, true, 1024 * 1024 * 64);
|
||||
if (wc == NULL) {
|
||||
indexError("failed to open index:%s", file);
|
||||
goto End;
|
||||
|
@ -211,7 +211,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
|
|||
|
||||
char fullname[256] = {0};
|
||||
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
|
||||
WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024);
|
||||
WriterCtx* wcx = writerCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
|
||||
if (wcx == NULL) { return NULL; }
|
||||
|
||||
TFileHeader tfh = {0};
|
||||
|
@ -229,7 +229,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
|
|||
|
||||
char fullname[256] = {0};
|
||||
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
|
||||
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024);
|
||||
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
|
||||
if (wc == NULL) { return NULL; }
|
||||
|
||||
TFileReader* reader = tfileReaderCreateImpl(wc);
|
||||
|
|
|
@ -427,7 +427,7 @@ static TFileValue* genTFileValue(const char* val) {
|
|||
memcpy(tv->colVal, val, vlen);
|
||||
|
||||
tv->tableId = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||
for (size_t i = 0; i < 10; i++) {
|
||||
for (size_t i = 0; i < 200; i++) {
|
||||
uint64_t v = i;
|
||||
taosArrayPush(tv->tableId, &v);
|
||||
}
|
||||
|
@ -440,17 +440,14 @@ static void destroyTFileValue(void* val) {
|
|||
free(tv);
|
||||
}
|
||||
TEST_F(IndexTFileEnv, test_tfile_write) {
|
||||
TFileValue* v1 = genTFileValue("c");
|
||||
TFileValue* v2 = genTFileValue("ab");
|
||||
TFileValue* v3 = genTFileValue("b");
|
||||
TFileValue* v4 = genTFileValue("d");
|
||||
TFileValue* v1 = genTFileValue("ab");
|
||||
|
||||
SArray* data = (SArray*)taosArrayInit(4, sizeof(void*));
|
||||
|
||||
taosArrayPush(data, &v1);
|
||||
taosArrayPush(data, &v2);
|
||||
taosArrayPush(data, &v3);
|
||||
taosArrayPush(data, &v4);
|
||||
// taosArrayPush(data, &v2);
|
||||
// taosArrayPush(data, &v3);
|
||||
// taosArrayPush(data, &v4);
|
||||
|
||||
fObj->Put(data);
|
||||
for (size_t i = 0; i < taosArrayGetSize(data); i++) { destroyTFileValue(taosArrayGetP(data, i)); }
|
||||
|
@ -464,7 +461,7 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
|
|||
|
||||
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||
fObj->Get(&query, result);
|
||||
assert(taosArrayGetSize(result) == 10);
|
||||
assert(taosArrayGetSize(result) == 200);
|
||||
indexTermDestroy(term);
|
||||
|
||||
// tfileWriterDestroy(twrite);
|
||||
|
@ -665,8 +662,8 @@ class IndexObj {
|
|||
}
|
||||
|
||||
~IndexObj() {
|
||||
indexClose(idx);
|
||||
indexCleanUp();
|
||||
indexClose(idx);
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -696,9 +693,9 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
|||
exit(1);
|
||||
}
|
||||
|
||||
int targetSize = 100;
|
||||
int targetSize = 200;
|
||||
{
|
||||
std::string colName("tag1"), colVal("Hello world");
|
||||
std::string colName("tag1"), colVal("Hello");
|
||||
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
|
@ -712,8 +709,8 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
|||
indexMultiTermDestroy(terms);
|
||||
}
|
||||
{
|
||||
size_t size = 100;
|
||||
std::string colName("tag1"), colVal("hello world");
|
||||
size_t size = 200;
|
||||
std::string colName("tag1"), colVal("hello");
|
||||
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
|
@ -728,7 +725,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
|||
}
|
||||
|
||||
{
|
||||
std::string colName("tag1"), colVal("Hello world");
|
||||
std::string colName("tag1"), colVal("Hello");
|
||||
|
||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
|
@ -747,7 +744,7 @@ TEST_F(IndexEnv2, testIndex_TrigeFlush) {
|
|||
if (index->Init(path) != 0) {}
|
||||
int numOfTable = 100 * 10000;
|
||||
index->WriteMillonData("tag1", "Hello world", numOfTable);
|
||||
int target = index->SearchOne("tag1", "Hellow world");
|
||||
int target = index->SearchOne("tag1", "Hello world");
|
||||
assert(numOfTable == target);
|
||||
}
|
||||
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
|
||||
|
|
Loading…
Reference in New Issue