From 7ca0a88ee274d25ecf6e5dadacf82b9569fe659e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Dec 2021 23:55:15 +0800 Subject: [PATCH 1/3] add skiplist add --- source/libs/index/test/indexTests.cc | 364 +++++++++++++++++++++------ 1 file changed, 286 insertions(+), 78 deletions(-) diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 6c6f45bd11..de82350aad 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -16,10 +16,12 @@ #include #include "index.h" #include "indexInt.h" +#include "index_cache.h" #include "index_fst.h" #include "index_fst_counting_writer.h" #include "index_fst_util.h" #include "index_tfile.h" +#include "tskiplist.h" #include "tutil.h" using namespace std; class FstWriter { @@ -110,58 +112,6 @@ class FstReadMemory { size_t _size; }; -// TEST(IndexTest, index_create_test) { -// SIndexOpts *opts = indexOptsCreate(); -// SIndex *index = indexOpen(opts, "./test"); -// if (index == NULL) { -// std::cout << "index open failed" << std::endl; -// } -// -// -// // write -// for (int i = 0; i < 100000; i++) { -// SIndexMultiTerm* terms = indexMultiTermCreate(); -// std::string val = "field"; -// -// indexMultiTermAdd(terms, "tag1", strlen("tag1"), val.c_str(), val.size()); -// -// val.append(std::to_string(i)); -// indexMultiTermAdd(terms, "tag2", strlen("tag2"), val.c_str(), val.size()); -// -// val.insert(0, std::to_string(i)); -// indexMultiTermAdd(terms, "tag3", strlen("tag3"), val.c_str(), val.size()); -// -// val.append("const"); -// indexMultiTermAdd(terms, "tag4", strlen("tag4"), val.c_str(), val.size()); -// -// -// indexPut(index, terms, i); -// indexMultiTermDestroy(terms); -// } -// -// -// // query -// SIndexMultiTermQuery *multiQuery = indexMultiTermQueryCreate(MUST); -// -// indexMultiTermQueryAdd(multiQuery, "tag1", strlen("tag1"), "field", strlen("field"), QUERY_PREFIX); -// indexMultiTermQueryAdd(multiQuery, "tag3", strlen("tag3"), "0field0", strlen("0field0"), QUERY_TERM); -// -// SArray *result = (SArray *)taosArrayInit(10, sizeof(int)); -// indexSearch(index, multiQuery, result); -// -// std::cout << "taos'size : " << taosArrayGetSize(result) << std::endl; -// for (int i = 0; i < taosArrayGetSize(result); i++) { -// int *v = (int *)taosArrayGet(result, i); -// std::cout << "value --->" << *v << std::endl; -// } -// // add more test case -// indexMultiTermQueryDestroy(multiQuery); -// -// indexOptsDestroy(opts); -// indexClose(index); -// // -//} - #define L 100 #define M 100 #define N 100 @@ -421,6 +371,7 @@ class TFileObj { int colId_; }; + class IndexTFileEnv : public ::testing::Test { protected: virtual void SetUp() { @@ -428,22 +379,6 @@ class IndexTFileEnv : public ::testing::Test { taosMkDir(dir.c_str()); tfInit(); fObj = new TFileObj(dir, colName); - - // std::string colName("voltage"); - // header.suid = 1; - // header.version = 1; - // memcpy(header.colName, colName.c_str(), colName.size()); - // header.colType = TSDB_DATA_TYPE_BINARY; - - // std::string path(dir); - // int colId = 2; - // char buf[64] = {0}; - // sprintf(buf, "%" PRIu64 "-%d-%d.tindex", header.suid, colId, header.version); - // path.append("/").append(buf); - - // ctx = writerCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024); - - // twrite = tfileWriterCreate(ctx, &header); } virtual void TearDown() { @@ -460,16 +395,8 @@ class IndexTFileEnv : public ::testing::Test { int coldId = 2; int version = 1; int colType = TSDB_DATA_TYPE_BINARY; - - // WriterCtx* ctx = NULL; - // TFileHeader header; - // TFileWriter* twrite = NULL; }; -// static TFileWriter* genTFileWriter(const char* path, TFileHeader* header) { -// char buf[128] = {0}; -// WriterCtx* ctx = writerCtxCreate(TFile, path, false, ) -//} static TFileValue* genTFileValue(const char* val) { TFileValue* tv = (TFileValue*)calloc(1, sizeof(TFileValue)); int32_t vlen = strlen(val) + 1; @@ -492,7 +419,7 @@ static void destroyTFileValue(void* val) { TEST_F(IndexTFileEnv, test_tfile_write) { TFileValue* v1 = genTFileValue("c"); - TFileValue* v2 = genTFileValue("a"); + TFileValue* v2 = genTFileValue("ab"); TFileValue* v3 = genTFileValue("b"); TFileValue* v4 = genTFileValue("d"); @@ -510,7 +437,7 @@ TEST_F(IndexTFileEnv, test_tfile_write) { taosArrayDestroy(data); std::string colName("voltage"); - std::string colVal("b"); + std::string colVal("ab"); SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); SIndexTermQuery query = {.term = term, .qType = QUERY_TERM}; @@ -521,3 +448,284 @@ TEST_F(IndexTFileEnv, test_tfile_write) { // tfileWriterDestroy(twrite); } +class CacheObj { + public: + CacheObj() { + // TODO + cache = indexCacheCreate(); + } + int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) { + int ret = indexCachePut(cache, term, colId, version, uid); + if (ret != 0) { + // + std::cout << "failed to put into cache: " << ret << std::endl; + } + return ret; + } + int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { + int ret = indexCacheSearch(cache, query, colId, version, result, s); + if (ret != 0) { + // + std::cout << "failed to get from cache:" << ret << std::endl; + } + return ret; + } + ~CacheObj() { + // TODO + indexCacheDestroy(cache); + } + + private: + IndexCache* cache = NULL; +}; + +class IndexCacheEnv : public ::testing::Test { + protected: + virtual void SetUp() { + // TODO + coj = new CacheObj(); + } + virtual void TearDown() { + delete coj; + // formate + } + CacheObj* coj; +}; + +TEST_F(IndexCacheEnv, cache_test) { + int count = 10; + + int16_t colId = 1; + int32_t version = 10; + uint64_t suid = 100; + std::string colName("voltage"); + std::string colVal("My God"); + for (size_t i = 0; i < count; i++) { + colVal += ('a' + i); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + coj->Put(term, colId, version, suid); + version++; + } + + // coj->Get(); +} + +typedef struct CTerm { + char buf[16]; + char version[8]; + int val; + int other; +} CTerm; +CTerm* cTermCreate(const char* str, const char* version, int val) { + CTerm* tm = (CTerm*)calloc(1, sizeof(CTerm)); + memcpy(tm->buf, str, strlen(str)); + memcpy(tm->version, version, strlen(version)); + tm->val = val; + tm->other = -100; + return tm; +} +int termCompar(const void* a, const void* b) { + printf("a: %s \t b: %s\n", (char*)a, (char*)b); + int ret = strncmp((char*)a, (char*)b, 16); + if (ret == 0) { + // + return strncmp((char*)a + 16, (char*)b + 16, 8); + } + return ret; +} + +int SerialTermTo(char* buf, CTerm* term) { + char* p = buf; + memcpy(buf, term->buf, sizeof(term->buf)); + buf += sizeof(term->buf); + + // memcpy(buf, term->version, sizeof(term->version)); + // buf += sizeof(term->version); + return buf - p; +} +static char* getTermKey(const void* pData) { + CTerm* p = (CTerm*)pData; + return (char*)p->buf; +} +#define MAX_TERM_KEY_LEN 128 +class SkiplistObj { + public: + // max_key_len: + // + SkiplistObj() { + slt = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_TERM_KEY_LEN, termCompar, SL_ALLOW_DUP_KEY, getTermKey); + } + int Put(CTerm* term, uint64_t suid) { + char buf[MAX_TERM_KEY_LEN] = {0}; + int sz = SerialTermTo(buf, term); + + char* pBuf = (char*)calloc(1, sz + sizeof(suid)); + + memcpy(pBuf, buf, sz); + memcpy(pBuf + sz, &suid, sizeof(suid)); + // int32_t level, headsize; + // tSkipListNewNodeInfo(slt, &level, &headsize); + + // SSkipListNode* node = (SSkipListNode*)calloc(1, headsize + strlen(buf) + sizeof(suid)); + // node->level = level; + // char* d = (char*)SL_GET_NODE_DATA(node); + // memcpy(d, buf, strlen(buf)); + // memcpy(d + strlen(buf), &suid, sizeof(suid)); + SSkipListNode* node = tSkipListPut(slt, pBuf); + tSkipListPrint(slt, 1); + free(pBuf); + return 0; + } + + int Get(int key, char* buf, int version) { + // CTerm term; + // term.key = key; + //// term.version = version; + // memcpy(term.buf, buf, strlen(buf)); + + // char tbuf[128] = {0}; + // SerialTermTo(tbuf, &term); + + // SSkipListIterator* iter = tSkipListCreateIterFromVal(slt, tbuf, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + // SSkipListNode* node = tSkipListIterGet(iter); + // CTerm* ct = (CTerm*)SL_GET_NODE_DATA(node); + // printf("key: %d\t, version: %d\t, buf: %s\n", ct->key, ct->version, ct->buf); + // while (iter) { + // assert(tSkipListIterNext(iter) == true); + // SSkipListNode* node = tSkipListIterGet(iter); + // // ugly formate + // CTerm* t = (CTerm*)SL_GET_NODE_KEY(slt, node); + // printf("key: %d\t, version: %d\t, buf: %s\n", t->key, t->version, t->buf); + //} + return 0; + } + ~SkiplistObj() { + // TODO + // indexCacheDestroy(cache); + } + + private: + SSkipList* slt; +}; + +typedef struct KV { + int32_t k; + int32_t v; +} KV; +int kvCompare(const void* a, const void* b) { + int32_t av = *(int32_t*)a; + int32_t bv = *(int32_t*)b; + return av - bv; +} +char* getKVkey(const void* a) { + return (char*)(&(((KV*)a)->v)); + // KV* kv = (KV*)a; +} +int testKV() { + SSkipList* slt = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_TERM_KEY_LEN, kvCompare, SL_DISCARD_DUP_KEY, getKVkey); + { + KV t = {.k = 1, .v = 5}; + tSkipListPut(slt, (void*)&t); + } + { + KV t = {.k = 2, .v = 3}; + tSkipListPut(slt, (void*)&t); + } + + KV value = {.k = 4, .v = 5}; + char* key = getKVkey(&value); + // const char* key = "Hello"; + SArray* arr = tSkipListGet(slt, (SSkipListKey)&key); + for (size_t i = 0; i < taosArrayGetSize(arr); i++) { + SSkipListNode* node = (SSkipListNode*)taosArrayGetP(arr, i); + int32_t* ct = (int32_t*)SL_GET_NODE_KEY(slt, node); + + printf("Get key: %d\n", *ct); + // SSkipListIterator* iter = tSkipListCreateIterFromVal(slt, tbuf, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + } + return 1; +} + +int testComplicate() { + SSkipList* slt = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_TERM_KEY_LEN, termCompar, SL_ALLOW_DUP_KEY, getTermKey); + { + CTerm* tm = cTermCreate("val", "v1", 10); + tSkipListPut(slt, (char*)tm); + } + { + CTerm* tm = cTermCreate("val1", "v2", 2); + tSkipListPut(slt, (char*)tm); + } + { + CTerm* tm = cTermCreate("val3", "v3", -1); + tSkipListPut(slt, (char*)tm); + } + { + CTerm* tm = cTermCreate("val3", "v4", 2); + tSkipListPut(slt, (char*)tm); + } + { + CTerm* tm = cTermCreate("val3", "v5", -1); + char* key = getTermKey(tm); + SArray* arr = tSkipListGet(slt, (SSkipListKey)key); + for (size_t i = 0; i < taosArrayGetSize(arr); i++) { + SSkipListNode* node = (SSkipListNode*)taosArrayGetP(arr, i); + CTerm* ct = (CTerm*)SL_GET_NODE_KEY(slt, node); + printf("other; %d\tbuf: %s\t, version: %s, val: %d\n", ct->other, ct->buf, ct->version, ct->val); + // SSkipListIterator* iter = tSkipListCreateIterFromVal(slt, tbuf, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + } + free(tm); + taosArrayDestroy(arr); + } + return 1; +} +int strCompare(const void* a, const void* b) { + const char* sa = (char*)a; + const char* sb = (char*)b; + return strcmp(sa, sb); +} +void testString() { + SSkipList* slt = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_TERM_KEY_LEN, strCompare, SL_ALLOW_DUP_KEY, getTermKey); + { + tSkipListPut(slt, (void*)"Hello"); + tSkipListPut(slt, (void*)"World"); + tSkipListPut(slt, (void*)"YI"); + } + + const char* key = "YI"; + SArray* arr = tSkipListGet(slt, (SSkipListKey)key); + for (size_t i = 0; i < taosArrayGetSize(arr); i++) { + SSkipListNode* node = (SSkipListNode*)taosArrayGetP(arr, i); + char* ct = (char*)SL_GET_NODE_KEY(slt, node); + printf("Get key: %s\n", ct); + // SSkipListIterator* iter = tSkipListCreateIterFromVal(slt, tbuf, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + } +} +// class IndexSkip : public ::testing::Test { +// protected: +// virtual void SetUp() { +// // TODO +// sObj = new SkiplistObj(); +// } +// virtual void TearDown() { +// delete sObj; +// // formate +// } +// SkiplistObj* sObj; +//}; + +// TEST_F(IndexSkip, skip_test) { +// std::string val("Hello"); +// std::string minVal = val; +// for (size_t i = 0; i < 10; i++) { +// CTerm* t = (CTerm*)calloc(1, sizeof(CTerm)); +// t->key = 1; +// t->version = i; +// +// val[val.size() - 1] = 'a' + i; +// memcpy(t->buf, val.c_str(), val.size()); +// sObj->Put(t, 10); +// free(t); +// } +// sObj->Get(1, (char*)(minVal.c_str()), 1000000); +//} From a724e85b495911cbb02c25c6edf9f79526887a11 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Dec 2021 23:56:53 +0800 Subject: [PATCH 2/3] add skiplist test --- source/libs/index/test/indexTests.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index de82350aad..18ad37de1f 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -578,6 +578,7 @@ class SkiplistObj { } int Get(int key, char* buf, int version) { + // TODO // CTerm term; // term.key = key; //// term.version = version; From 0365c67b81af64cffb9056d15397046acbd7ad66 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 25 Dec 2021 19:16:05 +0800 Subject: [PATCH 3/3] Combine the search results of cache and tfile And Update cache code --- source/libs/index/inc/index_cache.h | 12 + source/libs/index/src/index.c | 4 +- source/libs/index/src/index_cache.c | 201 +++++++++------ source/libs/index/src/index_tfile.c | 4 +- source/libs/index/test/indexTests.cc | 370 ++++++++------------------- 5 files changed, 247 insertions(+), 344 deletions(-) diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index fb4f478ae9..692edcc064 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -37,6 +37,17 @@ typedef struct IndexCache { SSkipList* skiplist; } IndexCache; +typedef struct CacheTerm { + // key + int32_t colId; + int32_t nColVal; + char* colVal; + int32_t version; + // value + uint64_t uid; + int8_t colType; + SIndexOperOnColumn operaType; +} CacheTerm; // IndexCache* indexCacheCreate(); @@ -47,6 +58,7 @@ int indexCachePut(void* cache, SIndexTerm* term, int16_t colId, int32_t version, // int indexCacheGet(void *cache, uint64_t *rst); int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s); +void indexCacheDebug(IndexCache* cache); #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index f0546afaf5..1c65dd03d5 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -41,7 +41,7 @@ static pthread_once_t isInit = PTHREAD_ONCE_INIT; static void indexInit(); static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result); -static int indexFlushCacheToTindex(SIndex* sIdx); +static int indexFlushCacheTFile(SIndex* sIdx); static void indexInterResultsDestroy(SArray* results); static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult); @@ -353,7 +353,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType } return 0; } -static int indexFlushCacheToTindex(SIndex* sIdx) { +static int indexFlushCacheTFile(SIndex* sIdx) { if (sIdx == NULL) { return -1; } indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index bb6a5e048a..9c92203088 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -20,81 +20,88 @@ #define MAX_INDEX_KEY_LEN 256 // test only, change later // ref index_cache.h:22 -#define CACHE_KEY_LEN(p) \ - (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType)) +//#define CACHE_KEY_LEN(p) \ +// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType)) -static char* getIndexKey(const void* pData) { - return NULL; +static void cacheTermDestroy(CacheTerm* ct) { + if (ct == NULL) { return; } + + free(ct->colVal); + free(ct); } +static char* getIndexKey(const void* pData) { + CacheTerm* p = (CacheTerm*)pData; + return (char*)p; +} + static int32_t compareKey(const void* l, const void* r) { - char* lp = (char*)l; - char* rp = (char*)r; + CacheTerm* lt = (CacheTerm*)l; + CacheTerm* rt = (CacheTerm*)r; - // skip total len, not compare - int32_t ll, rl; // len - memcpy(&ll, lp, sizeof(int32_t)); - memcpy(&rl, rp, sizeof(int32_t)); - lp += sizeof(int32_t); - rp += sizeof(int32_t); + // compare colId + if (lt->colId != rt->colId) { return lt->colId - rt->colId; } - // compare field id - int16_t lf, rf; // field id - memcpy(&lf, lp, sizeof(lf)); - memcpy(&rf, rp, sizeof(rf)); - if (lf != rf) { return lf < rf ? -1 : 1; } - lp += sizeof(lf); - rp += sizeof(rf); - - // compare field type - int8_t lft, rft; - memcpy(&lft, lp, sizeof(lft)); - memcpy(&rft, rp, sizeof(rft)); - lp += sizeof(lft); - rp += sizeof(rft); - assert(rft == rft); - - // skip value len - int32_t lfl, rfl; - memcpy(&lfl, lp, sizeof(lfl)); - memcpy(&rfl, rp, sizeof(rfl)); - lp += sizeof(lfl); - rp += sizeof(rfl); - - // compare value - int32_t i, j; - for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) { - if (lp[i] == rp[j]) { + // compare colVal + int i, j; + for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) { + if (lt->colVal[i] == rt->colVal[j]) { continue; } else { - return lp[i] < rp[j] ? -1 : 1; + return lt->colVal[i] < rt->colVal[j] ? -1 : 1; } } - if (i < lfl) { + if (i < lt->nColVal) { return 1; - } else if (j < rfl) { + } else if (j < rt->nColVal) { return -1; } - lp += lfl; - rp += rfl; + // compare version - // skip uid - uint64_t lu, ru; - memcpy(&lu, lp, sizeof(lu)); - memcpy(&ru, rp, sizeof(ru)); - lp += sizeof(lu); - rp += sizeof(ru); + return rt->version - lt->version; - // compare version, desc order - int32_t lv, rv; - memcpy(&lv, lp, sizeof(lv)); - memcpy(&rv, rp, sizeof(rv)); - if (lv != rv) { return lv > rv ? -1 : 1; } + // char* lp = (char*)l; + // char* rp = (char*)r; - lp += sizeof(lv); - rp += sizeof(rv); - // not care item type + //// compare col id + // int16_t lf, rf; // cold id + // memcpy(&lf, lp, sizeof(lf)); + // memcpy(&rf, rp, sizeof(rf)); + // if (lf != rf) { return lf < rf ? -1 : 1; } - return 0; + // lp += sizeof(lf); + // rp += sizeof(rf); + + //// skip value len + // int32_t lfl, rfl; + // memcpy(&lfl, lp, sizeof(lfl)); + // memcpy(&rfl, rp, sizeof(rfl)); + // lp += sizeof(lfl); + // rp += sizeof(rfl); + + //// compare value + // int32_t i, j; + // for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) { + // if (lp[i] == rp[j]) { + // continue; + // } else { + // return lp[i] < rp[j] ? -1 : 1; + // } + //} + // if (i < lfl) { + // return 1; + //} else if (j < rfl) { + // return -1; + //} + // lp += lfl; + // rp += rfl; + + //// compare version, desc order + // int32_t lv, rv; + // memcpy(&lv, lp, sizeof(lv)); + // memcpy(&rv, rp, sizeof(rv)); + // if (lv != rv) { return lv < rv ? 1 : -1; } + + // return 0; } IndexCache* indexCacheCreate() { IndexCache* cache = calloc(1, sizeof(IndexCache)); @@ -106,6 +113,18 @@ IndexCache* indexCacheCreate() { tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); return cache; } +void indexCacheDebug(IndexCache* cache) { + SSkipListIterator* iter = tSkipListCreateIter(cache->skiplist); + while (tSkipListIterNext(iter)) { + SSkipListNode* node = tSkipListIterGet(iter); + CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); + if (ct != NULL) { + // TODO, add more debug info + indexInfo("{colId:%d, colVal: %s, version: %d} \t", ct->colId, ct->colVal, ct->version); + } + } + tSkipListDestroyIter(iter); +} void indexCacheDestroy(void* cache) { IndexCache* pCache = cache; @@ -119,24 +138,20 @@ int indexCachePut(void* cache, SIndexTerm* term, int16_t colId, int32_t version, IndexCache* pCache = cache; // encode data - int32_t total = CACHE_KEY_LEN(term); + CacheTerm* ct = calloc(1, sizeof(CacheTerm)); + if (cache == NULL) { return -1; } + // set up key + ct->colId = colId; + ct->colType = term->colType; + ct->nColVal = term->nColVal; + ct->colVal = (char*)calloc(1, sizeof(char) * (ct->nColVal + 1)); + memcpy(ct->colVal, term->colVal, ct->nColVal); + ct->version = version; - char* buf = calloc(1, total); - char* p = buf; + ct->uid = uid; + ct->operaType = term->operType; - SERIALIZE_VAR_TO_BUF(p, total, int32_t); - SERIALIZE_VAR_TO_BUF(p, colId, int16_t); - - SERIALIZE_MEM_TO_BUF(p, term, colType); - SERIALIZE_MEM_TO_BUF(p, term, nColVal); - SERIALIZE_STR_MEM_TO_BUF(p, term, colVal, term->nColVal); - - SERIALIZE_VAR_TO_BUF(p, version, int32_t); - SERIALIZE_VAR_TO_BUF(p, uid, uint64_t); - - SERIALIZE_MEM_TO_BUF(p, term, operType); - - tSkipListPut(pCache->skiplist, (void*)buf); + tSkipListPut(pCache->skiplist, (char*)ct); return 0; // encode end } @@ -150,8 +165,39 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t SIndexTerm* term = query->term; EIndexQueryType qtype = query->qType; - int32_t keyLen = CACHE_KEY_LEN(term); - char* buf = calloc(1, keyLen); + CacheTerm* ct = calloc(1, sizeof(CacheTerm)); + if (ct == NULL) { return -1; } + ct->colId = colId; + ct->nColVal = term->nColVal; + ct->colVal = calloc(1, sizeof(char) * (ct->nColVal + 1)); + memcpy(ct->colVal, term->colVal, ct->nColVal); + ct->version = version; + + char* key = getIndexKey(ct); + // TODO handle multi situation later, and refactor + SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->skiplist, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + while (tSkipListIterNext(iter)) { + SSkipListNode* node = tSkipListIterGet(iter); + if (node != NULL) { + CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); + if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) { + if (c->nColVal == ct->nColVal && strncmp(c->colVal, ct->colVal, c->nColVal) == 0) { + taosArrayPush(result, &c->uid); + *s = kTypeValue; + } else { + break; + } + } else if (c->operaType == DEL_VALUE) { + // table is del, not need + *s = kTypeDeletion; + break; + } + } + } + tSkipListDestroyIter(iter); + cacheTermDestroy(ct); + // int32_t keyLen = CACHE_KEY_LEN(term); + // char* buf = calloc(1, keyLen); if (qtype == QUERY_TERM) { // } else if (qtype == QUERY_PREFIX) { @@ -161,6 +207,5 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t } else if (qtype == QUERY_REGEX) { // } - return 0; } diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 9afc29457d..003ae86c6a 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -462,7 +462,9 @@ static int tfileCompare(const void* a, const void* b) { size_t aLen = strlen(aName); size_t bLen = strlen(bName); - return strncmp(aName, bName, aLen > bLen ? aLen : bLen); + int ret = strncmp(aName, bName, aLen > bLen ? aLen : bLen); + if (ret == 0) { return ret; } + return ret < 0 ? -1 : 1; } // tfile name suid-colId-version.tindex static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version) { diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 18ad37de1f..bed2b82daa 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -24,6 +24,22 @@ #include "tskiplist.h" #include "tutil.h" using namespace std; +class DebugInfo { + public: + DebugInfo(const char* str) : info(str) { + std::cout << "------------" << info << "\t" + << "begin" + << "-------------" << std::endl; + } + ~DebugInfo() { + std::cout << "-----------" << info << "\t" + << "end" + << "--------------" << std::endl; + } + + private: + std::string info; +}; class FstWriter { public: FstWriter() { @@ -133,7 +149,6 @@ int Performance_fstWriteRecords(FstWriter* b) { } return L * M * N; } - void Performance_fstReadRecords(FstReadMemory* m) { std::string str("aa"); for (int i = 0; i < M; i++) { @@ -168,7 +183,6 @@ void checkFstPerf() { Performance_fstReadRecords(m); delete m; } - void checkFstPrefixSearch() { FstWriter* fw = new FstWriter; int64_t s = taosGetTimestampUs(); @@ -246,7 +260,6 @@ void validateFst() { } delete m; } - class IndexEnv : public ::testing::Test { protected: virtual void SetUp() { @@ -265,44 +278,51 @@ class IndexEnv : public ::testing::Test { SIndex* index; }; -// TEST_F(IndexEnv, testPut) { -// // single index column -// { -// std::string colName("tag1"), colVal("Hello world"); -// SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), -// colVal.size()); SIndexMultiTerm* terms = indexMultiTermCreate(); indexMultiTermAdd(terms, term); -// -// for (size_t i = 0; i < 100; i++) { -// int tableId = i; -// int ret = indexPut(index, terms, tableId); -// assert(ret == 0); -// } -// indexMultiTermDestroy(terms); -// } -// // multi index column -// { +/// TEST_F(IndexEnv, testPut) { +// / // single index column +// / { +// / std::string colName("tag1"), colVal("Hello world"); +// / SIndexTerm* term = +// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), / colVal.size()); // SIndexMultiTerm* terms = indexMultiTermCreate(); -// { -// std::string colName("tag1"), colVal("Hello world"); -// SIndexTerm* term = -// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); -// indexMultiTermAdd(terms, term); +// indexMultiTermAdd(terms, term); +// / / for (size_t i = 0; i < 100; i++) { +// / int tableId = i; +// / int ret = indexPut(index, terms, tableId); +// / assert(ret == 0); +// / // } -// { -// std::string colName("tag2"), colVal("Hello world"); -// SIndexTerm* term = -// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); -// indexMultiTermAdd(terms, term); -// } -// -// for (int i = 0; i < 100; i++) { -// int tableId = i; -// int ret = indexPut(index, terms, tableId); -// assert(ret == 0); -// } -// indexMultiTermDestroy(terms); +// / indexMultiTermDestroy(terms); +// / // } -// // +// / // multi index column +// / { +// / SIndexMultiTerm* terms = indexMultiTermCreate(); +// / { +// / std::string colName("tag1"), colVal("Hello world"); +// / SIndexTerm* term = +// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); +// / indexMultiTermAdd(terms, term); +// / +// } +// / { +// / std::string colName("tag2"), colVal("Hello world"); +// / SIndexTerm* term = +// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); +// / indexMultiTermAdd(terms, term); +// / +// } +// / / for (int i = 0; i < 100; i++) { +// / int tableId = i; +// / int ret = indexPut(index, terms, tableId); +// / assert(ret == 0); +// / +// } +// / indexMultiTermDestroy(terms); +// / +// } +// / // +// / //} class TFileObj { @@ -416,7 +436,6 @@ static void destroyTFileValue(void* val) { taosArrayDestroy(tv->tableId); free(tv); } - TEST_F(IndexTFileEnv, test_tfile_write) { TFileValue* v1 = genTFileValue("c"); TFileValue* v2 = genTFileValue("ab"); @@ -492,241 +511,66 @@ class IndexCacheEnv : public ::testing::Test { CacheObj* coj; }; -TEST_F(IndexCacheEnv, cache_test) { - int count = 10; - - int16_t colId = 1; - int32_t version = 10; - uint64_t suid = 100; - std::string colName("voltage"); - std::string colVal("My God"); - for (size_t i = 0; i < count; i++) { - colVal += ('a' + i); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); - coj->Put(term, colId, version, suid); - version++; - } - - // coj->Get(); -} - -typedef struct CTerm { - char buf[16]; - char version[8]; - int val; - int other; -} CTerm; -CTerm* cTermCreate(const char* str, const char* version, int val) { - CTerm* tm = (CTerm*)calloc(1, sizeof(CTerm)); - memcpy(tm->buf, str, strlen(str)); - memcpy(tm->version, version, strlen(version)); - tm->val = val; - tm->other = -100; - return tm; -} -int termCompar(const void* a, const void* b) { - printf("a: %s \t b: %s\n", (char*)a, (char*)b); - int ret = strncmp((char*)a, (char*)b, 16); - if (ret == 0) { - // - return strncmp((char*)a + 16, (char*)b + 16, 8); - } - return ret; -} - -int SerialTermTo(char* buf, CTerm* term) { - char* p = buf; - memcpy(buf, term->buf, sizeof(term->buf)); - buf += sizeof(term->buf); - - // memcpy(buf, term->version, sizeof(term->version)); - // buf += sizeof(term->version); - return buf - p; -} -static char* getTermKey(const void* pData) { - CTerm* p = (CTerm*)pData; - return (char*)p->buf; -} #define MAX_TERM_KEY_LEN 128 -class SkiplistObj { - public: - // max_key_len: - // - SkiplistObj() { - slt = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_TERM_KEY_LEN, termCompar, SL_ALLOW_DUP_KEY, getTermKey); - } - int Put(CTerm* term, uint64_t suid) { - char buf[MAX_TERM_KEY_LEN] = {0}; - int sz = SerialTermTo(buf, term); +TEST_F(IndexCacheEnv, cache_test) { + int version = 0; + int16_t colId = 0; - char* pBuf = (char*)calloc(1, sz + sizeof(suid)); - - memcpy(pBuf, buf, sz); - memcpy(pBuf + sz, &suid, sizeof(suid)); - // int32_t level, headsize; - // tSkipListNewNodeInfo(slt, &level, &headsize); - - // SSkipListNode* node = (SSkipListNode*)calloc(1, headsize + strlen(buf) + sizeof(suid)); - // node->level = level; - // char* d = (char*)SL_GET_NODE_DATA(node); - // memcpy(d, buf, strlen(buf)); - // memcpy(d + strlen(buf), &suid, sizeof(suid)); - SSkipListNode* node = tSkipListPut(slt, pBuf); - tSkipListPrint(slt, 1); - free(pBuf); - return 0; - } - - int Get(int key, char* buf, int version) { - // TODO - // CTerm term; - // term.key = key; - //// term.version = version; - // memcpy(term.buf, buf, strlen(buf)); - - // char tbuf[128] = {0}; - // SerialTermTo(tbuf, &term); - - // SSkipListIterator* iter = tSkipListCreateIterFromVal(slt, tbuf, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); - // SSkipListNode* node = tSkipListIterGet(iter); - // CTerm* ct = (CTerm*)SL_GET_NODE_DATA(node); - // printf("key: %d\t, version: %d\t, buf: %s\n", ct->key, ct->version, ct->buf); - // while (iter) { - // assert(tSkipListIterNext(iter) == true); - // SSkipListNode* node = tSkipListIterGet(iter); - // // ugly formate - // CTerm* t = (CTerm*)SL_GET_NODE_KEY(slt, node); - // printf("key: %d\t, version: %d\t, buf: %s\n", t->key, t->version, t->buf); - //} - return 0; - } - ~SkiplistObj() { - // TODO - // indexCacheDestroy(cache); - } - - private: - SSkipList* slt; -}; - -typedef struct KV { - int32_t k; - int32_t v; -} KV; -int kvCompare(const void* a, const void* b) { - int32_t av = *(int32_t*)a; - int32_t bv = *(int32_t*)b; - return av - bv; -} -char* getKVkey(const void* a) { - return (char*)(&(((KV*)a)->v)); - // KV* kv = (KV*)a; -} -int testKV() { - SSkipList* slt = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_TERM_KEY_LEN, kvCompare, SL_DISCARD_DUP_KEY, getKVkey); + uint64_t suid = 0; + std::string colName("voltage"); { - KV t = {.k = 1, .v = 5}; - tSkipListPut(slt, (void*)&t); + std::string colVal("v1"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + coj->Put(term, colId, version++, suid++); } { - KV t = {.k = 2, .v = 3}; - tSkipListPut(slt, (void*)&t); + std::string colVal("v3"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + coj->Put(term, colId, version++, suid++); + } + { + std::string colVal("v2"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + coj->Put(term, colId, version++, suid++); + } + { + std::string colVal("v3"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + coj->Put(term, colId, version++, suid++); + } + { + std::string colVal("v3"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + coj->Put(term, colId, version++, suid++); } - KV value = {.k = 4, .v = 5}; - char* key = getKVkey(&value); - // const char* key = "Hello"; - SArray* arr = tSkipListGet(slt, (SSkipListKey)&key); - for (size_t i = 0; i < taosArrayGetSize(arr); i++) { - SSkipListNode* node = (SSkipListNode*)taosArrayGetP(arr, i); - int32_t* ct = (int32_t*)SL_GET_NODE_KEY(slt, node); - - printf("Get key: %d\n", *ct); - // SSkipListIterator* iter = tSkipListCreateIterFromVal(slt, tbuf, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); - } - return 1; -} - -int testComplicate() { - SSkipList* slt = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_TERM_KEY_LEN, termCompar, SL_ALLOW_DUP_KEY, getTermKey); { - CTerm* tm = cTermCreate("val", "v1", 10); - tSkipListPut(slt, (char*)tm); - } - { - CTerm* tm = cTermCreate("val1", "v2", 2); - tSkipListPut(slt, (char*)tm); - } - { - CTerm* tm = cTermCreate("val3", "v3", -1); - tSkipListPut(slt, (char*)tm); - } - { - CTerm* tm = cTermCreate("val3", "v4", 2); - tSkipListPut(slt, (char*)tm); - } - { - CTerm* tm = cTermCreate("val3", "v5", -1); - char* key = getTermKey(tm); - SArray* arr = tSkipListGet(slt, (SSkipListKey)key); - for (size_t i = 0; i < taosArrayGetSize(arr); i++) { - SSkipListNode* node = (SSkipListNode*)taosArrayGetP(arr, i); - CTerm* ct = (CTerm*)SL_GET_NODE_KEY(slt, node); - printf("other; %d\tbuf: %s\t, version: %s, val: %d\n", ct->other, ct->buf, ct->version, ct->val); - // SSkipListIterator* iter = tSkipListCreateIterFromVal(slt, tbuf, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + std::string colVal("v4"); + for (size_t i = 0; i < 100; i++) { + colVal[colVal.size() - 1] = 'a' + i; + SIndexTerm* term = + indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + coj->Put(term, colId, version++, suid++); } - free(tm); - taosArrayDestroy(arr); } - return 1; -} -int strCompare(const void* a, const void* b) { - const char* sa = (char*)a; - const char* sb = (char*)b; - return strcmp(sa, sb); -} -void testString() { - SSkipList* slt = tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_TERM_KEY_LEN, strCompare, SL_ALLOW_DUP_KEY, getTermKey); { - tSkipListPut(slt, (void*)"Hello"); - tSkipListPut(slt, (void*)"World"); - tSkipListPut(slt, (void*)"YI"); - } + std::string colVal("v3"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexTermQuery query = {.term = term, .qType = QUERY_TERM}; + SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid)); + STermValueType valType; - const char* key = "YI"; - SArray* arr = tSkipListGet(slt, (SSkipListKey)key); - for (size_t i = 0; i < taosArrayGetSize(arr); i++) { - SSkipListNode* node = (SSkipListNode*)taosArrayGetP(arr, i); - char* ct = (char*)SL_GET_NODE_KEY(slt, node); - printf("Get key: %s\n", ct); - // SSkipListIterator* iter = tSkipListCreateIterFromVal(slt, tbuf, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + coj->Get(&query, colId, 10000, ret, &valType); + assert(taosArrayGetSize(ret) == 3); + } + { + std::string colVal("v2"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexTermQuery query = {.term = term, .qType = QUERY_TERM}; + SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid)); + STermValueType valType; + + coj->Get(&query, colId, 10000, ret, &valType); + assert(taosArrayGetSize(ret) == 1); } } -// class IndexSkip : public ::testing::Test { -// protected: -// virtual void SetUp() { -// // TODO -// sObj = new SkiplistObj(); -// } -// virtual void TearDown() { -// delete sObj; -// // formate -// } -// SkiplistObj* sObj; -//}; - -// TEST_F(IndexSkip, skip_test) { -// std::string val("Hello"); -// std::string minVal = val; -// for (size_t i = 0; i < 10; i++) { -// CTerm* t = (CTerm*)calloc(1, sizeof(CTerm)); -// t->key = 1; -// t->version = i; -// -// val[val.size() - 1] = 'a' + i; -// memcpy(t->buf, val.c_str(), val.size()); -// sObj->Put(t, 10); -// free(t); -// } -// sObj->Get(1, (char*)(minVal.c_str()), 1000000); -//}