From ce5f9fa9f3800cfca9a7986ac6f7db4830a01414 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 23 Jul 2024 06:59:09 +0000 Subject: [PATCH] refactor index --- include/libs/index/index.h | 16 ++-- source/libs/index/inc/indexFstFile.h | 6 +- source/libs/index/inc/indexTfile.h | 27 +++---- source/libs/index/src/index.c | 96 ++++++++++++++---------- source/libs/index/src/indexCache.c | 11 ++- source/libs/index/src/indexFstFile.c | 9 ++- source/libs/index/src/indexJson.c | 6 +- source/libs/index/src/indexTfile.c | 105 ++++++++++++++++----------- source/libs/index/src/indexUtil.c | 8 ++ source/libs/index/test/indexTests.cc | 6 +- 10 files changed, 177 insertions(+), 113 deletions(-) diff --git a/include/libs/index/index.h b/include/libs/index/index.h index cfcc9993cf..0ef21794ee 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -83,7 +83,7 @@ void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery); * @param type (input, single query type) * @return error code */ -int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type); +int32_t indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type); /* * open index * @param opt (input, index opt) @@ -91,7 +91,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde * @param index (output, index object) * @return error code */ -int indexOpen(SIndexOpts* opt, const char* path, SIndex** index); +int32_t indexOpen(SIndexOpts* opt, const char* path, SIndex** index); /* * close index * @param index (input, index to be closed) @@ -106,7 +106,7 @@ void indexClose(SIndex* index); * @param uid (input, uid of terms) * @return error code */ -int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid); +int32_t indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid); /* * delete terms that meet query condition * @param index (input, index object) @@ -114,7 +114,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid); * @return error code */ -int indexDelete(SIndex* index, SIndexMultiTermQuery* query); +int32_t indexDelete(SIndex* index, SIndexMultiTermQuery* query); /* * search index * @param index (input, index object) @@ -122,7 +122,7 @@ int indexDelete(SIndex* index, SIndexMultiTermQuery* query); * @param result(output, query result) * @return error code */ -int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result); +int32_t indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result); /* * rebuild index * @param index (input, index object) @@ -138,7 +138,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result); * @param index (output, index json object) * @return error code */ -int indexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index); +int32_t indexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index); /* * close index * @param index (input, index to be closed) @@ -154,7 +154,7 @@ void indexJsonClose(SIndexJson* index); * @param uid (input, uid of terms) * @return error code */ -int indexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid); +int32_t indexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid); /* * search index * @param index (input, index object) @@ -163,7 +163,7 @@ int indexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid); * @return error code */ -int indexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result); +int32_t indexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result); /* * @param * @param diff --git a/source/libs/index/inc/indexFstFile.h b/source/libs/index/inc/indexFstFile.h index d15141f79a..b1ff4fee34 100644 --- a/source/libs/index/inc/indexFstFile.h +++ b/source/libs/index/inc/indexFstFile.h @@ -78,11 +78,11 @@ typedef struct IdxFstFile { CheckSummer summer; } IdxFstFile; -int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len); +int32_t idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len); -int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len); +int32_t idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len); -int idxFileFlush(IdxFstFile* write); +int32_t idxFileFlush(IdxFstFile* write); uint32_t idxFileMaskedCheckSum(IdxFstFile* write); diff --git a/source/libs/index/inc/indexTfile.h b/source/libs/index/inc/indexTfile.h index 7425b13ce9..51d3cc0e7b 100644 --- a/source/libs/index/inc/indexTfile.h +++ b/source/libs/index/inc/indexTfile.h @@ -99,23 +99,24 @@ typedef struct TFileReaderOpt { TFileCache* tfileCacheCreate(SIndex* idx, const char* path); void tfileCacheDestroy(TFileCache* tcache); TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key); -void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader); +int32_t tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader); TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName); -TFileReader* tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName); -TFileReader* tfileReaderCreate(IFileCtx* ctx); -void tfileReaderDestroy(TFileReader* reader); -int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr); -void tfileReaderRef(TFileReader* reader); -void tfileReaderUnRef(TFileReader* reader); +int32_t tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName, TFileReader** pReader); +int32_t tfileReaderCreate(IFileCtx* ctx, TFileReader** pReader); +void tfileReaderDestroy(TFileReader* reader); +int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr); +void tfileReaderRef(TFileReader* reader); +void tfileReaderUnRef(TFileReader* reader); -TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type); -void tfileWriterClose(TFileWriter* tw); -TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header); -void tfileWriterDestroy(TFileWriter* tw); -int tfileWriterPut(TFileWriter* tw, void* data, bool order); -int tfileWriterFinish(TFileWriter* tw); +int32_t tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type, + TFileWriter** pWriter); +void tfileWriterClose(TFileWriter* tw); +int32_t tfileWriterCreate(IFileCtx* ctx, TFileHeader* header, TFileWriter** pWriter); +void tfileWriterDestroy(TFileWriter* tw); +int tfileWriterPut(TFileWriter* tw, void* data, bool order); +int tfileWriterFinish(TFileWriter* tw); // IndexTFile* idxTFileCreate(SIndex* idx, const char* path); diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index d7e5d87e80..fad04798a8 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -86,10 +86,10 @@ static TdThreadOnce isInit = PTHREAD_ONCE_INIT; // static void indexInit(); static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result); -static void idxInterRsltDestroy(SArray* results); -static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out); +static void idxInterRsltDestroy(SArray* results); +static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out); -static int idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch); +static int32_t idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch); // merge cache and tfile by opera type static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper); @@ -106,30 +106,37 @@ static void indexWait(void* idx) { tsem_wait(&pIdx->sem); } -int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { - int ret = TSDB_CODE_SUCCESS; +int32_t indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { taosThreadOnce(&isInit, indexEnvInit); + + int code = TSDB_CODE_SUCCESS; SIndex* idx = taosMemoryCalloc(1, sizeof(SIndex)); if (idx == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END); } idx->lru = taosLRUCacheInit(opts->cacheSize, -1, .5); if (idx->lru == NULL) { - ret = TSDB_CODE_OUT_OF_MEMORY; - goto END; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END); } taosLRUCacheSetStrictCapacity(idx->lru, false); idx->tindex = idxTFileCreate(idx, path); if (idx->tindex == NULL) { - ret = TSDB_CODE_OUT_OF_MEMORY; - goto END; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END); } idx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (idx->colObj == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END); + } + idx->version = 1; idx->path = taosStrdup(path); + if (idx->path == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END); + } + taosThreadMutexInit(&idx->mtx, NULL); tsem_init(&idx->sem, 0, 0); @@ -138,17 +145,18 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { idxAcquireRef(idx->refId); *index = idx; - return ret; + return code; END: if (idx != NULL) { indexDestroy(idx); } *index = NULL; - return ret; + return code; } void indexDestroy(void* handle) { + if (handle == NULL) return; SIndex* idx = handle; taosThreadMutexDestroy(&idx->mtx); tsem_destroy(&idx->sem); @@ -202,7 +210,7 @@ void idxReleaseRef(int64_t ref) { taosReleaseRef(indexRefMgt, ref); } -int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { +int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { // TODO(yihao): reduce the lock range taosThreadMutexLock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { @@ -239,7 +247,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { } return 0; } -int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { +int32_t indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { EIndexOperatorType opera = multiQuerys->opera; // relation of querys SArray* iRslts = taosArrayInit(4, POINTER_BYTES); @@ -285,7 +293,7 @@ void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) { taosArrayDestroy(pQuery->query); taosMemoryFree(pQuery); }; -int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) { +int32_t indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) { SIndexTermQuery q = {.qType = qType, .term = term}; taosArrayPush(pQuery->query, &q); return 0; @@ -332,7 +340,7 @@ void indexTermDestroy(SIndexTerm* p) { SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); } -int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) { +int32_t indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) { taosArrayPush(terms, &term); return 0; } @@ -392,7 +400,7 @@ bool indexJsonIsRebuild(SIndexJson* idx) { return ((SIdxStatus)atomic_load_8(&idx->status)) == kRebuild ? true : false; } -static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) { +static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) { SIndexTerm* term = query->term; const char* colName = term->colName; int32_t nColName = term->nColName; @@ -404,6 +412,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) ICacheKey key = { .suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType}; indexDebug("r suid:%" PRIu64 ", colName:%s, colType:%d", key.suid, key.colName, key.colType); + int32_t sz = idxSerialCacheKey(&key, buf); taosThreadMutexLock(&sIdx->mtx); @@ -412,7 +421,11 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) taosThreadMutexUnlock(&sIdx->mtx); *result = taosArrayInit(4, sizeof(uint64_t)); + if (*result == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } // TODO: iterator mem and tidex + STermValueType s = kTypeValue; int64_t st = taosGetTimestampUs(); @@ -445,7 +458,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) return 0; END: idxTRsltDestroy(tr); - return -1; + return 0; } static void idxInterRsltDestroy(SArray* results) { if (results == NULL) { @@ -460,7 +473,7 @@ static void idxInterRsltDestroy(SArray* results) { taosArrayDestroy(results); } -static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) { +static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) { // refactor, merge interResults into fResults by oType for (int i = 0; i < taosArrayGetSize(in); i++) { SArray* t = taosArrayGetP(in, i); @@ -527,9 +540,9 @@ static void idxDestroyFinalRslt(SArray* result) { taosArrayDestroy(result); } -int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { +int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { if (sIdx == NULL) { - return -1; + return TSDB_CODE_INVALID_PTR; } indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid); @@ -537,8 +550,9 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { IndexCache* pCache = (IndexCache*)cache; - while (quit && atomic_load_32(&pCache->merging) == 1) - ; + do { + } while (quit && atomic_load_32(&pCache->merging) == 1); + TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); if (pReader == NULL) { indexWarn("empty tfile reader found"); @@ -568,6 +582,8 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { bool tn = tfileIter ? tfileIter->next(tfileIter) : false; SIdxTRslt* tr = idxTRsltCreate(); + if (tr == NULL) { + } while (cn == true || tn == true) { IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL; @@ -650,27 +666,30 @@ static int64_t idxGetAvailableVer(SIndex* sIdx, IndexCache* cache) { tfileReaderUnRef(rd); return ver; } -static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { +static int32_t idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { + int32_t code = 0; + int64_t version = idxGetAvailableVer(sIdx, cache); indexInfo("file name version: %" PRId64 "", version); - uint8_t colType = cache->type; - TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType); - if (tw == NULL) { - indexError("failed to open file to write"); - return -1; + TFileWriter* tw = NULL; + + code = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, cache->type, &tw); + if (code != 0) { + indexError("failed to open file to write since %s", tstrerror(code)); } - int ret = tfileWriterPut(tw, batch, true); - if (ret != 0) { - indexError("failed to write into tindex "); + code = tfileWriterPut(tw, batch, true); + if (code != 0) { + indexError("failed to write into tindex since %s", tstrerror(code)); goto END; } tfileWriterClose(tw); - TFileReader* reader = tfileReaderOpen(sIdx, cache->suid, version, cache->colName); - if (reader == NULL) { - return -1; + TFileReader* reader = NULL; + code = tfileReaderOpen(sIdx, cache->suid, version, cache->colName, &reader); + if (code != 0) { + goto END; } indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf); @@ -680,16 +699,17 @@ static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)}; taosThreadMutexLock(&tf->mtx); - tfileCachePut(tf->cache, &key, reader); + code = tfileCachePut(tf->cache, &key, reader); taosThreadMutexUnlock(&tf->mtx); - return ret; + return code; + END: if (tw != NULL) { idxFileCtxDestroy(tw->ctx, true); taosMemoryFree(tw); } - return -1; + return code; } int32_t idxSerialCacheKey(ICacheKey* key, char* buf) { diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index a6fbc73332..f2aefcfc41 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -129,6 +129,10 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt* _cache_range_compare cmpFn = idxGetCompare(type); CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); + if (pCt == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCt->colVal = term->colVal; pCt->colType = term->colType; pCt->version = atomic_load_64(&pCache->version); @@ -182,6 +186,10 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr IndexCache* pCache = mem->pCache; CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm)); + if (pCt == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCt->colVal = term->colVal; pCt->version = atomic_load_64(&pCache->version); @@ -340,7 +348,8 @@ IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8 cache->mem = idxInternalCacheCreate(type); cache->mem->pCache = cache; - cache->colName = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? taosStrdup(JSON_COLUMN) : taosStrdup(colName); + cache->colName = + IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? taosStrdup(JSON_COLUMN) : taosStrdup(colName); cache->type = type; cache->index = idx; cache->version = 0; diff --git a/source/libs/index/src/indexFstFile.c b/source/libs/index/src/indexFstFile.c index 43f15f5196..d86b3d476f 100644 --- a/source/libs/index/src/indexFstFile.c +++ b/source/libs/index/src/indexFstFile.c @@ -279,7 +279,9 @@ void idxFileDestroy(IdxFstFile* cw) { taosMemoryFree(cw); } -int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) { +int32_t idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) { + int32_t code = 0; + if (write == NULL) { return 0; } @@ -288,7 +290,8 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) { int nWrite = ctx->write(ctx, buf, len); ASSERTS(nWrite == len, "index write incomplete data"); if (nWrite != len) { - return -1; + code = TAOS_SYSTEM_ERROR(errno); + return code; } write->count += len; @@ -296,7 +299,7 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) { return len; } -int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) { +int32_t idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) { if (write == NULL) { return 0; } diff --git a/source/libs/index/src/indexJson.c b/source/libs/index/src/indexJson.c index 09c1320667..c6761233b9 100644 --- a/source/libs/index/src/indexJson.c +++ b/source/libs/index/src/indexJson.c @@ -15,11 +15,11 @@ #include "index.h" #include "indexInt.h" -int indexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) { +int32_t indexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) { // handle return indexOpen(opts, path, index); } -int indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) { +int32_t indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) { for (int i = 0; i < taosArrayGetSize(terms); i++) { SIndexJsonTerm *p = taosArrayGetP(terms, i); if (p->colType == TSDB_DATA_TYPE_BOOL) { @@ -36,7 +36,7 @@ int indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) { return indexPut(index, terms, uid); } -int indexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) { +int32_t indexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) { SArray *terms = tq->query; for (int i = 0; i < taosArrayGetSize(terms); i++) { SIndexJsonTerm *p = taosArrayGetP(terms, i); diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 84d17681b4..d0166ae2e5 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -110,11 +110,14 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) { } ctx->lru = idx->lru; - TFileReader* reader = tfileReaderCreate(ctx); - if (reader == NULL) { - indexInfo("skip invalid file: %s", file); + TFileReader* reader = NULL; + + int32_t code = tfileReaderCreate(ctx, &reader); + if (code != 0) { + indexInfo("skip invalid file: %s since %s", file, tstrerror(code)); continue; } + reader->lru = idx->lru; TFileHeader* header = &reader->header; @@ -160,9 +163,12 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { return *reader; } -void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { - char buf[128] = {0}; - int32_t sz = idxSerialCacheKey(key, buf); +int32_t tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { + int32_t code = 0; + + char buf[128] = {0}; + int32_t sz = idxSerialCacheKey(key, buf); + TFileReader** p = taosHashGet(tcache->tableCache, buf, sz); if (p != NULL && *p != NULL) { TFileReader* oldRdr = *p; @@ -171,39 +177,44 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { oldRdr->remove = true; tfileReaderUnRef(oldRdr); } - taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); - tfileReaderRef(reader); - return; + + code = taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); + if (code == 0) { + tfileReaderRef(reader); + } + return code; } -TFileReader* tfileReaderCreate(IFileCtx* ctx) { +int32_t tfileReaderCreate(IFileCtx* ctx, TFileReader** pReader) { + int32_t code = 0; TFileReader* reader = taosMemoryCalloc(1, sizeof(TFileReader)); if (reader == NULL) { - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } reader->ctx = ctx; reader->remove = false; - if (0 != tfileReaderVerify(reader)) { + if ((code = tfileReaderVerify(reader)) != 0) { indexError("invalid tfile, suid:%" PRIu64 ", colName:%s", reader->header.suid, reader->header.colName); - tfileReaderDestroy(reader); - return NULL; + TAOS_CHECK_GOTO(code, NULL, _End); } - if (0 != tfileReaderLoadHeader(reader)) { + if ((code = tfileReaderLoadHeader(reader)) != 0) { indexError("failed to load index header, suid:%" PRIu64 ", colName:%s", reader->header.suid, reader->header.colName); - tfileReaderDestroy(reader); - return NULL; + TAOS_CHECK_GOTO(code, NULL, _End); } - if (0 != tfileReaderLoadFst(reader)) { + if ((code = tfileReaderLoadFst(reader)) != 0) { indexError("failed to load index fst, suid:%" PRIu64 ", colName:%s, code:0x%x", reader->header.suid, reader->header.colName, errno); - tfileReaderDestroy(reader); - return NULL; + TAOS_CHECK_GOTO(code, NULL, _End); } - return reader; + *pReader = reader; + return code; +_End: + tfileReaderDestroy(reader); + return code; } void tfileReaderDestroy(TFileReader* reader) { if (reader == NULL) { @@ -479,9 +490,10 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt return TSDB_CODE_SUCCESS; } int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr) { + int ret = 0; SIndexTerm* term = query->term; EIndexQueryType qtype = query->qType; - int ret = 0; + if (IDX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) { ret = tfSearch[1][qtype](reader, term, tr); } else { @@ -492,12 +504,15 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr return ret; } -TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) { - char fullname[256] = {0}; +int32_t tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType, + TFileWriter** pWriter) { + int32_t code = 0; + char fullname[256] = {0}; tfileGenFileFullName(fullname, path, suid, colName, version); + IFileCtx* wcx = idxFileCtxCreate(TFILE, fullname, false, 1024 * 1024 * 64); if (wcx == NULL) { - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } TFileHeader tfh = {0}; @@ -508,34 +523,39 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c memcpy(tfh.colName, colName, strlen(colName)); } - return tfileWriterCreate(wcx, &tfh); + return tfileWriterCreate(wcx, &tfh, pWriter); } -TFileReader* tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName) { - char fullname[256] = {0}; +int32_t tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName, TFileReader** pReader) { + int32_t code = 0; + char fullname[256] = {0}; tfileGenFileFullName(fullname, idx->path, suid, colName, version); IFileCtx* wc = idxFileCtxCreate(TFILE, fullname, true, 1024 * 1024 * 1024); if (wc == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr()); - return NULL; + code = TAOS_SYSTEM_ERROR(errno); + indexError("failed to open readonly file: %s, reason: %s", fullname, tstrerror(code)); + return code; } wc->lru = idx->lru; indexTrace("open read file name:%s, file size: %" PRId64 "", wc->file.buf, wc->file.size); - TFileReader* reader = tfileReaderCreate(wc); - return reader; + return tfileReaderCreate(wc, pReader); } -TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header) { + +int32_t tfileWriterCreate(IFileCtx* ctx, TFileHeader* header, TFileWriter** pWriter) { + int32_t code = 0; TFileWriter* tw = taosMemoryCalloc(1, sizeof(TFileWriter)); if (tw == NULL) { - indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid); - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + indexError("index: %" PRIu64 " failed to alloc TFilerWriter since %s", header->suid, tstrerror(code)); + return code; } tw->ctx = ctx; tw->header = *header; tfileWriteHeader(tw); - return tw; + + *pWriter = tw; + return code; } int tfileWriterPut(TFileWriter* tw, void* data, bool order) { @@ -545,8 +565,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { int8_t colType = tw->header.colType; colType = IDX_TYPE_GET_TYPE(colType); - if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_VARBINARY || - colType == TSDB_DATA_TYPE_NCHAR || colType == TSDB_DATA_TYPE_GEOMETRY) { + if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_VARBINARY || colType == TSDB_DATA_TYPE_NCHAR || + colType == TSDB_DATA_TYPE_GEOMETRY) { fn = tfileStrCompare; } else { fn = getComparFunc(colType, 0); @@ -570,6 +590,9 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { int32_t cap = 4 * 1024; char* buf = taosMemoryCalloc(1, cap); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } for (size_t i = 0; i < sz; i++) { TFileValue* v = taosArrayGetP((SArray*)data, i); @@ -584,7 +607,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { char* t = (char*)taosMemoryRealloc(buf, cap); if (t == NULL) { taosMemoryFree(buf); - return -1; + return TSDB_CODE_OUT_OF_MEMORY; } buf = t; } @@ -664,7 +687,7 @@ void idxTFileDestroy(IndexTFile* tfile) { int idxTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) { int ret = -1; if (tfile == NULL) { - return ret; + return TSDB_CODE_INVALID_DATA_FMT; } int64_t st = taosGetTimestampUs(); diff --git a/source/libs/index/src/indexUtil.c b/source/libs/index/src/indexUtil.c index cdfb79016f..f89944204d 100644 --- a/source/libs/index/src/indexUtil.c +++ b/source/libs/index/src/indexUtil.c @@ -160,10 +160,18 @@ int verdataCompare(const void *a, const void *b) { SIdxTRslt *idxTRsltCreate() { SIdxTRslt *tr = taosMemoryCalloc(1, sizeof(SIdxTRslt)); + if (tr == NULL) { + return NULL; + } tr->total = taosArrayInit(4, sizeof(uint64_t)); tr->add = taosArrayInit(4, sizeof(uint64_t)); tr->del = taosArrayInit(4, sizeof(uint64_t)); + + if (tr->total == NULL || tr->add == NULL || tr->del == NULL) { + idxTRsltClear(tr); + tr = NULL; + } return tr; } void idxTRsltClear(SIdxTRslt *tr) { diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 612ce107c7..bbd2ea6fae 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -294,7 +294,7 @@ class IndexEnv : public ::testing::Test { taosRemoveDir(path); SIndexOpts opts; opts.cacheSize = 1024 * 1024 * 4; - int ret = indexOpen(&opts, path, &index); + int32_t ret = indexOpen(&opts, path, &index); assert(ret == 0); } virtual void TearDown() { indexClose(index); } @@ -398,7 +398,7 @@ class TFileObj { bool InitReader() { IFileCtx* ctx = idxFileCtxCreate(TFILE, fileName_.c_str(), true, 64 * 1024 * 1024); ctx->lru = taosLRUCacheInit(1024 * 1024 * 4, -1, .5); - reader_ = tfileReaderCreate(ctx); + int32_t code = tfileReaderCreate(ctx, &reader_); return reader_ != NULL ? true : false; } int Get(SIndexTermQuery* query, SArray* result) { @@ -701,7 +701,7 @@ class IndexObj { SIndexOpts opts; opts.cacheSize = 1024 * 1024 * 4; - int ret = indexOpen(&opts, dir.c_str(), &idx); + int32_t ret = indexOpen(&opts, dir.c_str(), &idx); if (ret != 0) { // opt std::cout << "failed to open index: %s" << dir << std::endl;