diff --git a/source/libs/index/inc/indexFstNode.h b/source/libs/index/inc/indexFstNode.h index 5bdb2acb32..a6f1662cb3 100644 --- a/source/libs/index/inc/indexFstNode.h +++ b/source/libs/index/inc/indexFstNode.h @@ -44,7 +44,7 @@ FstBuilderNode* fstBuilderNodeDefault(); FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src); -void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src); +int32_t fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src); // bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile' *wrt, // CompiledAddr lastAddr, CompiledAddr startAddr); diff --git a/source/libs/index/inc/indexTfile.h b/source/libs/index/inc/indexTfile.h index 115d5b3894..e769ea2393 100644 --- a/source/libs/index/inc/indexTfile.h +++ b/source/libs/index/inc/indexTfile.h @@ -18,7 +18,7 @@ #include "indexFst.h" #include "indexFstFile.h" #include "indexInt.h" -#include "indexTfile.h" +//#include "indexTfile.h" #include "indexUtil.h" #include "tlockfree.h" @@ -129,8 +129,8 @@ void tfileIteratorDestroy(Iterate* iterator); TFileValue* tfileValueCreate(char* val); -int tfileValuePush(TFileValue* tf, uint64_t val); -void tfileValueDestroy(TFileValue* tf); +int32_t tfileValuePush(TFileValue* tf, uint64_t val); +void tfileValueDestroy(TFileValue* tf); #ifdef __cplusplus } diff --git a/source/libs/index/inc/indexUtil.h b/source/libs/index/inc/indexUtil.h index 6b016900c2..9eb8001d17 100644 --- a/source/libs/index/inc/indexUtil.h +++ b/source/libs/index/inc/indexUtil.h @@ -46,17 +46,17 @@ extern "C" { buf += len; \ } while (0) -#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ - { \ - bool f = false; \ - for (int i = 0; i < taosArrayGetSize(src); i++) { \ - if (*(uint64_t *)taosArrayGet(src, i) == tgt) { \ - f = true; \ - } \ - } \ - if (f == false) { \ - (void)taosArrayPush(dst, &tgt); \ - } \ +#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ + { \ + bool f = false; \ + for (int i = 0; i < taosArrayGetSize(src); i++) { \ + if (*(uint64_t *)taosArrayGet(src, i) == tgt) { \ + f = true; \ + } \ + } \ + if (f == false) { \ + if (taosArrayPush(dst, &tgt) == NULL) code = TSDB_CODE_OUT_OF_MEMORY; \ + } \ } /* multi sorted result intersection @@ -65,7 +65,7 @@ extern "C" { * [1, 4, 5] * output:[4, 5] */ -void iIntersection(SArray *in, SArray *out); +int32_t iIntersection(SArray *in, SArray *out); /* multi sorted result union * input: [1, 2, 4, 5] @@ -73,7 +73,7 @@ void iIntersection(SArray *in, SArray *out); * [1, 4, 5] * output:[1, 2, 3, 4, 5] */ -void iUnion(SArray *in, SArray *out); +int32_t iUnion(SArray *in, SArray *out); /* see example * total: [1, 2, 4, 5, 7, 8] @@ -81,7 +81,7 @@ void iUnion(SArray *in, SArray *out); * return: [1, 2, 7, 8] saved in total */ -void iExcept(SArray *total, SArray *except); +int32_t iExcept(SArray *total, SArray *except); int uidCompare(const void *a, const void *b); @@ -107,7 +107,7 @@ void idxTRsltClear(SIdxTRslt *tr); void idxTRsltDestroy(SIdxTRslt *tr); -void idxTRsltMergeTo(SIdxTRslt *tr, SArray *out); +int32_t idxTRsltMergeTo(SIdxTRslt *tr, SArray *out); #ifdef __cplusplus } diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index e2c74f0dbb..0d834a68cb 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -92,7 +92,7 @@ static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray static int32_t idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch); // merge cache and tfile by opera type -static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper); +static int32_t idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper); // static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf); @@ -212,6 +212,7 @@ void idxReleaseRef(int64_t ref) { int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { // TODO(yihao): reduce the lock range + int32_t code = 0; (void)taosThreadMutexLock(&index->mtx); for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); @@ -223,11 +224,19 @@ int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { IndexCache** cache = taosHashGet(index->colObj, buf, sz); if (cache == NULL) { IndexCache* pCache = idxCacheCreate(index, p->suid, p->colName, p->colType); - (void)taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*)); + code = taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*)); + if (code != 0) { + idxCacheDestroy(pCache); + break; + } } } (void)taosThreadMutexUnlock(&index->mtx); + if (code != 0) { + return code; + } + for (int i = 0; i < taosArrayGetSize(fVals); i++) { SIndexTerm* p = taosArrayGetP(fVals, i); @@ -247,15 +256,27 @@ int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { return 0; } int32_t indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { + int32_t code = 0; EIndexOperatorType opera = multiQuerys->opera; // relation of querys SArray* iRslts = taosArrayInit(4, POINTER_BYTES); - int nQuery = taosArrayGetSize(multiQuerys->query); + if (iRslts == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int nQuery = taosArrayGetSize(multiQuerys->query); for (size_t i = 0; i < nQuery; i++) { SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i); SArray* trslt = NULL; - (void)idxTermSearch(index, qterm, &trslt); - (void)taosArrayPush(iRslts, (void*)&trslt); + code = idxTermSearch(index, qterm, &trslt); + if (code != 0) { + idxInterRsltDestroy(iRslts); + return code; + } + if (taosArrayPush(iRslts, (void*)&trslt) == NULL) { + idxInterRsltDestroy(iRslts); + return TSDB_CODE_OUT_OF_MEMORY; + } } (void)idxMergeFinalResults(iRslts, opera, result); idxInterRsltDestroy(iRslts); @@ -267,6 +288,9 @@ int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { return 1; } SIndexOpts* indexOptsCreate(int32_t cacheSize) { SIndexOpts* opts = taosMemoryCalloc(1, sizeof(SIndexOpts)); + if (opts == NULL) { + return NULL; + } opts->cacheSize = cacheSize; return opts; } @@ -295,7 +319,7 @@ void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) { int32_t indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) { SIndexTermQuery q = {.qType = qType, .term = term}; if (taosArrayPush(pQuery->query, &q) == NULL) { - return terrno; + return TSDB_CODE_OUT_OF_MEMORY; } return 0; } @@ -362,7 +386,9 @@ void indexTermDestroy(SIndexTerm* p) { SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); } int32_t indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) { - (void)taosArrayPush(terms, &term); + if (taosArrayPush(terms, &term) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } return 0; } void indexMultiTermDestroy(SIndexMultiTerm* terms) { @@ -422,6 +448,7 @@ bool indexJsonIsRebuild(SIndexJson* idx) { } static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) { + int32_t code = 0; SIndexTerm* term = query->term; const char* colName = term->colName; int32_t nColName = term->nColName; @@ -452,6 +479,10 @@ static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** resu int64_t st = taosGetTimestampUs(); SIdxTRslt* tr = idxTRsltCreate(); + if (tr == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END); + } + if (0 == idxCacheSearch(cache, query, tr, &s)) { if (s == kTypeDeletion) { indexInfo("col: %s already drop by", term->colName); @@ -473,13 +504,14 @@ static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** resu int64_t cost = taosGetTimestampUs() - st; indexInfo("search cost: %" PRIu64 "us", cost); - idxTRsltMergeTo(tr, *result); + code = idxTRsltMergeTo(tr, *result); + TAOS_CHECK_GOTO(code, NULL, END); idxTRsltDestroy(tr); return 0; END: idxTRsltDestroy(tr); - return 0; + return code; } static void idxInterRsltDestroy(SArray* results) { if (results == NULL) { @@ -503,9 +535,9 @@ static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray } if (oType == MUST) { - iIntersection(in, out); + return iIntersection(in, out); } else if (oType == SHOULD) { - iUnion(in, out); + return iUnion(in, out); } else if (oType == NOT) { // just one column index, enhance later // taosArrayAddAll(fResults, interResults); @@ -514,30 +546,53 @@ static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray return 0; } -static void idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) { +static int32_t idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) { + int32_t code = 0; int32_t sz = taosArrayGetSize(result); if (sz > 0) { TFileValue* lv = taosArrayGetP(result, sz - 1); if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) { - idxTRsltMergeTo(tr, lv->tableId); + code = idxTRsltMergeTo(tr, lv->tableId); + if (code != 0) { + indexFatal("failed to merge result since %s", tstrerror(code)); + return code; + } idxTRsltClear(tr); - (void)taosArrayPush(result, &tfv); + if (taosArrayPush(result, &tfv) == NULL) { + indexFatal("failed to merge result since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + } } else if (tfv == NULL) { // handle last iterator - idxTRsltMergeTo(tr, lv->tableId); + code = idxTRsltMergeTo(tr, lv->tableId); + if (code != 0) { + indexFatal("failed to merge result since %s", tstrerror(code)); + } } else { tfileValueDestroy(tfv); + return 0; } } else { - (void)taosArrayPush(result, &tfv); + if (taosArrayPush(result, &tfv) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } + return code; } -static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) { +static int32_t idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) { + int32_t code = 0; char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; TFileValue* tfv = tfileValueCreate(colVal); + if (tfv == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } - idxMayMergeTempToFinalRslt(result, tfv, tr); + code = idxMayMergeTempToFinalRslt(result, tfv, tr); + if (code != 0) { + tfileValueDestroy(tfv); + return code; + } + tfv = NULL; if (cv != NULL) { uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0); @@ -549,8 +604,11 @@ static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue } } if (tv != NULL) { - (void)taosArrayAddAll(tr->total, tv->val); + if (taosArrayAddAll(tr->total, tv->val) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } + return 0; } static void idxDestroyFinalRslt(SArray* result) { int32_t sz = result ? taosArrayGetSize(result) : 0; @@ -562,6 +620,7 @@ static void idxDestroyFinalRslt(SArray* result) { } int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { + int32_t code = 0; if (sIdx == NULL) { return TSDB_CODE_INVALID_PTR; } @@ -598,12 +657,16 @@ int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { } SArray* result = taosArrayInit(1024, sizeof(void*)); + if (result == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false; SIdxTRslt* tr = idxTRsltCreate(); if (tr == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); } while (cn == true || tn == true) { IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; @@ -618,27 +681,42 @@ int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { comp = 1; } if (comp == 0) { - idxMergeCacheAndTFile(result, cv, tv, tr); + code = idxMergeCacheAndTFile(result, cv, tv, tr); + if (code != 0) { + TAOS_CHECK_GOTO(code, NULL, _exception); + } + cn = cacheIter->next(cacheIter); tn = tfileIter->next(tfileIter); } else if (comp < 0) { - idxMergeCacheAndTFile(result, cv, NULL, tr); + code = idxMergeCacheAndTFile(result, cv, NULL, tr); + if (code != 0) { + TAOS_CHECK_GOTO(code, NULL, _exception); + } cn = cacheIter->next(cacheIter); } else { - idxMergeCacheAndTFile(result, NULL, tv, tr); + code = idxMergeCacheAndTFile(result, NULL, tv, tr); + if (code != 0) { + TAOS_CHECK_GOTO(code, NULL, _exception); + } tn = tfileIter->next(tfileIter); } } - idxMayMergeTempToFinalRslt(result, NULL, tr); + if ((code = idxMayMergeTempToFinalRslt(result, NULL, tr)) != 0) { + idxTRsltDestroy(tr); + TAOS_CHECK_GOTO(code, NULL, _exception); + } idxTRsltDestroy(tr); - int ret = idxGenTFile(sIdx, pCache, result); - if (ret != 0) { - indexError("failed to merge"); + code = idxGenTFile(sIdx, pCache, result); + if (code != 0) { + indexError("failed to merge since %s", tstrerror(code)); } else { int64_t cost = taosGetTimestampUs() - st; indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000); } + +_exception: idxDestroyFinalRslt(result); idxCacheDestroyImm(pCache); @@ -654,8 +732,11 @@ int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) { idxPost(sIdx); } idxReleaseRef(sIdx->refId); + if (code != 0) { + indexError("failed to merge since %s", tstrerror(code)); + } - return ret; + return code; } void iterateValueDestroy(IterateValue* value, bool destroy) { if (destroy) { diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 734c4067c6..a89cc47925 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -75,6 +75,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe if (cache == NULL) { return 0; } + int32_t code = 0; MemTable* mem = cache; IndexCache* pCache = mem->pCache; @@ -98,6 +99,10 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe } else if (c->operaType == DEL_VALUE) { INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) } + + if (code != 0) { + break; + } } else { break; } @@ -105,7 +110,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe taosMemoryFree(pCt); (void)tSkipListDestroyIter(iter); - return 0; + return code; } static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { // impl later @@ -123,6 +128,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt* if (cache == NULL) { return 0; } + int32_t code = TSDB_CODE_SUCCESS; MemTable* mem = cache; IndexCache* pCache = mem->pCache; @@ -148,7 +154,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt* } CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); terrno = TSDB_CODE_SUCCESS; - TExeCond cond = cmpFn(c->colVal, pCt->colVal, pCt->colType); + TExeCond cond = cmpFn(c->colVal, pCt->colVal, pCt->colType); if (terrno != TSDB_CODE_SUCCESS) { code = terrno; goto _return; @@ -156,11 +162,14 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt* if (cond == MATCH) { if (c->operaType == ADD_VALUE) { INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid) - // taosArrayPush(result, &c->uid); *s = kTypeValue; } else if (c->operaType == DEL_VALUE) { INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) } + + if (code != 0) { + break; + } } else if (cond == CONTINUE) { continue; } else if (cond == BREAK) { @@ -190,6 +199,8 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr if (cache == NULL) { return 0; } + + int32_t code = 0; MemTable* mem = cache; IndexCache* pCache = mem->pCache; @@ -223,6 +234,10 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr } else if (c->operaType == DEL_VALUE) { INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) } + + if (code != 0) { + break; + } } else { break; } @@ -231,9 +246,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr taosMemoryFree(pCt); taosMemoryFree(exBuf); (void)tSkipListDestroyIter(iter); - return 0; - - return TSDB_CODE_SUCCESS; + return code; } static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { return TSDB_CODE_SUCCESS; @@ -338,6 +351,10 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR } else if (c->operaType == DEL_VALUE) { INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) } + + if (code != 0) { + break; + } } else if (cond == CONTINUE) { continue; } else if (cond == BREAK) { @@ -800,7 +817,13 @@ static bool idxCacheIteratorNext(Iterate* itera) { iv->type = ct->operaType; iv->ver = ct->version; iv->colVal = taosStrdup(ct->colVal); - (void)taosArrayPush(iv->val, &ct->uid); + if (iv->colVal == NULL) { + return false; + } + if (taosArrayPush(iv->val, &ct->uid) == NULL) { + taosMemoryFree(iv->colVal); + return false; + } } return next; } diff --git a/source/libs/index/src/indexFst.c b/source/libs/index/src/indexFst.c index 3edd61bd08..aa617c2de2 100644 --- a/source/libs/index/src/indexFst.c +++ b/source/libs/index/src/indexFst.c @@ -62,7 +62,9 @@ void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes* nodes, bool isFinal) { node->trans = taosArrayInit(16, sizeof(FstTransition)); FstBuilderNodeUnfinished un = {.node = node, .last = NULL}; - (void)taosArrayPush(nodes->stack, &un); + if (taosArrayPush(nodes->stack, &un) == NULL) { + fstBuilderNodeDestroy(node); + } } FstBuilderNode* fstUnFinishedNodesPopRoot(FstUnFinishedNodes* nodes) { FstBuilderNodeUnfinished* un = taosArrayPop(nodes->stack); @@ -120,7 +122,10 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output FstLastTransition* trn = fstLastTransitionCreate(data[i], 0); FstBuilderNodeUnfinished un = {.node = n, .last = trn}; - (void)taosArrayPush(nodes->stack, &un); + if (taosArrayPush(nodes->stack, &un) == NULL) { + fstBuilderNodeDestroy(n); + taosMemoryFree(trn); + } } fstUnFinishedNodesPushEmpty(nodes, true); } @@ -892,7 +897,9 @@ void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, Comp return; } FstTransition t = {.inp = trn->inp, .out = trn->out, .addr = addr}; - (void)taosArrayPush(unNode->node->trans, &t); + if (taosArrayPush(unNode->node->trans, &t) == NULL) { + return; + } fstLastTransitionDestroy(trn); unNode->last = NULL; return; @@ -997,7 +1004,12 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { uint8_t* data = fstSliceData(b, &len); SArray* nodes = (SArray*)taosArrayInit(len, sizeof(FstNode*)); - (void)taosArrayPush(nodes, &root); + if (nodes == NULL) { + return false; + } + if (taosArrayPush(nodes, &root) == NULL) { + goto _return; + } for (uint32_t i = 0; i < len; i++) { uint8_t inp = data[i]; Output res = 0; @@ -1009,7 +1021,9 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) { (void)fstNodeGetTransitionAt(root, res, &trn); tOut += trn.out; root = fstGetNode(fst, trn.addr); - (void)taosArrayPush(nodes, &root); + if (taosArrayPush(nodes, &root) == NULL) { + goto _return; + } } if (!FST_NODE_IS_FINAL(root)) { goto _return; @@ -1156,7 +1170,9 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) { .trans = 0, .out = {.null = false, .out = 0}, .autState = automFuncs[aut->type].start(aut)}; // auto.start callback - (void)taosArrayPush(sws->stack, &s); + if (taosArrayPush(sws->stack, &s) == NULL) { + return false; + } return true; } FstSlice* key = NULL; @@ -1185,12 +1201,16 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) { (void)fstNodeGetTransitionAt(node, res, &trn); void* preState = autState; autState = automFuncs[aut->type].accept(aut, preState, b); - (void)taosArrayPush(sws->inp, &b); + if (taosArrayPush(sws->inp, &b) == NULL) { + return false; + } FstStreamState s = {.node = node, .trans = res + 1, .out = {.null = false, .out = out}, .autState = preState}; node = NULL; - (void)taosArrayPush(sws->stack, &s); + if (taosArrayPush(sws->stack, &s) == NULL) { + return false; + } out += trn.out; node = fstGetNode(sws->fst, trn.addr); } else { @@ -1209,7 +1229,9 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) { } FstStreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState}; - (void)taosArrayPush(sws->stack, &s); + if (taosArrayPush(sws->stack, &s) == NULL) { + return false; + } taosMemoryFree(trans); return true; } @@ -1230,7 +1252,9 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) { (void)fstNodeGetTransitionAt(n, trans - 1, &trn); FstStreamState s = { .node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState}; - (void)taosArrayPush(sws->stack, &s); + if (taosArrayPush(sws->stack, &s) == NULL) { + return false; + } return true; } return false; @@ -1273,8 +1297,14 @@ FStmStRslt* stmStNextWith(FStmSt* sws, streamCallback__fn callback) { bool isMatch = automFuncs[aut->type].isMatch(aut, nextState); FstNode* nextNode = fstGetNode(sws->fst, trn.addr); - (void)taosArrayPush(nodes, &nextNode); - (void)taosArrayPush(sws->inp, &(trn.inp)); + if (taosArrayPush(nodes, &nextNode) == NULL) { + taosArrayDestroy(nodes); + return NULL; + } + if (taosArrayPush(sws->inp, &(trn.inp)) == NULL) { + taosArrayDestroy(nodes); + return NULL; + } if (FST_NODE_IS_FINAL(nextNode)) { void* eofState = automFuncs[aut->type].acceptEof(aut, nextState); @@ -1283,10 +1313,16 @@ FStmStRslt* stmStNextWith(FStmSt* sws, streamCallback__fn callback) { } } FstStreamState s1 = {.node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState}; - (void)taosArrayPush(sws->stack, &s1); + if (taosArrayPush(sws->stack, &s1) == NULL) { + taosArrayDestroy(nodes); + return NULL; + } FstStreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState}; - (void)taosArrayPush(sws->stack, &s2); + if (taosArrayPush(sws->stack, &s2) == NULL) { + taosArrayDestroy(nodes); + return NULL; + } int32_t isz = taosArrayGetSize(sws->inp); uint8_t* buf = (uint8_t*)taosMemoryMalloc(isz * sizeof(uint8_t)); @@ -1357,11 +1393,18 @@ FStmBuilder* stmBuilderCreate(Fst* fst, FAutoCtx* aut) { b->aut = aut; b->min = fstBoundStateCreate(Unbounded, NULL); b->max = fstBoundStateCreate(Unbounded, NULL); + + if (b->min == NULL || b->max == NULL) { + stmBuilderDestroy(b); + return NULL; + } + return b; } void stmBuilderDestroy(FStmBuilder* b) { - fstSliceDestroy(&b->min->data); - fstSliceDestroy(&b->max->data); + if (b->min) fstSliceDestroy(&b->min->data); + if (b->max) fstSliceDestroy(&b->max->data); + taosMemoryFreeClear(b->min); taosMemoryFreeClear(b->max); taosMemoryFree(b); diff --git a/source/libs/index/src/indexFstDfa.c b/source/libs/index/src/indexFstDfa.c index f552d5feba..c95847525f 100644 --- a/source/libs/index/src/indexFstDfa.c +++ b/source/libs/index/src/indexFstDfa.c @@ -14,6 +14,7 @@ */ #include "indexFstDfa.h" +#include "indexInt.h" #include "thash.h" const static uint32_t STATE_LIMIT = 1000; @@ -68,23 +69,41 @@ FstDfa *dfaBuilderBuild(FstDfaBuilder *builder) { uint32_t sz = taosArrayGetSize(builder->dfa->insts); FstSparseSet *cur = sparSetCreate(sz); FstSparseSet *nxt = sparSetCreate(sz); + if (cur == NULL || nxt == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } dfaAdd(builder->dfa, cur, 0); uint32_t result; SArray *states = taosArrayInit(0, sizeof(uint32_t)); + if (states == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } if (dfaBuilderCacheState(builder, cur, &result)) { - (void)taosArrayPush(states, &result); + if (taosArrayPush(states, &result) == NULL) { + goto _exception; + } } SHashObj *seen = taosHashInit(12, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (seen == NULL) { + goto _exception; + } + while (taosArrayGetSize(states) != 0) { result = *(uint32_t *)taosArrayPop(states); for (int i = 0; i < 256; i++) { uint32_t ns, dummpy = 0; if (dfaBuilderRunState(builder, cur, nxt, result, i, &ns)) { if (taosHashGet(seen, &ns, sizeof(ns)) == NULL) { - (void)taosHashPut(seen, &ns, sizeof(ns), &dummpy, sizeof(dummpy)); - (void)taosArrayPush(states, &ns); + if (taosHashPut(seen, &ns, sizeof(ns), &dummpy, sizeof(dummpy)) != 0) { + goto _exception; + } + if (taosArrayPush(states, &ns) == NULL) { + goto _exception; + } } } if (taosArrayGetSize(builder->dfa->states) > STATE_LIMIT) { @@ -96,6 +115,11 @@ FstDfa *dfaBuilderBuild(FstDfaBuilder *builder) { taosArrayDestroy(states); taosHashCleanup(seen); return builder->dfa; +_exception: + taosArrayDestroy(states); + taosHashCleanup(seen); + indexError("failed to build dfa since %s", tstrerror(terrno)); + return NULL; } bool dfaBuilderRunState(FstDfaBuilder *builder, FstSparseSet *cur, FstSparseSet *next, uint32_t state, uint8_t byte, @@ -122,8 +146,13 @@ bool dfaBuilderRunState(FstDfaBuilder *builder, FstSparseSet *cur, FstSparseSet } bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *result) { + int32_t code = 0; SArray *tinsts = taosArrayInit(4, sizeof(uint32_t)); - bool isMatch = false; + if (tinsts == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exception; + } + bool isMatch = false; for (int i = 0; i < sparSetLen(set); i++) { int32_t ip; @@ -133,10 +162,16 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r if (inst->ty == JUMP || inst->ty == SPLIT) { continue; } else if (inst->ty == RANGE) { - (void)taosArrayPush(tinsts, &ip); + if (taosArrayPush(tinsts, &ip) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exception; + } } else if (inst->ty == MATCH) { isMatch = true; - (void)taosArrayPush(tinsts, &ip); + if (taosArrayPush(tinsts, &ip) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exception; + } } } if (taosArrayGetSize(tinsts) == 0) { @@ -149,13 +184,23 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r taosArrayDestroy(tinsts); } else { DfaState st = {.insts = tinsts, .isMatch = isMatch}; - (void)taosArrayPush(builder->dfa->states, &st); + if (taosArrayPush(builder->dfa->states, &st) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exception; + } int32_t sz = taosArrayGetSize(builder->dfa->states) - 1; - (void)taosHashPut(builder->cache, &tinsts, sizeof(POINTER_BYTES), &sz, sizeof(sz)); + if ((code = taosHashPut(builder->cache, &tinsts, sizeof(POINTER_BYTES), &sz, sizeof(sz))) != 0) { + goto _exception; + } + *result = sz; } return true; +_exception: + indexError("failed to create dfa state, code:%d", code); + taosArrayDestroy(tinsts); + return false; } FstDfa *dfaCreate(SArray *insts, SArray *states) { diff --git a/source/libs/index/src/indexFstNode.c b/source/libs/index/src/indexFstNode.c index 7f1a86bc55..245f5cf5fc 100644 --- a/source/libs/index/src/indexFstNode.c +++ b/source/libs/index/src/indexFstNode.c @@ -16,9 +16,14 @@ FstBuilderNode* fstBuilderNodeDefault() { FstBuilderNode* bn = taosMemoryMalloc(sizeof(FstBuilderNode)); + if (bn == NULL) return NULL; bn->isFinal = false; bn->finalOutput = 0; bn->trans = taosArrayInit(16, sizeof(FstTransition)); + if (bn->trans == NULL) { + taosMemoryFree(bn); + return NULL; + } return bn; } void fstBuilderNodeDestroy(FstBuilderNode* node) { @@ -56,30 +61,11 @@ bool fstBuilderNodeEqual(FstBuilderNode* n1, FstBuilderNode* n2) { return true; } -FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src) { - FstBuilderNode* node = taosMemoryMalloc(sizeof(FstBuilderNode)); - if (node == NULL) { - return NULL; - } - // - size_t sz = taosArrayGetSize(src->trans); - SArray* trans = taosArrayInit(sz, sizeof(FstTransition)); - - for (size_t i = 0; i < sz; i++) { - FstTransition* tran = taosArrayGet(src->trans, i); - (void)taosArrayPush(trans, tran); - } - - node->trans = trans; - node->isFinal = src->isFinal; - node->finalOutput = src->finalOutput; - return node; -} // not destroy src, User's bussiness -void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) { +int32_t fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) { if (dst == NULL || src == NULL) { - return; + return TSDB_CODE_INVALID_PARA; } dst->isFinal = src->isFinal; @@ -89,10 +75,18 @@ void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) { taosArrayDestroy(dst->trans); size_t sz = taosArrayGetSize(src->trans); dst->trans = taosArrayInit(sz, sizeof(FstTransition)); + if (dst->trans == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } for (size_t i = 0; i < sz; i++) { FstTransition* trn = taosArrayGet(src->trans, i); - (void)taosArrayPush(dst->trans, trn); + if (taosArrayPush(dst->trans, trn) == NULL) { + taosArrayDestroy(dst->trans); + dst->trans = NULL; + return TSDB_CODE_OUT_OF_MEMORY; + } } + return 0; } // bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile *wrt, CompiledAddr lastAddr, CompiledAddr diff --git a/source/libs/index/src/indexFstRegex.c b/source/libs/index/src/indexFstRegex.c index c6ceb1d965..941cb82e90 100644 --- a/source/libs/index/src/indexFstRegex.c +++ b/source/libs/index/src/indexFstRegex.c @@ -39,7 +39,12 @@ FstRegex *regexCreate(const char *str) { for (int i = 0; i < strlen(str); i++) { uint8_t v = str[i]; - (void)taosArrayPush(insts, &v); + if (taosArrayPush(insts, &v) == NULL) { + taosArrayDestroy(insts); + taosMemoryFree(regex->orig); + taosMemoryFree(regex); + return NULL; + } } FstDfaBuilder *builder = dfaBuilderCreate(insts); regex->dfa = dfaBuilderBuild(builder); diff --git a/source/libs/index/src/indexFstRegister.c b/source/libs/index/src/indexFstRegister.c index a8c4365a44..cc069c5d0f 100644 --- a/source/libs/index/src/indexFstRegister.c +++ b/source/libs/index/src/indexFstRegister.c @@ -84,7 +84,10 @@ FstRegistry* fstRegistryCreate(uint64_t tableSize, uint64_t mruSize) { for (uint64_t i = 0; i < nCells; i++) { FstRegistryCell cell = {.addr = NONE_ADDRESS, .node = fstBuilderNodeDefault()}; - (void)taosArrayPush(tb, &cell); + if (taosArrayPush(tb, &cell) == NULL) { + fstRegistryDestroy(registry); + return NULL; + } } registry->table = tb; @@ -125,7 +128,7 @@ FstRegistryEntry* fstRegistryGetEntry(FstRegistry* registry, FstBuilderNode* bNo entry->addr = cell->addr; return entry; } else { - fstBuilderNodeCloneFrom(cell->node, bNode); + (void)fstBuilderNodeCloneFrom(cell->node, bNode); entry->state = NOTFOUND; entry->cell = cell; // copy or not } @@ -145,7 +148,7 @@ FstRegistryEntry* fstRegistryGetEntry(FstRegistry* registry, FstBuilderNode* bNo return entry; } // clone from bNode, refactor later - fstBuilderNodeCloneFrom(cell2->node, bNode); + (void)fstBuilderNodeCloneFrom(cell2->node, bNode); fstRegistryCellSwap(registry->table, start, start + 1); FstRegistryCell* cCell = taosArrayGet(registry->table, start); @@ -166,7 +169,7 @@ FstRegistryEntry* fstRegistryGetEntry(FstRegistry* registry, FstBuilderNode* bNo uint64_t last = end - 1; FstRegistryCell* cell = (FstRegistryCell*)taosArrayGet(registry->table, last); // clone from bNode, refactor later - fstBuilderNodeCloneFrom(cell->node, bNode); + (void)fstBuilderNodeCloneFrom(cell->node, bNode); fstRegistryCellPromote(registry->table, last, start); FstRegistryCell* cCell = taosArrayGet(registry->table, start); diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 8c06296732..55a9bb06d9 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -49,7 +49,7 @@ static int tfileReaderLoadFst(TFileReader* reader); static int tfileReaderVerify(TFileReader* reader); static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); -static SArray* tfileGetFileList(const char* path); +static int32_t tfileGetFileList(const char* path, SArray** pResult); static int tfileRmExpireFile(SArray* result); static void tfileDestroyFileName(void* elem); static int tfileCompare(const void* a, const void* b); @@ -97,9 +97,15 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) { } tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (tcache->tableCache == NULL) { + indexError("failed to open table cache since%s", tstrerror(terrno)); + goto End; + } + tcache->capacity = 64; - SArray* files = tfileGetFileList(path); + SArray* files = NULL; + int32_t code = tfileGetFileList(path, &files); for (size_t i = 0; i < taosArrayGetSize(files); i++) { char* file = taosArrayGetP(files, i); @@ -125,7 +131,11 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) { char buf[128] = {0}; int32_t sz = idxSerialCacheKey(&key, buf); - (void)taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); + code = taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); + if (code != 0) { + tfileReaderDestroy(reader); + goto End; + } tfileReaderRef(reader); } taosArrayDestroyEx(files, tfileDestroyFileName); @@ -163,6 +173,7 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { return *reader; } + int32_t tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { int32_t code = 0; @@ -172,16 +183,18 @@ int32_t tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { TFileReader** p = taosHashGet(tcache->tableCache, buf, sz); if (p != NULL && *p != NULL) { TFileReader* oldRdr = *p; - (void)taosHashRemove(tcache->tableCache, buf, sz); - indexInfo("found %s, should remove file %s", buf, oldRdr->ctx->file.buf); - oldRdr->remove = true; - tfileReaderUnRef(oldRdr); + if ((code = taosHashRemove(tcache->tableCache, buf, sz)) != 0) { + indexError("failed to remove old reader from cache since %s, suid:%" PRIu64 ", colName:%s", tstrerror(code), + oldRdr->header.suid, oldRdr->header.colName); + } else { + indexInfo("found %s, should remove file %s", buf, oldRdr->ctx->file.buf); + oldRdr->remove = true; + tfileReaderUnRef(oldRdr); + } } code = taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); - if (code == 0) { - tfileReaderRef(reader); - } + tfileReaderRef(reader); return code; } int32_t tfileReaderCreate(IFileCtx* ctx, TFileReader** pReader) { @@ -232,7 +245,7 @@ void tfileReaderDestroy(TFileReader* reader) { } static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { - int ret = 0; + int32_t ret = 0; char* p = tem->colVal; uint64_t sz = tem->nColVal; @@ -246,6 +259,11 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { tem->suid, tem->colName, tem->colVal, cost); ret = tfileReaderLoadTableIds((TFileReader*)reader, (int32_t)offset, tr->total); + if (ret != 0) { + fstSliceDestroy(&key); + indexError("faile to search since %s", tstrerror(ret)); + return ret; + } cost = taosGetTimestampUs() - et; indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid, tem->colName, tem->colVal, cost); @@ -255,17 +273,29 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { } static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { + int32_t lino = 0; + int32_t code = 0; char* p = tem->colVal; uint64_t sz = tem->nColVal; SArray* offsets = taosArrayInit(16, sizeof(uint64_t)); + if (offsets == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + FAutoCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX); + if (ctx == NULL) { + taosArrayDestroy(offsets); + return TSDB_CODE_OUT_OF_MEMORY; + } - FAutoCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX); FStmBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx); FStmSt* st = stmBuilderIntoStm(sb); FStmStRslt* rt = NULL; while ((rt = stmStNextWith(st, NULL)) != NULL) { - (void)taosArrayPush(offsets, &(rt->out.out)); + if (taosArrayPush(offsets, &(rt->out.out)) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exception); + } swsResultDestroy(rt); } stmStDestroy(st); @@ -275,14 +305,16 @@ static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { for (int i = 0; i < taosArrayGetSize(offsets); i++) { uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i); ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); - if (ret != 0) { - taosArrayDestroy(offsets); - indexError("failed to find target tablelist"); - return TSDB_CODE_FILE_CORRUPTED; - } + TAOS_CHECK_GOTO(ret, &lino, _exception); } taosArrayDestroy(offsets); return 0; +_exception: + stmStDestroy(st); + stmBuilderDestroy(sb); + taosArrayDestroy(offsets); + indexError("failed to searchPrefix since %s, lino:%d", tstrerror(code), lino); + return code; } static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { int ret = 0; @@ -393,6 +425,12 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { tem->suid, tem->colName, tem->colVal, cost); ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); + if (ret != 0) { + indexError("failed to search json since %s", tstrerror(ret)); + taosMemoryFree(p); + fstSliceDestroy(&key); + return ret; + } cost = taosGetTimestampUs() - et; indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, offset: %" PRIu64 ", size: %d, time cost: %" PRIu64 "us", @@ -863,14 +901,24 @@ TFileValue* tfileValueCreate(char* val) { return NULL; } tf->colVal = taosStrdup(val); + if (tf->colVal == NULL) { + taosMemoryFree(tf); + } tf->tableId = taosArrayInit(32, sizeof(uint64_t)); + if (tf->tableId == NULL) { + taosMemoryFree(tf->colVal); + taosMemoryFree(tf); + return NULL; + } return tf; } -int tfileValuePush(TFileValue* tf, uint64_t val) { +int32_t tfileValuePush(TFileValue* tf, uint64_t val) { if (tf == NULL) { - return -1; + return TSDB_CODE_INVALID_PARA; + } + if (taosArrayPush(tf->tableId, &val) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; } - (void)taosArrayPush(tf->tableId, &val); return 0; } void tfileValueDestroy(TFileValue* tf) { @@ -986,8 +1034,10 @@ static int tfileReaderLoadFst(TFileReader* reader) { return reader->fst != NULL ? 0 : -1; } -static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) { +static int32_t tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) { // TODO(yihao): opt later + int32_t code = 0; + int32_t lino = 0; IFileCtx* ctx = reader->ctx; // add block cache char block[4096] = {0}; @@ -1003,7 +1053,9 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* while (nid > 0) { int32_t left = block + sizeof(block) - p; if (left >= sizeof(uint64_t)) { - (void)taosArrayPush(result, (uint64_t*)p); + if (taosArrayPush(result, (uint64_t*)p) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } p += sizeof(uint64_t); } else { char buf[sizeof(uint64_t)] = {0}; @@ -1014,7 +1066,9 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* nread = ctx->readFrom(ctx, (uint8_t*)block, sizeof(block), offset); memcpy(buf + left, block, sizeof(uint64_t) - left); - (void)taosArrayPush(result, (uint64_t*)buf); + if (taosArrayPush(result, (uint64_t*)buf) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } p = block + sizeof(uint64_t) - left; } nid -= 1; @@ -1059,16 +1113,19 @@ void tfileReaderUnRef(TFileReader* rd) { } } -static SArray* tfileGetFileList(const char* path) { +static int32_t tfileGetFileList(const char* path, SArray** ppResult) { + int32_t code = 0; char buf[128] = {0}; uint64_t suid; int64_t version; SArray* files = taosArrayInit(4, sizeof(void*)); + if (files == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } TdDirPtr pDir = taosOpenDir(path); if (NULL == pDir) { - taosArrayDestroy(files); - return NULL; + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _exception); } TdDirEntryPtr pDirEntry; while ((pDirEntry = taosReadDir(pDir)) != NULL) { @@ -1079,15 +1136,29 @@ static SArray* tfileGetFileList(const char* path) { size_t len = strlen(path) + 1 + strlen(file) + 1; char* buf = taosMemoryCalloc(1, len); + if (buf == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } + sprintf(buf, "%s/%s", path, file); - (void)taosArrayPush(files, &buf); + if (taosArrayPush(files, &buf) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } } (void)taosCloseDir(&pDir); taosArraySort(files, tfileCompare); (void)tfileRmExpireFile(files); + *ppResult = files; + return 0; - return files; +_exception: + (void)taosCloseDir(&pDir); + if (files != NULL) { + taosArrayDestroyEx(files, tfileDestroyFileName); + taosArrayDestroy(files); + } + return code; } static int tfileRmExpireFile(SArray* result) { // TODO(yihao): remove expire tindex after restart diff --git a/source/libs/index/src/indexUtil.c b/source/libs/index/src/indexUtil.c index 0776e71180..6a52b7b5a4 100644 --- a/source/libs/index/src/indexUtil.c +++ b/source/libs/index/src/indexUtil.c @@ -36,12 +36,16 @@ static FORCE_INLINE int iBinarySearch(SArray *arr, int s, int e, uint64_t k) { return s; } -void iIntersection(SArray *in, SArray *out) { +int32_t iIntersection(SArray *in, SArray *out) { + int32_t code = 0; int32_t sz = (int32_t)taosArrayGetSize(in); if (sz <= 0) { - return; + return 0; } MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex)); + if (mi == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } for (int i = 0; i < sz; i++) { SArray *t = taosArrayGetP(in, i); mi[i].len = (int32_t)taosArrayGetSize(t); @@ -64,19 +68,25 @@ void iIntersection(SArray *in, SArray *out) { } } if (has == true) { - (void)taosArrayPush(out, &tgt); + if (taosArrayPush(out, &tgt) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } } } taosMemoryFreeClear(mi); + return code; } -void iUnion(SArray *in, SArray *out) { +int32_t iUnion(SArray *in, SArray *out) { + int32_t code = 0; int32_t sz = (int32_t)taosArrayGetSize(in); if (sz <= 0) { - return; + return 0; } if (sz == 1) { - (void)taosArrayAddAll(out, taosArrayGetP(in, 0)); - return; + if (taosArrayAddAll(out, taosArrayGetP(in, 0)) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex)); @@ -108,19 +118,23 @@ void iUnion(SArray *in, SArray *out) { continue; } } - (void)taosArrayPush(out, &mVal); + if (taosArrayPush(out, &mVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } } else { break; } } taosMemoryFreeClear(mi); + return 0; } -void iExcept(SArray *total, SArray *except) { +int32_t iExcept(SArray *total, SArray *except) { int32_t tsz = (int32_t)taosArrayGetSize(total); int32_t esz = (int32_t)taosArrayGetSize(except); if (esz == 0 || tsz == 0) { - return; + return 0; } int vIdx = 0; @@ -135,6 +149,7 @@ void iExcept(SArray *total, SArray *except) { } taosArrayPopTailBatch(total, tsz - vIdx); + return 0; } int uidCompare(const void *a, const void *b) { @@ -191,20 +206,37 @@ void idxTRsltDestroy(SIdxTRslt *tr) { taosArrayDestroy(tr->del); taosMemoryFree(tr); } -void idxTRsltMergeTo(SIdxTRslt *tr, SArray *result) { +int32_t idxTRsltMergeTo(SIdxTRslt *tr, SArray *result) { + int32_t code = 0; taosArraySort(tr->total, uidCompare); taosArraySort(tr->add, uidCompare); taosArraySort(tr->del, uidCompare); if (taosArrayGetSize(tr->total) == 0 || taosArrayGetSize(tr->add) == 0) { SArray *t = taosArrayGetSize(tr->total) == 0 ? tr->add : tr->total; - (void)taosArrayAddAll(result, t); + if (taosArrayAddAll(result, t) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } else { SArray *arrs = taosArrayInit(2, sizeof(void *)); - (void)taosArrayPush(arrs, &tr->total); - (void)taosArrayPush(arrs, &tr->add); - iUnion(arrs, result); + if (arrs == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (taosArrayPush(arrs, &tr->total) == NULL) { + taosArrayDestroy(arrs); + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (taosArrayPush(arrs, &tr->add) == NULL) { + taosArrayDestroy(arrs); + return TSDB_CODE_OUT_OF_MEMORY; + } + code = iUnion(arrs, result); taosArrayDestroy(arrs); } - iExcept(result, tr->del); + if (code == 0) { + code = iExcept(result, tr->del); + } + return code; } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 29fb18ef07..4793a8951a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1096,21 +1096,23 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { * replication is finished */ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { + int32_t code = 0; STaskDbWrapper* pBackend = arg; - + SArray * chkpDel = NULL, *chkpDup = NULL; (void)taosThreadRwlockWrlock(&pBackend->chkpDirLock); - (void)taosArrayPush(pBackend->chkpSaved, &chkpId); - - SArray* chkpDel = taosArrayInit(8, sizeof(int64_t)); - if (chkpDel == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + if (taosArrayPush(pBackend->chkpSaved, &chkpId) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); } - SArray* chkpDup = taosArrayInit(8, sizeof(int64_t)); + chkpDel = taosArrayInit(8, sizeof(int64_t)); + if (chkpDel == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } + + chkpDup = taosArrayInit(8, sizeof(int64_t)); if (chkpDup == NULL) { - taosArrayDestroy(chkpDel); - return TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); } int64_t firsId = 0; @@ -1120,9 +1122,13 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) { int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); if (id >= firsId) { - (void)taosArrayPush(chkpDup, &id); + if (taosArrayPush(chkpDup, &id) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } } else { - (void)taosArrayPush(chkpDel, &id); + if (taosArrayPush(chkpDel, &id) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } } } } else { @@ -1131,13 +1137,18 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { for (int i = 0; i < dsz; i++) { int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); - (void)taosArrayPush(chkpDel, &id); + if (taosArrayPush(chkpDel, &id) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } } for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) { int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i); - (void)taosArrayPush(chkpDup, &id); + if (taosArrayPush(chkpDup, &id) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } } } + taosArrayDestroy(pBackend->chkpSaved); pBackend->chkpSaved = chkpDup; @@ -1155,6 +1166,11 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) { } taosArrayDestroy(chkpDel); return 0; +_exception: + taosArrayDestroy(chkpDup); + taosArrayDestroy(chkpDel); + (void)taosThreadRwlockUnlock(&pBackend->chkpDirLock); + return code; } #ifdef BUILD_NO_CALL @@ -1288,7 +1304,9 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) { int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId); if (ret == 1) { - (void)taosArrayPush(pBackend->chkpSaved, &checkpointId); + if (taosArrayPush(pBackend->chkpSaved, &checkpointId) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } } } else { continue; @@ -1300,13 +1318,21 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) { (void)taosCloseDir(&pDir); return 0; +_exception: + taosMemoryFree(pChkpDir); + (void)taosCloseDir(&pDir); + return code; } int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) { + int32_t code = 0; SArray* pHandle = taosArrayInit(8, POINTER_BYTES); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { if (pBackend->pCf[i]) { rocksdb_column_family_handle_t* p = pBackend->pCf[i]; - (void)taosArrayPush(pHandle, &p); + if (taosArrayPush(pHandle, &p) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exception; + } } } int32_t nCf = taosArrayGetSize(pHandle); @@ -1316,13 +1342,20 @@ int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_ha } rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); + if (ppCf == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + } for (int i = 0; i < nCf; i++) { ppCf[i] = taosArrayGetP(pHandle, i); } + taosArrayDestroy(pHandle); *ppHandle = ppCf; return nCf; +_exception: + taosArrayDestroy(pHandle); + return code; } int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) { @@ -2435,7 +2468,9 @@ void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) { void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { (void)taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); - (void)taosArrayPush(pTaskDb->chkpInUse, &chkp); + if (taosArrayPush(pTaskDb->chkpInUse, &chkp) == NULL) { + stError("failed to push chkp: %" PRIi64 " into inuse", chkp); + } taosArraySort(pTaskDb->chkpInUse, chkpIdComp); (void)taosThreadRwlockUnlock(&pTaskDb->chkpDirLock); } @@ -4271,7 +4306,10 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { int64_t checkPoint = 0; if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { - (void)taosArrayPush(result, &checkPoint); + if (taosArrayPush(result, &checkPoint) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } } } else { break; @@ -4487,7 +4525,10 @@ int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { return TSDB_CODE_OUT_OF_MEMORY; } (void)strncpy(fname, name, len); - (void)taosArrayPush(diff, &fname); + if (taosArrayPush(diff, &fname) == NULL) { + taosMemoryFree(fname); + return TSDB_CODE_OUT_OF_MEMORY; + } } pIter = taosHashIterate(p2, pIter); } @@ -4646,7 +4687,11 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { } (void)strncpy(fname, name, len); - (void)taosArrayPush(p->pAdd, &fname); + if (taosArrayPush(p->pAdd, &fname) == NULL) { + taosMemoryFree(fname); + (void)taosThreadRwlockUnlock(&p->rwLock); + return TSDB_CODE_OUT_OF_MEMORY; + } } pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter); } @@ -4850,7 +4895,11 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) { code = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR; } - (void)taosArrayPush(list, &p); + if (taosArrayPush(list, &p) == NULL) { + taosMemoryFree(p); + code = TSDB_CODE_OUT_OF_MEMORY; + goto _ERROR; + } } // copy current file to dst dir