diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 0e7405869a..12b66bca2c 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -30,14 +30,18 @@ extern "C" { #endif +typedef struct MemTable { + T_REF_DECLARE() + SSkipList* mem; +} MemTable; typedef struct IndexCache { T_REF_DECLARE() - SSkipList *mem, *imm; - SIndex* index; - char* colName; - int32_t version; - int32_t nTerm; - int8_t type; + MemTable *mem, *imm; + SIndex* index; + char* colName; + int32_t version; + int32_t nTerm; + int8_t type; pthread_mutex_t mtx; } IndexCache; @@ -45,7 +49,6 @@ typedef struct IndexCache { #define CACHE_VERSION(cache) atomic_load_32(&cache->version) typedef struct CacheTerm { // key - int32_t nColVal; char* colVal; int32_t version; // value diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 06e7e8ba44..b78c4ff258 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -34,9 +34,7 @@ int32_t indexInit() { return indexQhandle == NULL ? -1 : 0; // do nothing } -void indexCleanUp() { - taosCleanUpScheduler(indexQhandle); -} +void indexCleanUp() { taosCleanUpScheduler(indexQhandle); } static int uidCompare(const void* a, const void* b) { uint64_t u1 = *(uint64_t*)a; @@ -63,7 +61,9 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { // pthread_once(&isInit, indexInit); SIndex* sIdx = calloc(1, sizeof(SIndex)); - if (sIdx == NULL) { return -1; } + if (sIdx == NULL) { + return -1; + } #ifdef USE_LUCENE index_t* index = index_open(path); @@ -99,7 +99,9 @@ void indexClose(SIndex* sIdx) { void* iter = taosHashIterate(sIdx->colObj, NULL); while (iter) { IndexCache** pCache = iter; - if (*pCache) { indexCacheUnRef(*pCache); } + if (*pCache) { + indexCacheUnRef(*pCache); + } iter = taosHashIterate(sIdx->colObj, iter); } taosHashCleanup(sIdx->colObj); @@ -133,7 +135,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName); - if (*cache == NULL) { + if (cache == NULL) { IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType); taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*)); } @@ -143,10 +145,11 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName); - assert(*cache != NULL); int ret = indexCachePut(*cache, p, uid); - if (ret != 0) { return ret; } + if (ret != 0) { + return ret; + } } #endif @@ -224,17 +227,20 @@ SIndexOpts* indexOptsCreate() { #endif return NULL; } -void indexOptsDestroy(SIndexOpts* opts){ +void indexOptsDestroy(SIndexOpts* opts) { #ifdef USE_LUCENE #endif -} /* - * @param: oper - * - */ - + return; +} +/* + * @param: oper + * + */ SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) { SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)malloc(sizeof(SIndexMultiTermQuery)); - if (p == NULL) { return NULL; } + if (p == NULL) { + return NULL; + } p->opera = opera; p->query = taosArrayInit(4, sizeof(SIndexTermQuery)); return p; @@ -253,15 +259,12 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde return 0; } -SIndexTerm* indexTermCreate(int64_t suid, - SIndexOperOnColumn oper, - uint8_t colType, - const char* colName, - int32_t nColName, - const char* colVal, - int32_t nColVal) { +SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName, + int32_t nColName, const char* colVal, int32_t nColVal) { SIndexTerm* t = (SIndexTerm*)calloc(1, (sizeof(SIndexTerm))); - if (t == NULL) { return NULL; } + if (t == NULL) { + return NULL; + } t->suid = suid; t->operType = oper; @@ -282,9 +285,7 @@ void indexTermDestroy(SIndexTerm* p) { free(p); } -SIndexMultiTerm* indexMultiTermCreate() { - return taosArrayInit(4, sizeof(SIndexTerm*)); -} +SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); } int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) { taosArrayPush(terms, &term); @@ -307,7 +308,7 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result IndexCache* cache = NULL; pthread_mutex_lock(&sIdx->mtx); IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName); - if (*pCache == NULL) { + if (pCache == NULL) { pthread_mutex_unlock(&sIdx->mtx); return -1; } @@ -335,7 +336,9 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result return 0; } static void indexInterResultsDestroy(SArray* results) { - if (results == NULL) { return; } + if (results == NULL) { + return; + } size_t sz = taosArrayGetSize(results); for (size_t i = 0; i < sz; i++) { @@ -366,7 +369,9 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType } int indexFlushCacheTFile(SIndex* sIdx, void* cache) { - if (sIdx == NULL) { return -1; } + if (sIdx == NULL) { + return -1; + } indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); IndexCache* pCache = (IndexCache*)cache; @@ -399,7 +404,6 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { TFileValue* tfv = tfileValueCreate(cv->colVal); taosArrayAddAll(tfv->tableId, cv->val); taosArrayPush(result, &tfv); - // copy to final Result; cn = cacheIter->next(cacheIter); } else { @@ -433,7 +437,9 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { indexError("faile to open file to write"); } else { int ret = tfileWriterPut(tw, result); - if (ret != 0) { indexError("faile to write into tindex "); } + if (ret != 0) { + indexError("faile to write into tindex "); + } } // not free later, just put int table cache indexCacheDestroyImm(pCache); diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 3f99d04bc9..217545d23b 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -23,46 +23,22 @@ #define MEM_TERM_LIMIT 1000000 // ref index_cache.h:22 //#define CACHE_KEY_LEN(p) \ -// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType)) +// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + +// sizeof(p->operType)) -static void cacheTermDestroy(CacheTerm* ct) { - if (ct == NULL) { return; } +static void indexMemRef(MemTable* tbl); +static void indexMemUnRef(MemTable* tbl); - free(ct->colVal); - free(ct); -} -static char* getIndexKey(const void* pData) { - CacheTerm* p = (CacheTerm*)pData; - return (char*)p; -} +static void cacheTermDestroy(CacheTerm* ct); +static char* getIndexKey(const void* pData); +static int32_t compareKey(const void* l, const void* r); -static int32_t compareKey(const void* l, const void* r) { - CacheTerm* lt = (CacheTerm*)l; - CacheTerm* rt = (CacheTerm*)r; +static MemTable* indexInternalCacheCreate(int8_t type); - // compare colVal - int i, j; - for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) { - if (lt->colVal[i] == rt->colVal[j]) { - continue; - } else { - return lt->colVal[i] < rt->colVal[j] ? -1 : 1; - } - } - if (i < lt->nColVal) { - return 1; - } else if (j < rt->nColVal) { - return -1; - } - // compare version - return rt->version - lt->version; -} +static void doMergeWork(SSchedMsg* msg); +static bool indexCacheIteratorNext(Iterate* itera); -static SSkipList* indexInternalCacheCreate(int8_t type) { - if (type == TSDB_DATA_TYPE_BINARY) { - return tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); - } -} +static IterateValue* indexCacheIteratorGetValue(Iterate* iter); IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { IndexCache* cache = calloc(1, sizeof(IndexCache)); @@ -83,7 +59,15 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { return cache; } void indexCacheDebug(IndexCache* cache) { - SSkipListIterator* iter = tSkipListCreateIter(cache->mem); + MemTable* tbl = NULL; + + pthread_mutex_lock(&cache->mtx); + tbl = cache->mem; + indexMemRef(tbl); + pthread_mutex_unlock(&cache->mtx); + + SSkipList* slt = tbl->mem; + SSkipListIterator* iter = tSkipListCreateIter(slt); while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); @@ -93,6 +77,8 @@ void indexCacheDebug(IndexCache* cache) { } } tSkipListDestroyIter(iter); + + indexMemUnRef(tbl); } void indexCacheDestroySkiplist(SSkipList* slt) { @@ -100,71 +86,50 @@ void indexCacheDestroySkiplist(SSkipList* slt) { while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); - if (ct != NULL) {} + if (ct != NULL) { + } } tSkipListDestroyIter(iter); + tSkipListDestroy(slt); } void indexCacheDestroyImm(IndexCache* cache) { + MemTable* tbl = NULL; pthread_mutex_lock(&cache->mtx); - SSkipList* timm = (SSkipList*)cache->imm; + tbl = cache->imm; cache->imm = NULL; // or throw int bg thread pthread_mutex_unlock(&cache->mtx); - - indexCacheDestroySkiplist(timm); + indexMemUnRef(tbl); } void indexCacheDestroy(void* cache) { IndexCache* pCache = cache; - if (pCache == NULL) { return; } - tSkipListDestroy(pCache->mem); - tSkipListDestroy(pCache->imm); + if (pCache == NULL) { + return; + } + indexMemUnRef(pCache->mem); + indexMemUnRef(pCache->imm); free(pCache->colName); free(pCache); } -static void doMergeWork(SSchedMsg* msg) { - IndexCache* pCache = msg->ahandle; - SIndex* sidx = (SIndex*)pCache->index; - indexFlushCacheTFile(sidx, pCache); -} -static bool indexCacheIteratorNext(Iterate* itera) { - SSkipListIterator* iter = itera->iter; - if (iter == NULL) { return false; } - - IterateValue* iv = &itera->val; - iterateValueDestroy(iv, false); - - bool next = tSkipListIterNext(iter); - if (next) { - SSkipListNode* node = tSkipListIterGet(iter); - CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); - - iv->type = ct->operaType; - iv->colVal = ct->colVal; - - taosArrayPush(iv->val, &ct->uid); - } - - return next; -} - -static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { - return &iter->val; -} Iterate* indexCacheIteratorCreate(IndexCache* cache) { Iterate* iiter = calloc(1, sizeof(Iterate)); - if (iiter == NULL) { return NULL; } + if (iiter == NULL) { + return NULL; + } + MemTable* tbl = cache->imm; iiter->val.val = taosArrayInit(1, sizeof(uint64_t)); - iiter->iter = cache->imm != NULL ? tSkipListCreateIter(cache->imm) : NULL; + iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL; iiter->next = indexCacheIteratorNext; iiter->getValue = indexCacheIteratorGetValue; return iiter; } void indexCacheIteratorDestroy(Iterate* iter) { - if (iter == NULL) { return; } - + if (iter == NULL) { + return; + } tSkipListDestroyIter(iter->iter); iterateValueDestroy(&iter->val, true); free(iter); @@ -201,18 +166,21 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { } int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { - if (cache == NULL) { return -1; } + if (cache == NULL) { + return -1; + } IndexCache* pCache = cache; indexCacheRef(pCache); // encode data CacheTerm* ct = calloc(1, sizeof(CacheTerm)); - if (cache == NULL) { return -1; } + if (cache == NULL) { + return -1; + } // set up key ct->colType = term->colType; - ct->nColVal = term->nColVal; - ct->colVal = (char*)calloc(1, sizeof(char) * (ct->nColVal + 1)); - memcpy(ct->colVal, term->colVal, ct->nColVal); + ct->colVal = (char*)calloc(1, sizeof(char) * (term->nColVal + 1)); + memcpy(ct->colVal, term->colVal, term->nColVal); ct->version = atomic_add_fetch_32(&pCache->version, 1); // set value ct->uid = uid; @@ -220,8 +188,13 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { // ugly code, refactor later pthread_mutex_lock(&pCache->mtx); + indexCacheMakeRoomForWrite(pCache); - tSkipListPut(pCache->mem, (char*)ct); + MemTable* tbl = pCache->mem; + indexMemRef(tbl); + tSkipListPut(tbl->mem, (char*)ct); + indexMemUnRef(tbl); + pthread_mutex_unlock(&pCache->mtx); indexCacheUnRef(pCache); @@ -233,27 +206,38 @@ int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t u return 0; } int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) { - if (cache == NULL) { return -1; } + if (cache == NULL) { + return -1; + } IndexCache* pCache = cache; SIndexTerm* term = query->term; EIndexQueryType qtype = query->qType; + MemTable *mem = NULL, *imm = NULL; + pthread_mutex_lock(&pCache->mtx); + mem = pCache->mem; + imm = pCache->imm; + indexMemRef(mem); + indexMemRef(imm); + pthread_mutex_unlock(&pCache->mtx); + CacheTerm* ct = calloc(1, sizeof(CacheTerm)); - if (ct == NULL) { return -1; } - ct->nColVal = term->nColVal; - ct->colVal = calloc(1, sizeof(char) * (ct->nColVal + 1)); - memcpy(ct->colVal, term->colVal, ct->nColVal); + if (ct == NULL) { + return -1; + } + ct->colVal = calloc(1, sizeof(char) * (term->nColVal + 1)); + memcpy(ct->colVal, term->colVal, term->nColVal); ct->version = atomic_load_32(&pCache->version); char* key = getIndexKey(ct); // TODO handle multi situation later, and refactor - SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + SSkipListIterator* iter = tSkipListCreateIterFromVal(mem->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); if (node != NULL) { CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) { - if (c->nColVal == ct->nColVal && strncmp(c->colVal, ct->colVal, c->nColVal) == 0) { + if (strcmp(c->colVal, ct->colVal) == 0) { taosArrayPush(result, &c->uid); *s = kTypeValue; } else { @@ -279,14 +263,104 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV } else if (qtype == QUERY_REGEX) { // } + indexMemUnRef(mem); + indexMemUnRef(imm); return 0; } void indexCacheRef(IndexCache* cache) { + if (cache == NULL) { + return; + } int ref = T_REF_INC(cache); UNUSED(ref); } void indexCacheUnRef(IndexCache* cache) { + if (cache == NULL) { + return; + } int ref = T_REF_DEC(cache); - if (ref == 0) { indexCacheDestroy(cache); } + if (ref == 0) { + indexCacheDestroy(cache); + } } + +void indexMemRef(MemTable* tbl) { + if (tbl == NULL) { + return; + } + int ref = T_REF_INC(tbl); + UNUSED(ref); +} +void indexMemUnRef(MemTable* tbl) { + if (tbl == NULL) { + return; + } + int ref = T_REF_DEC(tbl); + if (ref == 0) { + SSkipList* slt = tbl->mem; + indexCacheDestroySkiplist(slt); + free(tbl); + } +} + +static void cacheTermDestroy(CacheTerm* ct) { + if (ct == NULL) { + return; + } + free(ct->colVal); + free(ct); +} +static char* getIndexKey(const void* pData) { + CacheTerm* p = (CacheTerm*)pData; + return (char*)p; +} + +static int32_t compareKey(const void* l, const void* r) { + CacheTerm* lt = (CacheTerm*)l; + CacheTerm* rt = (CacheTerm*)r; + + // compare colVal + int32_t cmp = strcmp(lt->colVal, rt->colVal); + if (cmp == 0) { + return rt->version - lt->version; + } + return cmp; +} + +static MemTable* indexInternalCacheCreate(int8_t type) { + MemTable* tbl = calloc(1, sizeof(MemTable)); + indexMemRef(tbl); + if (type == TSDB_DATA_TYPE_BINARY) { + tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); + } + return tbl; +} + +static void doMergeWork(SSchedMsg* msg) { + IndexCache* pCache = msg->ahandle; + SIndex* sidx = (SIndex*)pCache->index; + indexFlushCacheTFile(sidx, pCache); +} +static bool indexCacheIteratorNext(Iterate* itera) { + SSkipListIterator* iter = itera->iter; + if (iter == NULL) { + return false; + } + IterateValue* iv = &itera->val; + iterateValueDestroy(iv, false); + + bool next = tSkipListIterNext(iter); + if (next) { + SSkipListNode* node = tSkipListIterGet(iter); + CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); + + iv->type = ct->operaType; + iv->colVal = ct->colVal; + + taosArrayPush(iv->val, &ct->uid); + } + return next; +} + +static IterateValue* indexCacheIteratorGetValue(Iterate* iter) { return &iter->val; } diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index fc31ff3c29..271528a437 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -54,7 +54,9 @@ static void tfileSerialCacheKey(TFileCacheKey* key, char* buf); TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = calloc(1, sizeof(TFileCache)); - if (tcache == NULL) { return NULL; } + if (tcache == NULL) { + return NULL; + } tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tcache->capacity = 64; @@ -83,7 +85,10 @@ TFileCache* tfileCacheCreate(const char* path) { tfileReaderRef(reader); // loader fst and validate it TFileHeader* header = &reader->header; - TFileCacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; + TFileCacheKey key = {.suid = header->suid, + .colName = header->colName, + .nColName = strlen(header->colName), + .colType = header->colType}; char buf[128] = {0}; tfileSerialCacheKey(&key, buf); @@ -97,13 +102,16 @@ End: return NULL; } void tfileCacheDestroy(TFileCache* tcache) { - if (tcache == NULL) { return; } + if (tcache == NULL) { + return; + } // free table cache TFileReader** reader = taosHashIterate(tcache->tableCache, NULL); while (reader) { TFileReader* p = *reader; - indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, p->header.colType); + indexInfo("drop table cache suid: %" PRIu64 ", colName: %s, colType: %d", p->header.suid, p->header.colName, + p->header.colType); tfileReaderUnRef(p); reader = taosHashIterate(tcache->tableCache, reader); @@ -116,10 +124,13 @@ TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) { char buf[128] = {0}; tfileSerialCacheKey(key, buf); - TFileReader* reader = taosHashGet(tcache->tableCache, buf, strlen(buf)); - tfileReaderRef(reader); + TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf)); + if (reader == NULL) { + return NULL; + } + tfileReaderRef(*reader); - return reader; + return *reader; } void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) { char buf[128] = {0}; @@ -138,14 +149,17 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) } TFileReader* tfileReaderCreate(WriterCtx* ctx) { TFileReader* reader = calloc(1, sizeof(TFileReader)); - if (reader == NULL) { return NULL; } + if (reader == NULL) { + return NULL; + } // T_REF_INC(reader); reader->ctx = ctx; if (0 != tfileReaderLoadHeader(reader)) { tfileReaderDestroy(reader); - indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName); + indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, + reader->header.colName); return NULL; } @@ -158,7 +172,9 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { return reader; } void tfileReaderDestroy(TFileReader* reader) { - if (reader == NULL) { return; } + if (reader == NULL) { + return; + } // T_REF_INC(reader); fstDestroy(reader->fst); writerCtxDestroy(reader->ctx); @@ -175,10 +191,12 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul uint64_t offset; FstSlice key = fstSliceCreate(term->colVal, term->nColVal); if (fstGet(reader->fst, &key, &offset)) { - indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, term->colVal); + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex", term->suid, term->colName, + term->colVal); ret = tfileReaderLoadTableIds(reader, offset, result); } else { - indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, term->colVal); + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, + term->colVal); } fstSliceDestroy(&key); } else if (qtype == QUERY_PREFIX) { @@ -304,12 +322,16 @@ int tfileWriterPut(TFileWriter* tw, void* data) { return 0; } void tfileWriteClose(TFileWriter* tw) { - if (tw == NULL) { return; } + if (tw == NULL) { + return; + } writerCtxDestroy(tw->ctx); free(tw); } void tfileWriterDestroy(TFileWriter* tw) { - if (tw == NULL) { return; } + if (tw == NULL) { + return; + } writerCtxDestroy(tw->ctx); free(tw); @@ -317,29 +339,35 @@ void tfileWriterDestroy(TFileWriter* tw) { IndexTFile* indexTFileCreate(const char* path) { IndexTFile* tfile = calloc(1, sizeof(IndexTFile)); - if (tfile == NULL) { return NULL; } + if (tfile == NULL) { + return NULL; + } tfile->cache = tfileCacheCreate(path); return tfile; } -void IndexTFileDestroy(IndexTFile* tfile) { - free(tfile); -} +void IndexTFileDestroy(IndexTFile* tfile) { free(tfile); } int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { int ret = -1; - if (tfile == NULL) { return ret; } + if (tfile == NULL) { + return ret; + } IndexTFile* pTfile = (IndexTFile*)tfile; SIndexTerm* term = query->term; - TFileCacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; - TFileReader* reader = tfileCacheGet(pTfile->cache, &key); + TFileCacheKey key = { + .suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; + TFileReader* reader = tfileCacheGet(pTfile->cache, &key); + if (reader == NULL) { + return 0; + } return tfileReaderSearch(reader, query, result); } int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { - // TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName, .version = - // 1}; + // TFileWriterOpt wOpt = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = + // term->nColName, .version = 1}; return 0; } @@ -353,7 +381,9 @@ static bool tfileIteratorNext(Iterate* iiter) { TFileFstIter* tIter = iiter->iter; StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL); - if (rt == NULL) { return false; } + if (rt == NULL) { + return false; + } int32_t sz = 0; char* ch = (char*)fstSliceData(&rt->data, &sz); @@ -364,20 +394,22 @@ static bool tfileIteratorNext(Iterate* iiter) { swsResultDestroy(rt); // set up iterate value - if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; } + if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { + return false; + } iv->colVal = colVal; // std::string key(ch, sz); } -static IterateValue* tifileIterateGetValue(Iterate* iter) { - return &iter->val; -} +static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; } static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) { TFileFstIter* tIter = calloc(1, sizeof(Iterate)); - if (tIter == NULL) { return NULL; } + if (tIter == NULL) { + return NULL; + } tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS); tIter->fb = fstSearch(reader->fst, tIter->ctx); tIter->st = streamBuilderIntoStream(tIter->fb); @@ -389,14 +421,18 @@ Iterate* tfileIteratorCreate(TFileReader* reader) { Iterate* iter = calloc(1, sizeof(Iterate)); iter->iter = tfileFstIteratorCreate(reader); - if (iter->iter == NULL) { return NULL; } + if (iter->iter == NULL) { + return NULL; + } iter->next = tfileIteratorNext; iter->getValue = tifileIterateGetValue; return iter; } void tfileIteratorDestroy(Iterate* iter) { - if (iter == NULL) { return; } + if (iter == NULL) { + return; + } IterateValue* iv = &iter->val; iterateValueDestroy(iv, true); @@ -409,14 +445,18 @@ void tfileIteratorDestroy(Iterate* iter) { } TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) { - if (tf == NULL) { return NULL; } + if (tf == NULL) { + return NULL; + } TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; return tfileCacheGet(tf->cache, &key); } static int tfileStrCompare(const void* a, const void* b) { int ret = strcmp((char*)a, (char*)b); - if (ret == 0) { return ret; } + if (ret == 0) { + return ret; + } return ret < 0 ? -1 : 1; } @@ -431,13 +471,17 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) { TFileValue* tfileValueCreate(char* val) { TFileValue* tf = calloc(1, sizeof(TFileValue)); - if (tf == NULL) { return NULL; } + if (tf == NULL) { + return NULL; + } tf->tableId = taosArrayInit(32, sizeof(uint64_t)); return tf; } int tfileValuePush(TFileValue* tf, uint64_t val) { - if (tf == NULL) { return -1; } + if (tf == NULL) { + return -1; + } taosArrayPush(tf->tableId, &val); return 0; } @@ -457,7 +501,9 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) { static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) { int32_t fstOffset = offset + sizeof(tw->header.fstOffset); tw->header.fstOffset = fstOffset; - if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; } + if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { + return -1; + } tw->offset += sizeof(fstOffset); return 0; } @@ -468,7 +514,9 @@ static int tfileWriteHeader(TFileWriter* writer) { memcpy(buf, (char*)header, sizeof(buf)); int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf)); - if (sizeof(buf) != nwrite) { return -1; } + if (sizeof(buf) != nwrite) { + return -1; + } writer->offset = nwrite; return 0; } @@ -502,7 +550,9 @@ static int tfileReaderLoadFst(TFileReader* reader) { static int FST_MAX_SIZE = 16 * 1024; char* buf = calloc(1, sizeof(char) * FST_MAX_SIZE); - if (buf == NULL) { return -1; } + if (buf == NULL) { + return -1; + } WriterCtx* ctx = reader->ctx; int32_t nread = ctx->readFrom(ctx, buf, FST_MAX_SIZE, reader->header.fstOffset); @@ -525,7 +575,9 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* int32_t total = sizeof(uint64_t) * nid; char* buf = calloc(1, total); - if (buf == NULL) { return -1; } + if (buf == NULL) { + return -1; + } nread = ctx->read(ctx, buf, total); assert(total == nread); @@ -543,12 +595,16 @@ void tfileReaderRef(TFileReader* reader) { void tfileReaderUnRef(TFileReader* reader) { int ref = T_REF_DEC(reader); - if (ref == 0) { tfileReaderDestroy(reader); } + if (ref == 0) { + tfileReaderDestroy(reader); + } } static int tfileGetFileList(const char* path, SArray* result) { DIR* dir = opendir(path); - if (NULL == dir) { return -1; } + if (NULL == dir) { + return -1; + } struct dirent* entry; while ((entry = readdir(dir)) != NULL) { @@ -576,7 +632,9 @@ static int tfileCompare(const void* a, const void* b) { size_t bLen = strlen(bName); int ret = strncmp(aName, bName, aLen > bLen ? aLen : bLen); - if (ret == 0) { return ret; } + if (ret == 0) { + return ret; + } return ret < 0 ? -1 : 1; } // tfile name suid-colId-version.tindex diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 17733dd284..b3e385192f 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -2,7 +2,8 @@ * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. + * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free + * Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or @@ -75,7 +76,9 @@ class FstReadMemory { bool init() { char* buf = (char*)calloc(1, sizeof(char) * _size); int nRead = fstCountingWriterRead(_w, (uint8_t*)buf, _size); - if (nRead <= 0) { return false; } + if (nRead <= 0) { + return false; + } _size = nRead; _s = fstSliceCreate((uint8_t*)buf, _size); _fst = fstCreate(&_s); @@ -179,7 +182,9 @@ void checkFstPerf() { delete fw; FstReadMemory* m = new FstReadMemory(1024 * 64); - if (m->init()) { printf("success to init fst read"); } + if (m->init()) { + printf("success to init fst read"); + } Performance_fstReadRecords(m); delete m; } @@ -283,7 +288,8 @@ class IndexEnv : public ::testing::Test { // / { // / std::string colName("tag1"), colVal("Hello world"); // / SIndexTerm* term = -// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), / colVal.size()); +// indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), / +// colVal.size()); // SIndexMultiTerm* terms = indexMultiTermCreate(); // indexMultiTermAdd(terms, term); // / / for (size_t i = 0; i < 100; i++) { @@ -301,14 +307,16 @@ class IndexEnv : public ::testing::Test { // / { // / std::string colName("tag1"), colVal("Hello world"); // / SIndexTerm* term = -// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); +// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), +// colVal.size()); // / indexMultiTermAdd(terms, term); // / // } // / { // / std::string colName("tag2"), colVal("Hello world"); // / SIndexTerm* term = -// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); +// / indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), +// colVal.size()); // / indexMultiTermAdd(terms, term); // / // } @@ -327,7 +335,8 @@ class IndexEnv : public ::testing::Test { class TFileObj { public: - TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage") : path_(path), colName_(colName) { + TFileObj(const std::string& path = "/tmp/tindex", const std::string& colName = "voltage") + : path_(path), colName_(colName) { colId_ = 10; // Do Nothing // @@ -337,7 +346,9 @@ class TFileObj { tfileReaderDestroy(reader_); reader_ = NULL; } - if (writer_ == NULL) { InitWriter(); } + if (writer_ == NULL) { + InitWriter(); + } return tfileWriterPut(writer_, tv); } bool InitWriter() { @@ -377,8 +388,12 @@ class TFileObj { return tfileReaderSearch(reader_, query, result); } ~TFileObj() { - if (writer_) { tfileWriterDestroy(writer_); } - if (reader_) { tfileReaderDestroy(reader_); } + if (writer_) { + tfileWriterDestroy(writer_); + } + if (reader_) { + tfileReaderDestroy(reader_); + } } private: @@ -455,9 +470,10 @@ TEST_F(IndexTFileEnv, test_tfile_write) { } taosArrayDestroy(data); - std::string colName("voltage"); - std::string colVal("ab"); - SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + std::string colName("voltage"); + std::string colVal("ab"); + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); SIndexTermQuery query = {.term = term, .qType = QUERY_TERM}; SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); @@ -525,54 +541,62 @@ TEST_F(IndexCacheEnv, cache_test) { std::string colName("voltage"); { std::string colVal("v1"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); } { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); } { std::string colVal("v2"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); } { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); } { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); } { std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); coj->Put(term, othColId, version++, suid++); } { std::string colVal("v4"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); coj->Put(term, othColId, version++, suid++); } { std::string colVal("v4"); for (size_t i = 0; i < 10; i++) { colVal[colVal.size() - 1] = 'a' + i; - SIndexTerm* term = - indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); coj->Put(term, colId, version++, suid++); } } coj->Debug(); // begin query { - std::string colVal("v3"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + std::string colVal("v3"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); SIndexTermQuery query = {.term = term, .qType = QUERY_TERM}; SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid)); STermValueType valType; @@ -582,8 +606,9 @@ TEST_F(IndexCacheEnv, cache_test) { assert(taosArrayGetSize(ret) == 4); } { - std::string colVal("v2"); - SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); + std::string colVal("v2"); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); SIndexTermQuery query = {.term = term, .qType = QUERY_TERM}; SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid)); STermValueType valType; @@ -592,3 +617,132 @@ TEST_F(IndexCacheEnv, cache_test) { assert(taosArrayGetSize(ret) == 1); } } +class IndexObj { + public: + IndexObj() { + // opt + numOfWrite = 0; + numOfRead = 0; + indexInit(); + } + int Init(const std::string& dir) { + taosRemoveDir(dir.c_str()); + taosMkDir(dir.c_str()); + int ret = indexOpen(&opts, dir.c_str(), &idx); + if (ret != 0) { + // opt + std::cout << "failed to open index: %s" << dir << std::endl; + } + return ret; + } + int Put(SIndexMultiTerm* fvs, uint64_t uid) { + numOfWrite += taosArrayGetSize(fvs); + return indexPut(idx, fvs, uid); + } + int Search(SIndexMultiTermQuery* multiQ, SArray* result) { + SArray* query = multiQ->query; + numOfRead = taosArrayGetSize(query); + return indexSearch(idx, multiQ, result); + } + + void Debug() { + std::cout << "numOfWrite:" << numOfWrite << std::endl; + std::cout << "numOfRead:" << numOfRead << std::endl; + } + + ~IndexObj() { + indexClose(idx); + indexCleanUp(); + } + + private: + SIndexOpts opts; + SIndex* idx; + int numOfWrite; + int numOfRead; +}; + +class IndexEnv2 : public ::testing::Test { + protected: + virtual void SetUp() { + index = new IndexObj(); + // + } + virtual void TearDown() { + // r + delete index; + } + IndexObj* index; +}; +TEST_F(IndexEnv2, testIndexOpen) { + std::string path = "/tmp"; + if (index->Init(path) != 0) { + std::cout << "failed to init index" << std::endl; + exit(1); + } + + int targetSize = 100; + { + std::string colName("tag1"), colVal("Hello world"); + + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < targetSize; i++) { + int tableId = i; + int ret = index->Put(terms, tableId); + assert(ret == 0); + } + indexMultiTermDestroy(terms); + } + { + size_t size = 100; + std::string colName("tag1"), colVal("hello world"); + + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < size; i++) { + int tableId = i; + int ret = index->Put(terms, tableId); + assert(ret == 0); + } + indexMultiTermDestroy(terms); + } + + { + std::string colName("tag1"), colVal("Hello world"); + + SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); + SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + indexMultiTermQueryAdd(mq, term, QUERY_TERM); + + SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t)); + index->Search(mq, result); + assert(taosArrayGetSize(result) == targetSize); + } +} +TEST_F(IndexEnv2, testIndex_CachePut) { + std::string path = "/tmp"; + if (index->Init(path) != 0) { + } +} + +TEST_F(IndexEnv2, testIndexr_TFilePut) { + std::string path = "/tmp"; + if (index->Init(path) != 0) { + } +} +TEST_F(IndexEnv2, testIndex_CacheSearch) { + std::string path = "/tmp"; + if (index->Init(path) != 0) { + } +} +TEST_F(IndexEnv2, testIndex_TFileSearch) { + std::string path = "/tmp"; + if (index->Init(path) != 0) { + } +}