diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 048c9e804e..6e1256d857 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -108,8 +108,17 @@ void iterateValueDestroy(IterateValue* iv, bool destroy); extern void* indexQhandle; +typedef struct TFileCacheKey { + uint64_t suid; + uint8_t colType; + char* colName; + int32_t nColName; +} ICacheKey; + int indexFlushCacheTFile(SIndex* sIdx, void*); +int32_t indexSerialCacheKey(ICacheKey* key, char* buf); + #define indexFatal(...) \ do { \ if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \ diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 12b66bca2c..805137ccaf 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -42,6 +42,7 @@ typedef struct IndexCache { int32_t version; int32_t nTerm; int8_t type; + uint64_t suid; pthread_mutex_t mtx; } IndexCache; @@ -58,7 +59,7 @@ typedef struct CacheTerm { } CacheTerm; // -IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type); +IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type); void indexCacheDestroy(void* cache); diff --git a/source/libs/index/inc/index_tfile.h b/source/libs/index/inc/index_tfile.h index 4928e01a63..675203fa1a 100644 --- a/source/libs/index/inc/index_tfile.h +++ b/source/libs/index/inc/index_tfile.h @@ -49,13 +49,6 @@ typedef struct TFileValue { int32_t offset; } TFileValue; -typedef struct TFileCacheKey { - uint64_t suid; - uint8_t colType; - char* colName; - int32_t nColName; -} TFileCacheKey; - // table cache // refactor to LRU cache later typedef struct TFileCache { @@ -103,10 +96,10 @@ typedef struct TFileReaderOpt { // tfile cache, manage tindex reader TFileCache* tfileCacheCreate(const char* path); void tfileCacheDestroy(TFileCache* tcache); -TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key); -void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader); +TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key); +void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader); -TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName); +TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName); TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName); TFileReader* tfileReaderCreate(WriterCtx* ctx); diff --git a/source/libs/index/inc/index_util.h b/source/libs/index/inc/index_util.h index 21c5ca155b..adeb52bb8c 100644 --- a/source/libs/index/inc/index_util.h +++ b/source/libs/index/inc/index_util.h @@ -34,7 +34,7 @@ extern "C" { #define SERIALIZE_VAR_TO_BUF(buf, var, type) \ do { \ type c = var; \ - assert(sizeof(var) == sizeof(type)); \ + assert(sizeof(type) == sizeof(c)); \ memcpy((void*)buf, (void*)&c, sizeof(c)); \ buf += sizeof(c); \ } while (0) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 6398259a96..11a2aa5e5f 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -17,6 +17,7 @@ #include "indexInt.h" #include "index_cache.h" #include "index_tfile.h" +#include "index_util.h" #include "tdef.h" #include "tsched.h" @@ -130,18 +131,28 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { // TODO(yihao): reduce the lock range pthread_mutex_lock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { - SIndexTerm* p = taosArrayGetP(fVals, i); - IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName); + SIndexTerm* p = taosArrayGetP(fVals, i); + + char buf[128] = {0}; + ICacheKey key = {.suid = p->suid, .colName = p->colName}; + int32_t sz = indexSerialCacheKey(&key, buf); + + IndexCache** cache = taosHashGet(index->colObj, buf, sz); if (cache == NULL) { - IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType); - taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*)); + IndexCache* pCache = indexCacheCreate(index, p->suid, p->colName, p->colType); + taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*)); } } pthread_mutex_unlock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { - SIndexTerm* p = taosArrayGetP(fVals, i); - IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName); + SIndexTerm* p = taosArrayGetP(fVals, i); + + char buf[128] = {0}; + ICacheKey key = {.suid = p->suid, .colName = p->colName}; + int32_t sz = indexSerialCacheKey(&key, buf); + + IndexCache** cache = taosHashGet(index->colObj, buf, sz); assert(*cache != NULL); int ret = indexCachePut(*cache, p, uid); if (ret != 0) { return ret; } @@ -296,7 +307,12 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result // Get col info IndexCache* cache = NULL; pthread_mutex_lock(&sIdx->mtx); - IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName); + + char buf[128] = {0}; + ICacheKey key = {.suid = term->suid, .colName = term->colName}; + int32_t sz = indexSerialCacheKey(&key, buf); + + IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz); if (pCache == NULL) { pthread_mutex_unlock(&sIdx->mtx); return -1; @@ -385,10 +401,12 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid); IndexCache* pCache = (IndexCache*)cache; - TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName); + TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); + if (pReader == NULL) { indexWarn("empty pReader found"); } // handle flush Iterate* cacheIter = indexCacheIteratorCreate(pCache); Iterate* tfileIter = tfileIteratorCreate(pReader); + if (tfileIter == NULL) { indexWarn("empty tfile reader iterator"); } SArray* result = taosArrayInit(1024, sizeof(void*)); @@ -468,7 +486,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { int32_t version = CACHE_VERSION(cache); uint8_t colType = cache->type; - TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, cache->colName, colType); + TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType); if (tw == NULL) { indexError("failed to open file to write"); return -1; @@ -481,14 +499,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { } tfileWriterClose(tw); - TFileReader* reader = tfileReaderOpen(sIdx->path, sIdx->suid, version, cache->colName); + TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName); + + char buf[128] = {0}; + TFileHeader* header = &reader->header; + ICacheKey key = { + .suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType}; - char buf[128] = {0}; - TFileHeader* header = &reader->header; - TFileCacheKey key = {.suid = header->suid, - .colName = header->colName, - .nColName = strlen(header->colName), - .colType = header->colType}; pthread_mutex_lock(&sIdx->mtx); IndexTFile* ifile = (IndexTFile*)sIdx->tindex; @@ -499,3 +516,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { END: tfileWriterClose(tw); } + +int32_t indexSerialCacheKey(ICacheKey* key, char* buf) { + char* p = buf; + SERIALIZE_MEM_TO_BUF(buf, key, suid); + SERIALIZE_VAR_TO_BUF(buf, '_', char); + // SERIALIZE_MEM_TO_BUF(buf, key, colType); + // SERIALIZE_VAR_TO_BUF(buf, '_', char); + SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); + return buf - p; +} diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 503d7cd928..0f00d9d4af 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -40,7 +40,7 @@ static bool indexCacheIteratorNext(Iterate* itera); static IterateValue* indexCacheIteratorGetValue(Iterate* iter); -IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { +IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) { IndexCache* cache = calloc(1, sizeof(IndexCache)); if (cache == NULL) { indexError("failed to create index cache"); @@ -53,7 +53,7 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) { cache->type = type; cache->index = idx; cache->version = 0; - + cache->suid = suid; pthread_mutex_init(&cache->mtx, NULL); indexCacheRef(cache); return cache; diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 6669198861..fe54b5ebb3 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -51,7 +51,6 @@ static void tfileDestroyFileName(void* elem); static int tfileCompare(const void* a, const void* b); static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version); static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version); -static void tfileSerialCacheKey(TFileCacheKey* key, char* buf); TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = calloc(1, sizeof(TFileCache)); @@ -80,18 +79,18 @@ TFileCache* tfileCacheCreate(const char* path) { goto End; } - char buf[128] = {0}; - TFileReader* reader = tfileReaderCreate(wc); - TFileHeader* header = &reader->header; - TFileCacheKey key = {.suid = header->suid, - .colName = header->colName, - .nColName = strlen(header->colName), - .colType = header->colType}; - tfileSerialCacheKey(&key, buf); + char buf[128] = {0}; + TFileReader* reader = tfileReaderCreate(wc); + TFileHeader* header = &reader->header; + ICacheKey key = {.suid = header->suid, + .colName = header->colName, + .nColName = strlen(header->colName), + .colType = header->colType}; + int32_t sz = indexSerialCacheKey(&key, buf); + assert(sz < sizeof(buf)); + taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); tfileReaderRef(reader); - // indexTable - taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); } taosArrayDestroyEx(files, tfileDestroyFileName); return tcache; @@ -117,30 +116,30 @@ void tfileCacheDestroy(TFileCache* tcache) { free(tcache); } -TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) { - char buf[128] = {0}; - tfileSerialCacheKey(key, buf); - - TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf)); +TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { + char buf[128] = {0}; + int32_t sz = indexSerialCacheKey(key, buf); + assert(sz < sizeof(buf)); + TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz); if (reader == NULL) { return NULL; } tfileReaderRef(*reader); return *reader; } -void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) { - char buf[128] = {0}; - tfileSerialCacheKey(key, buf); +void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { + char buf[128] = {0}; + int32_t sz = indexSerialCacheKey(key, buf); // remove last version index reader - TFileReader** p = taosHashGet(tcache->tableCache, buf, strlen(buf)); + TFileReader** p = taosHashGet(tcache->tableCache, buf, sz); if (p != NULL) { TFileReader* oldReader = *p; - taosHashRemove(tcache->tableCache, buf, strlen(buf)); + taosHashRemove(tcache->tableCache, buf, sz); oldReader->remove = true; tfileReaderUnRef(oldReader); } + taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); tfileReaderRef(reader); - taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*)); return; } TFileReader* tfileReaderCreate(WriterCtx* ctx) { @@ -230,8 +229,6 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c TFileReader* reader = tfileReaderCreate(wc); return reader; - - // tfileSerialCacheKey(&key, buf); } TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { // char pathBuf[128] = {0}; @@ -325,15 +322,19 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { tfileWriterClose(tw); return -1; } - // write fst + + // write data indexError("--------Begin----------------"); for (size_t i = 0; i < sz; i++) { // TODO, fst batch write later TFileValue* v = taosArrayGetP((SArray*)data, i); - if (tfileWriteData(tw, v) == 0) { - // + if (tfileWriteData(tw, v) != 0) { + indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset, + (int)taosArrayGetSize(v->tableId)); + } else { + indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset, + (int)taosArrayGetSize(v->tableId)); } - indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId)); } indexError("--------End----------------"); fstBuilderFinish(tw->fb); @@ -369,9 +370,8 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) { if (tfile == NULL) { return ret; } IndexTFile* pTfile = (IndexTFile*)tfile; - SIndexTerm* term = query->term; - TFileCacheKey key = { - .suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; + SIndexTerm* term = query->term; + ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName}; TFileReader* reader = tfileCacheGet(pTfile->cache, &key); if (reader == NULL) { return 0; } @@ -453,9 +453,9 @@ void tfileIteratorDestroy(Iterate* iter) { free(iter); } -TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) { +TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) { if (tf == NULL) { return NULL; } - TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; + ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)}; return tfileCacheGet(tf->cache, &key); } @@ -650,10 +650,3 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, } return -1; } -static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) { - // SERIALIZE_MEM_TO_BUF(buf, key, suid); - // SERIALIZE_VAR_TO_BUF(buf, '_', char); - // SERIALIZE_MEM_TO_BUF(buf, key, colType); - // SERIALIZE_VAR_TO_BUF(buf, '_', char); - SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName); -} diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 3ad64cd03e..7bf40bc7b3 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -477,7 +477,7 @@ class CacheObj { public: CacheObj() { // TODO - cache = indexCacheCreate(NULL, "voltage", TSDB_DATA_TYPE_BINARY); + cache = indexCacheCreate(NULL, 0, "voltage", TSDB_DATA_TYPE_BINARY); } int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) { int ret = indexCachePut(cache, term, uid);