From f99f6ec87e59f2990993dc23a38b88ab56a96ed6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 26 Dec 2021 23:31:16 +0800 Subject: [PATCH] add flush-helper function --- include/libs/index/index.h | 12 +++ source/libs/index/inc/indexInt.h | 59 ++++++----- source/libs/index/inc/index_cache.h | 25 +++-- source/libs/index/inc/index_tfile.h | 4 + source/libs/index/src/index.c | 79 ++++++++------- source/libs/index/src/index_cache.c | 140 ++++++++++++++------------- source/libs/index/src/index_tfile.c | 26 ++--- source/libs/index/test/indexTests.cc | 11 ++- 8 files changed, 201 insertions(+), 155 deletions(-) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index f93c46da0c..d2b157542f 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -85,6 +85,18 @@ SIndexTerm* indexTermCreate(int64_t suid, int32_t nColVal); void indexTermDestroy(SIndexTerm* p); +/* + * init index + * + */ +int32_t indexInit(); +/* + * destory index + * + */ + +void indexCleanUp(); + #ifdef __cplusplus } #endif diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 2584a847ff..a8f231da0a 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -49,7 +49,6 @@ struct SIndex { SHashObj* colObj; // < field name, field id> int64_t suid; // current super table id, -1 is normal table - int colId; // field id allocated to cache int32_t cVersion; // current version allocated to cache SIndexStat stat; @@ -88,41 +87,39 @@ typedef struct SIndexTermQuery { EIndexQueryType qType; } SIndexTermQuery; -#define indexFatal(...) \ - do { \ - if (sDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("index FATAL ", 255, __VA_ARGS__); \ - } \ +typedef struct Iterate { + void* iter; + int8_t type; + char* colVal; + SArray* val; +} Iterate; +extern void* indexQhandle; + +int indexFlushCacheTFile(SIndex* sIdx, void*); + +#define indexFatal(...) \ + do { \ + if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \ } while (0) -#define indexError(...) \ - do { \ - if (sDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("index ERROR ", 255, __VA_ARGS__); \ - } \ +#define indexError(...) \ + do { \ + if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); } \ } while (0) -#define indexWarn(...) \ - do { \ - if (sDebugFlag & DEBUG_WARN) { \ - taosPrintLog("index WARN ", 255, __VA_ARGS__); \ - } \ +#define indexWarn(...) \ + do { \ + if (sDebugFlag & DEBUG_WARN) { taosPrintLog("index WARN ", 255, __VA_ARGS__); } \ } while (0) -#define indexInfo(...) \ - do { \ - if (sDebugFlag & DEBUG_INFO) { \ - taosPrintLog("index ", 255, __VA_ARGS__); \ - } \ +#define indexInfo(...) \ + do { \ + if (sDebugFlag & DEBUG_INFO) { taosPrintLog("index ", 255, __VA_ARGS__); } \ } while (0) -#define indexDebug(...) \ - do { \ - if (sDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \ - } \ +#define indexDebug(...) \ + do { \ + if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \ } while (0) -#define indexTrace(...) \ - do { \ - if (sDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("index ", sDebugFlag, __VA_ARGS__); \ - } \ +#define indexTrace(...) \ + do { \ + if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("index ", sDebugFlag, __VA_ARGS__); } \ } while (0) #ifdef __cplusplus diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 692edcc064..07b5b8d564 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -22,10 +22,8 @@ // ----------------- key structure in skiplist --------------------- /* A data row, the format is like below: - * content: |<--totalLen-->|<-- fieldid-->|<--field type-->|<-- value len--->| - * |<-- value -->|<--uid -->|<--version--->|<-- itermType -->| - * len : |<--int32_t -->|<-- int16_t-->|<-- int8_t --->|<--- int32_t --->| - * <--valuelen->|<--uint64_t->| * <-- int32_t-->|<-- int8_t --->| + * content: |<--totalLen-->|<-- value len--->|<-- value -->|<--uid -->|<--version--->|<-- itermType -->| + * len : |<--int32_t -->|<--- int32_t --->|<--valuelen->|<--uint64_t->|<-- int32_t-->|<-- int8_t --->| */ #ifdef __cplusplus @@ -34,12 +32,17 @@ extern "C" { typedef struct IndexCache { T_REF_DECLARE() - SSkipList* skiplist; + SSkipList *mem, *imm; + SIndex* index; + char* colName; + int32_t version; + int32_t nTerm; + int8_t type; + } IndexCache; typedef struct CacheTerm { // key - int32_t colId; int32_t nColVal; char* colVal; int32_t version; @@ -49,14 +52,18 @@ typedef struct CacheTerm { SIndexOperOnColumn operaType; } CacheTerm; // -IndexCache* indexCacheCreate(); + +IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type); void indexCacheDestroy(void* cache); -int indexCachePut(void* cache, SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid); +int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid); // int indexCacheGet(void *cache, uint64_t *rst); -int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s); +int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s); + +void indexCacheRef(IndexCache* cache); +void indexCacheUnRef(IndexCache* cache); void indexCacheDebug(IndexCache* cache); #ifdef __cplusplus diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index 416b10bd14..550492ba50 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -105,9 +105,13 @@ void tfileCacheDestroy(TFileCache* tcache); TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key); void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader); +TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName); + TFileReader* tfileReaderCreate(WriterCtx* ctx); void tfileReaderDestroy(TFileReader* reader); int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result); +void tfileReaderRef(TFileReader* reader); +void tfileReaderUnRef(TFileReader* reader); TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header); void tfileWriterDestroy(TFileWriter* tw); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 1fec9e8d0b..3f871af01d 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -18,11 +18,26 @@ #include "index_cache.h" #include "index_tfile.h" #include "tdef.h" +#include "tsched.h" #ifdef USE_LUCENE #include "lucene++/Lucene_c.h" #endif +#define INDEX_NUM_OF_THREADS 4 +#define INDEX_QUEUE_SIZE 4 + +void* indexQhandle = NULL; + +int32_t indexInit() { + indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); + return indexQhandle == NULL ? -1 : 0; + // do nothing +} +void indexCleanUp() { + taosCleanUpScheduler(indexQhandle); +} + static int uidCompare(const void* a, const void* b) { uint64_t u1 = *(uint64_t*)a; uint64_t u2 = *(uint64_t*)b; @@ -38,16 +53,15 @@ typedef struct SIdxColInfo { } SIdxColInfo; static pthread_once_t isInit = PTHREAD_ONCE_INIT; -static void indexInit(); +// static void indexInit(); static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result); -static int indexFlushCacheTFile(SIndex* sIdx); static void indexInterResultsDestroy(SArray* results); static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult); int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { - pthread_once(&isInit, indexInit); + // pthread_once(&isInit, indexInit); SIndex* sIdx = calloc(1, sizeof(SIndex)); if (sIdx == NULL) { return -1; } @@ -57,10 +71,9 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { #endif #ifdef USE_INVERTED_INDEX - sIdx->cache = (void*)indexCacheCreate(); - sIdx->tindex = NULL; + // sIdx->cache = (void*)indexCacheCreate(sIdx); + sIdx->tindex = indexTFileCreate(path); sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - sIdx->colId = 1; sIdx->cVersion = 1; pthread_mutex_init(&sIdx->mtx, NULL); @@ -80,6 +93,12 @@ void indexClose(SIndex* sIdx) { #ifdef USE_INVERTED_INDEX indexCacheDestroy(sIdx->cache); + void* iter = taosHashIterate(sIdx->colObj, NULL); + while (iter) { + IndexCache** pCache = iter; + if (*pCache) { indexCacheUnRef(*pCache); } + iter = taosHashIterate(sIdx->colObj, iter); + } taosHashCleanup(sIdx->colObj); pthread_mutex_destroy(&sIdx->mtx); #endif @@ -110,29 +129,24 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { pthread_mutex_lock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); - SIdxColInfo* fi = taosHashGet(index->colObj, p->colName, p->nColName); - if (fi == NULL) { - SIdxColInfo tfi = {.colId = index->colId}; - index->cVersion++; - index->colId++; - taosHashPut(index->colObj, p->colName, p->nColName, &tfi, sizeof(tfi)); - } else { - // TODO, del + IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName); + if (*cache == NULL) { + IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType); + taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*)); } } pthread_mutex_unlock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); - SIdxColInfo* fi = taosHashGet(index->colObj, p->colName, p->nColName); - assert(fi != NULL); - int32_t colId = fi->colId; - int32_t version = index->cVersion; - int ret = indexCachePut(index->cache, p, colId, version, uid); + IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName); + + assert(*cache != NULL); + int ret = indexCachePut(*cache, p, uid); if (ret != 0) { return ret; } } -#endif +#endif return 0; } int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { @@ -281,32 +295,26 @@ void indexMultiTermDestroy(SIndexMultiTerm* terms) { taosArrayDestroy(terms); } -void indexInit() { - // do nothing -} static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) { - int32_t version = -1; - int16_t colId = -1; - SIdxColInfo* colInfo = NULL; - SIndexTerm* term = query->term; const char* colName = term->colName; int32_t nColName = term->nColName; + // Get col info + IndexCache* cache = NULL; pthread_mutex_lock(&sIdx->mtx); - colInfo = taosHashGet(sIdx->colObj, colName, nColName); - if (colInfo == NULL) { + IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName); + if (*pCache == NULL) { pthread_mutex_unlock(&sIdx->mtx); return -1; } - colId = colInfo->colId; - version = colInfo->cVersion; + cache = *pCache; pthread_mutex_unlock(&sIdx->mtx); *result = taosArrayInit(4, sizeof(uint64_t)); // TODO: iterator mem and tidex STermValueType s; - if (0 == indexCacheSearch(sIdx->cache, query, colId, version, *result, &s)) { + if (0 == indexCacheSearch(cache, query, *result, &s)) { if (s == kTypeDeletion) { indexInfo("col: %s already drop by other opera", term->colName); // coloum already drop by other oper, no need to query tindex @@ -353,9 +361,14 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType } return 0; } -static int indexFlushCacheTFile(SIndex* sIdx) { +int indexFlushCacheTFile(SIndex* sIdx, void* cache) { if (sIdx == NULL) { return -1; } indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); + IndexCache* pCache = (IndexCache*)cache; + TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName); + + tfileReaderUnRef(pReader); + indexCacheUnRef(pCache); return 0; } diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 9c92203088..8181c17505 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -16,9 +16,11 @@ #include "index_cache.h" #include "index_util.h" #include "tcompare.h" +#include "tsched.h" #define MAX_INDEX_KEY_LEN 256 // test only, change later +#define CACH_LIMIT 1000000 // ref index_cache.h:22 //#define CACHE_KEY_LEN(p) \ // (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) + sizeof(p->operType)) @@ -38,9 +40,6 @@ static int32_t compareKey(const void* l, const void* r) { CacheTerm* lt = (CacheTerm*)l; CacheTerm* rt = (CacheTerm*)r; - // compare colId - if (lt->colId != rt->colId) { return lt->colId - rt->colId; } - // compare colVal int i, j; for (i = 0, j = 0; i < lt->nColVal && j < rt->nColVal; i++, j++) { @@ -56,71 +55,40 @@ static int32_t compareKey(const void* l, const void* r) { return -1; } // compare version - return rt->version - lt->version; - - // char* lp = (char*)l; - // char* rp = (char*)r; - - //// compare col id - // int16_t lf, rf; // cold id - // memcpy(&lf, lp, sizeof(lf)); - // memcpy(&rf, rp, sizeof(rf)); - // if (lf != rf) { return lf < rf ? -1 : 1; } - - // lp += sizeof(lf); - // rp += sizeof(rf); - - //// skip value len - // int32_t lfl, rfl; - // memcpy(&lfl, lp, sizeof(lfl)); - // memcpy(&rfl, rp, sizeof(rfl)); - // lp += sizeof(lfl); - // rp += sizeof(rfl); - - //// compare value - // int32_t i, j; - // for (i = 0, j = 0; i < lfl && j < rfl; i++, j++) { - // if (lp[i] == rp[j]) { - // continue; - // } else { - // return lp[i] < rp[j] ? -1 : 1; - // } - //} - // if (i < lfl) { - // return 1; - //} else if (j < rfl) { - // return -1; - //} - // lp += lfl; - // rp += rfl; - - //// compare version, desc order - // int32_t lv, rv; - // memcpy(&lv, lp, sizeof(lv)); - // memcpy(&rv, rp, sizeof(rv)); - // if (lv != rv) { return lv < rv ? 1 : -1; } - - // return 0; } -IndexCache* indexCacheCreate() { + +static SSkipList* indexInternalCacheCreate(int8_t type) { + if (type == TSDB_DATA_TYPE_BINARY) { + return tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); + } +} + +IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { IndexCache* cache = calloc(1, sizeof(IndexCache)); if (cache == NULL) { indexError("failed to create index cache"); return NULL; - } - cache->skiplist = - tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); + }; + cache->mem = indexInternalCacheCreate(type); + + cache->colName = calloc(1, strlen(colName) + 1); + memcpy(cache->colName, colName, strlen(colName)); + cache->type = type; + cache->index = idx; + cache->version = 0; + + indexCacheRef(cache); return cache; } void indexCacheDebug(IndexCache* cache) { - SSkipListIterator* iter = tSkipListCreateIter(cache->skiplist); + SSkipListIterator* iter = tSkipListCreateIter(cache->mem); while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); if (ct != NULL) { // TODO, add more debug info - indexInfo("{colId:%d, colVal: %s, version: %d} \t", ct->colId, ct->colVal, ct->version); + indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version); } } tSkipListDestroyIter(iter); @@ -129,37 +97,71 @@ void indexCacheDebug(IndexCache* cache) { void indexCacheDestroy(void* cache) { IndexCache* pCache = cache; if (pCache == NULL) { return; } - tSkipListDestroy(pCache->skiplist); + tSkipListDestroy(pCache->mem); + tSkipListDestroy(pCache->imm); + free(pCache->colName); free(pCache); } -int indexCachePut(void* cache, SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) { +static void doMergeWork(SSchedMsg* msg) { + IndexCache* pCache = msg->ahandle; + SIndex* sidx = (SIndex*)pCache->index; + indexFlushCacheTFile(sidx, pCache); +} + +int indexCacheSchedToMerge(IndexCache* pCache) { + SSchedMsg schedMsg = {0}; + schedMsg.fp = doMergeWork; + schedMsg.ahandle = pCache; + schedMsg.thandle = NULL; + schedMsg.msg = NULL; + + taosScheduleTask(indexQhandle, &schedMsg); +} +int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { if (cache == NULL) { return -1; } IndexCache* pCache = cache; + indexCacheRef(pCache); // encode data CacheTerm* ct = calloc(1, sizeof(CacheTerm)); if (cache == NULL) { return -1; } // set up key - ct->colId = colId; ct->colType = term->colType; ct->nColVal = term->nColVal; ct->colVal = (char*)calloc(1, sizeof(char) * (ct->nColVal + 1)); memcpy(ct->colVal, term->colVal, ct->nColVal); - ct->version = version; - + ct->version = atomic_add_fetch_32(&pCache->version, 1); + // set value ct->uid = uid; ct->operaType = term->operType; - tSkipListPut(pCache->skiplist, (char*)ct); + tSkipListPut(pCache->mem, (char*)ct); + pCache->nTerm += 1; + + if (pCache->nTerm >= CACH_LIMIT) { + pCache->nTerm = 0; + + while (pCache->imm != NULL) { + // do nothong + } + + pCache->imm = pCache->mem; + pCache->mem = indexInternalCacheCreate(pCache->type); + + // sched to merge + // unref cache int bgwork + indexCacheSchedToMerge(pCache); + } + indexCacheUnRef(pCache); return 0; // encode end } -int indexCacheDel(void* cache, int32_t fieldId, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { +int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) { IndexCache* pCache = cache; return 0; } -int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { +int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermValueType* s) { if (cache == NULL) { return -1; } IndexCache* pCache = cache; SIndexTerm* term = query->term; @@ -167,15 +169,14 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t CacheTerm* ct = calloc(1, sizeof(CacheTerm)); if (ct == NULL) { return -1; } - ct->colId = colId; ct->nColVal = term->nColVal; ct->colVal = calloc(1, sizeof(char) * (ct->nColVal + 1)); memcpy(ct->colVal, term->colVal, ct->nColVal); - ct->version = version; + ct->version = atomic_load_32(&pCache->version); char* key = getIndexKey(ct); // TODO handle multi situation later, and refactor - SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->skiplist, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); + SSkipListIterator* iter = tSkipListCreateIterFromVal(pCache->mem, key, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC); while (tSkipListIterNext(iter)) { SSkipListNode* node = tSkipListIterGet(iter); if (node != NULL) { @@ -209,3 +210,12 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, int16_t colId, int32_t } return 0; } + +void indexCacheRef(IndexCache* cache) { + int ref = T_REF_INC(cache); + UNUSED(ref); +} +void indexCacheUnRef(IndexCache* cache) { + int ref = T_REF_DEC(cache); + if (ref == 0) { indexCacheDestroy(cache); } +} diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 003ae86c6a..0dfb14cc8d 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -33,11 +33,9 @@ static int tfileWriteHeader(TFileWriter* writer); static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset); static int tfileWriteData(TFileWriter* write, TFileValue* tval); -static int tfileReaderLoadHeader(TFileReader* reader); -static int tfileReaderLoadFst(TFileReader* reader); -static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); -static void tfileReaderRef(TFileReader* reader); -static void tfileReaderUnRef(TFileReader* reader); +static int tfileReaderLoadHeader(TFileReader* reader); +static int tfileReaderLoadFst(TFileReader* reader); +static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static int tfileGetFileList(const char* path, SArray* result); static int tfileRmExpireFile(SArray* result); @@ -131,7 +129,6 @@ void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); return; } - TFileReader* tfileReaderCreate(WriterCtx* ctx) { TFileReader* reader = calloc(1, sizeof(TFileReader)); if (reader == NULL) { return NULL; } @@ -317,6 +314,11 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) { return 0; } +TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) { + if (tf == NULL) { return NULL; } + TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; + return tfileCacheGet(tf->cache, &key); +} static int tfileStrCompare(const void* a, const void* b) { int ret = strcmp((char*)a, (char*)b); @@ -423,12 +425,12 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* free(buf); return 0; } -static void tfileReaderRef(TFileReader* reader) { +void tfileReaderRef(TFileReader* reader) { int ref = T_REF_INC(reader); UNUSED(ref); } -static void tfileReaderUnRef(TFileReader* reader) { +void tfileReaderUnRef(TFileReader* reader) { int ref = T_REF_DEC(reader); if (ref == 0) { tfileReaderDestroy(reader); } } @@ -479,9 +481,9 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, return -1; } static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) { - SERIALIZE_MEM_TO_BUF(buf, key, suid); - SERIALIZE_VAR_TO_BUF(buf, '_', char); - SERIALIZE_MEM_TO_BUF(buf, key, colType); - SERIALIZE_VAR_TO_BUF(buf, '_', char); + // SERIALIZE_MEM_TO_BUF(buf, key, suid); + // SERIALIZE_VAR_TO_BUF(buf, '_', char); + // SERIALIZE_MEM_TO_BUF(buf, key, colType); + // SERIALIZE_VAR_TO_BUF(buf, '_', char); SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); } diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index b046f32c21..17733dd284 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -471,10 +471,10 @@ class CacheObj { public: CacheObj() { // TODO - cache = indexCacheCreate(); + cache = indexCacheCreate(NULL, "voltage", TSDB_DATA_TYPE_BINARY); } int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) { - int ret = indexCachePut(cache, term, colId, version, uid); + int ret = indexCachePut(cache, term, uid); if (ret != 0) { // std::cout << "failed to put into cache: " << ret << std::endl; @@ -486,7 +486,7 @@ class CacheObj { indexCacheDebug(cache); } int Get(SIndexTermQuery* query, int16_t colId, int32_t version, SArray* result, STermValueType* s) { - int ret = indexCacheSearch(cache, query, colId, version, result, s); + int ret = indexCacheSearch(cache, query, result, s); if (ret != 0) { // std::cout << "failed to get from cache:" << ret << std::endl; @@ -561,7 +561,7 @@ TEST_F(IndexCacheEnv, cache_test) { } { std::string colVal("v4"); - for (size_t i = 0; i < 100; i++) { + for (size_t i = 0; i < 10; i++) { colVal[colVal.size() - 1] = 'a' + i; SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), colVal.c_str(), colVal.size()); @@ -578,7 +578,8 @@ TEST_F(IndexCacheEnv, cache_test) { STermValueType valType; coj->Get(&query, colId, 10000, ret, &valType); - assert(taosArrayGetSize(ret) == 3); + // std::cout << "size : " << taosArrayGetSize(ret) << std::endl; + assert(taosArrayGetSize(ret) == 4); } { std::string colVal("v2");