check return code

This commit is contained in:
yihaoDeng 2024-08-26 11:48:20 +08:00
parent 27bdc9e71e
commit b57cd5f0d3
9 changed files with 255 additions and 81 deletions

View File

@ -44,7 +44,7 @@ FstBuilderNode* fstBuilderNodeDefault();
FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src); FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src);
void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src); int32_t fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src);
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile' *wrt, // bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile' *wrt,
// CompiledAddr lastAddr, CompiledAddr startAddr); // CompiledAddr lastAddr, CompiledAddr startAddr);

View File

@ -129,8 +129,8 @@ void tfileIteratorDestroy(Iterate* iterator);
TFileValue* tfileValueCreate(char* val); TFileValue* tfileValueCreate(char* val);
int tfileValuePush(TFileValue* tf, uint64_t val); int32_t tfileValuePush(TFileValue* tf, uint64_t val);
void tfileValueDestroy(TFileValue* tf); void tfileValueDestroy(TFileValue* tf);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -46,17 +46,17 @@ extern "C" {
buf += len; \ buf += len; \
} while (0) } while (0)
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ #define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
{ \ { \
bool f = false; \ bool f = false; \
for (int i = 0; i < taosArrayGetSize(src); i++) { \ for (int i = 0; i < taosArrayGetSize(src); i++) { \
if (*(uint64_t *)taosArrayGet(src, i) == tgt) { \ if (*(uint64_t *)taosArrayGet(src, i) == tgt) { \
f = true; \ f = true; \
} \ } \
} \ } \
if (f == false) { \ if (f == false) { \
(void)taosArrayPush(dst, &tgt); \ if (taosArrayPush(dst, &tgt) == NULL) code = TSDB_CODE_OUT_OF_MEMORY; \
} \ } \
} }
/* multi sorted result intersection /* multi sorted result intersection
@ -65,7 +65,7 @@ extern "C" {
* [1, 4, 5] * [1, 4, 5]
* output:[4, 5] * output:[4, 5]
*/ */
void iIntersection(SArray *in, SArray *out); int32_t iIntersection(SArray *in, SArray *out);
/* multi sorted result union /* multi sorted result union
* input: [1, 2, 4, 5] * input: [1, 2, 4, 5]
@ -73,7 +73,7 @@ void iIntersection(SArray *in, SArray *out);
* [1, 4, 5] * [1, 4, 5]
* output:[1, 2, 3, 4, 5] * output:[1, 2, 3, 4, 5]
*/ */
void iUnion(SArray *in, SArray *out); int32_t iUnion(SArray *in, SArray *out);
/* see example /* see example
* total: [1, 2, 4, 5, 7, 8] * total: [1, 2, 4, 5, 7, 8]
@ -81,7 +81,7 @@ void iUnion(SArray *in, SArray *out);
* return: [1, 2, 7, 8] saved in total * return: [1, 2, 7, 8] saved in total
*/ */
void iExcept(SArray *total, SArray *except); int32_t iExcept(SArray *total, SArray *except);
int uidCompare(const void *a, const void *b); int uidCompare(const void *a, const void *b);
@ -107,7 +107,7 @@ void idxTRsltClear(SIdxTRslt *tr);
void idxTRsltDestroy(SIdxTRslt *tr); void idxTRsltDestroy(SIdxTRslt *tr);
void idxTRsltMergeTo(SIdxTRslt *tr, SArray *out); int32_t idxTRsltMergeTo(SIdxTRslt *tr, SArray *out);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -92,7 +92,7 @@ static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray
static int32_t idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch); static int32_t idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
// merge cache and tfile by opera type // merge cache and tfile by opera type
static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper); static int32_t idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf);
@ -247,15 +247,27 @@ int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
return 0; return 0;
} }
int32_t indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { int32_t indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
int32_t code = 0;
EIndexOperatorType opera = multiQuerys->opera; // relation of querys EIndexOperatorType opera = multiQuerys->opera; // relation of querys
SArray* iRslts = taosArrayInit(4, POINTER_BYTES); SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
int nQuery = taosArrayGetSize(multiQuerys->query); if (iRslts == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int nQuery = taosArrayGetSize(multiQuerys->query);
for (size_t i = 0; i < nQuery; i++) { for (size_t i = 0; i < nQuery; i++) {
SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i); SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i);
SArray* trslt = NULL; SArray* trslt = NULL;
(void)idxTermSearch(index, qterm, &trslt); code = idxTermSearch(index, qterm, &trslt);
(void)taosArrayPush(iRslts, (void*)&trslt); if (code != 0) {
idxInterRsltDestroy(iRslts);
return code;
}
if (taosArrayPush(iRslts, (void*)&trslt) == NULL) {
idxInterRsltDestroy(iRslts);
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
(void)idxMergeFinalResults(iRslts, opera, result); (void)idxMergeFinalResults(iRslts, opera, result);
idxInterRsltDestroy(iRslts); idxInterRsltDestroy(iRslts);
@ -267,6 +279,9 @@ int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { return 1; }
SIndexOpts* indexOptsCreate(int32_t cacheSize) { SIndexOpts* indexOptsCreate(int32_t cacheSize) {
SIndexOpts* opts = taosMemoryCalloc(1, sizeof(SIndexOpts)); SIndexOpts* opts = taosMemoryCalloc(1, sizeof(SIndexOpts));
if (opts == NULL) {
return NULL;
}
opts->cacheSize = cacheSize; opts->cacheSize = cacheSize;
return opts; return opts;
} }
@ -295,7 +310,7 @@ void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
int32_t indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) { int32_t indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
SIndexTermQuery q = {.qType = qType, .term = term}; SIndexTermQuery q = {.qType = qType, .term = term};
if (taosArrayPush(pQuery->query, &q) == NULL) { if (taosArrayPush(pQuery->query, &q) == NULL) {
return terrno; return TSDB_CODE_OUT_OF_MEMORY;
} }
return 0; return 0;
} }
@ -362,7 +377,9 @@ void indexTermDestroy(SIndexTerm* p) {
SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); } SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); }
int32_t indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) { int32_t indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
(void)taosArrayPush(terms, &term); if (taosArrayPush(terms, &term) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return 0; return 0;
} }
void indexMultiTermDestroy(SIndexMultiTerm* terms) { void indexMultiTermDestroy(SIndexMultiTerm* terms) {
@ -422,6 +439,7 @@ bool indexJsonIsRebuild(SIndexJson* idx) {
} }
static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) { static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
int32_t code = 0;
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
const char* colName = term->colName; const char* colName = term->colName;
int32_t nColName = term->nColName; int32_t nColName = term->nColName;
@ -452,6 +470,10 @@ static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** resu
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
SIdxTRslt* tr = idxTRsltCreate(); SIdxTRslt* tr = idxTRsltCreate();
if (tr == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
}
if (0 == idxCacheSearch(cache, query, tr, &s)) { if (0 == idxCacheSearch(cache, query, tr, &s)) {
if (s == kTypeDeletion) { if (s == kTypeDeletion) {
indexInfo("col: %s already drop by", term->colName); indexInfo("col: %s already drop by", term->colName);
@ -473,13 +495,14 @@ static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** resu
int64_t cost = taosGetTimestampUs() - st; int64_t cost = taosGetTimestampUs() - st;
indexInfo("search cost: %" PRIu64 "us", cost); indexInfo("search cost: %" PRIu64 "us", cost);
idxTRsltMergeTo(tr, *result); code = idxTRsltMergeTo(tr, *result);
TAOS_CHECK_GOTO(code, NULL, END);
idxTRsltDestroy(tr); idxTRsltDestroy(tr);
return 0; return 0;
END: END:
idxTRsltDestroy(tr); idxTRsltDestroy(tr);
return 0; return code;
} }
static void idxInterRsltDestroy(SArray* results) { static void idxInterRsltDestroy(SArray* results) {
if (results == NULL) { if (results == NULL) {
@ -514,30 +537,50 @@ static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray
return 0; return 0;
} }
static void idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) { static int32_t idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) {
int32_t code = 0;
int32_t sz = taosArrayGetSize(result); int32_t sz = taosArrayGetSize(result);
if (sz > 0) { if (sz > 0) {
TFileValue* lv = taosArrayGetP(result, sz - 1); TFileValue* lv = taosArrayGetP(result, sz - 1);
if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) { if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) {
idxTRsltMergeTo(tr, lv->tableId); code = idxTRsltMergeTo(tr, lv->tableId);
if (code != 0) {
indexFatal("failed to merge result since %s", tstrerror(code));
return code;
}
idxTRsltClear(tr); idxTRsltClear(tr);
(void)taosArrayPush(result, &tfv); if (taosArrayPush(result, &tfv) == NULL) {
indexFatal("failed to merge result since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
}
} else if (tfv == NULL) { } else if (tfv == NULL) {
// handle last iterator // handle last iterator
idxTRsltMergeTo(tr, lv->tableId); code = idxTRsltMergeTo(tr, lv->tableId);
if (code != 0) {
indexFatal("failed to merge result since %s", tstrerror(code));
}
} else { } else {
tfileValueDestroy(tfv); return TSDB_CODE_INVALID_PARA;
} }
} else { } else {
(void)taosArrayPush(result, &tfv); if (taosArrayPush(result, &tfv) == NULL) {
}
} }
return code;
} }
static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) { static int32_t idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) {
int32_t code = 0;
char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; char* colVal = (cv != NULL) ? cv->colVal : tv->colVal;
TFileValue* tfv = tfileValueCreate(colVal); TFileValue* tfv = tfileValueCreate(colVal);
if (tfv == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
idxMayMergeTempToFinalRslt(result, tfv, tr); code = idxMayMergeTempToFinalRslt(result, tfv, tr);
if (code != 0) {
tfileValueDestroy(tfv);
return code;
}
if (cv != NULL) { if (cv != NULL) {
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0); uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
@ -548,9 +591,19 @@ static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue
INDEX_MERGE_ADD_DEL(tr->add, tr->del, id) INDEX_MERGE_ADD_DEL(tr->add, tr->del, id)
} }
} }
if (tv != NULL) {
(void)taosArrayAddAll(tr->total, tv->val); if (code != 0) {
tfileValueDestroy(tfv);
return code;
} }
if (tv != NULL) {
if (taosArrayAddAll(tr->total, tv->val) == NULL) {
tfileValueDestroy(tfv);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return 0;
} }
static void idxDestroyFinalRslt(SArray* result) { static void idxDestroyFinalRslt(SArray* result) {
int32_t sz = result ? taosArrayGetSize(result) : 0; int32_t sz = result ? taosArrayGetSize(result) : 0;

View File

@ -75,6 +75,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe
if (cache == NULL) { if (cache == NULL) {
return 0; return 0;
} }
int32_t code = 0;
MemTable* mem = cache; MemTable* mem = cache;
IndexCache* pCache = mem->pCache; IndexCache* pCache = mem->pCache;
@ -98,6 +99,10 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe
} else if (c->operaType == DEL_VALUE) { } else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
} }
if (code != 0) {
break;
}
} else { } else {
break; break;
} }
@ -105,7 +110,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe
taosMemoryFree(pCt); taosMemoryFree(pCt);
(void)tSkipListDestroyIter(iter); (void)tSkipListDestroyIter(iter);
return 0; return code;
} }
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
// impl later // impl later
@ -123,6 +128,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt*
if (cache == NULL) { if (cache == NULL) {
return 0; return 0;
} }
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
MemTable* mem = cache; MemTable* mem = cache;
IndexCache* pCache = mem->pCache; IndexCache* pCache = mem->pCache;
@ -148,7 +154,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt*
} }
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
TExeCond cond = cmpFn(c->colVal, pCt->colVal, pCt->colType); TExeCond cond = cmpFn(c->colVal, pCt->colVal, pCt->colType);
if (terrno != TSDB_CODE_SUCCESS) { if (terrno != TSDB_CODE_SUCCESS) {
code = terrno; code = terrno;
goto _return; goto _return;
@ -161,6 +167,10 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt*
} else if (c->operaType == DEL_VALUE) { } else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
} }
if (code != 0) {
break;
}
} else if (cond == CONTINUE) { } else if (cond == CONTINUE) {
continue; continue;
} else if (cond == BREAK) { } else if (cond == BREAK) {
@ -190,6 +200,8 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
if (cache == NULL) { if (cache == NULL) {
return 0; return 0;
} }
int32_t code = 0;
MemTable* mem = cache; MemTable* mem = cache;
IndexCache* pCache = mem->pCache; IndexCache* pCache = mem->pCache;
@ -223,6 +235,10 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
} else if (c->operaType == DEL_VALUE) { } else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
} }
if (code != 0) {
break;
}
} else { } else {
break; break;
} }
@ -231,9 +247,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
taosMemoryFree(pCt); taosMemoryFree(pCt);
taosMemoryFree(exBuf); taosMemoryFree(exBuf);
(void)tSkipListDestroyIter(iter); (void)tSkipListDestroyIter(iter);
return 0; return code;
return TSDB_CODE_SUCCESS;
} }
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) { static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -338,6 +352,10 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR
} else if (c->operaType == DEL_VALUE) { } else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid) INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
} }
if (code != 0) {
break;
}
} else if (cond == CONTINUE) { } else if (cond == CONTINUE) {
continue; continue;
} else if (cond == BREAK) { } else if (cond == BREAK) {

View File

@ -16,9 +16,14 @@
FstBuilderNode* fstBuilderNodeDefault() { FstBuilderNode* fstBuilderNodeDefault() {
FstBuilderNode* bn = taosMemoryMalloc(sizeof(FstBuilderNode)); FstBuilderNode* bn = taosMemoryMalloc(sizeof(FstBuilderNode));
if (bn == NULL) return NULL;
bn->isFinal = false; bn->isFinal = false;
bn->finalOutput = 0; bn->finalOutput = 0;
bn->trans = taosArrayInit(16, sizeof(FstTransition)); bn->trans = taosArrayInit(16, sizeof(FstTransition));
if (bn->trans == NULL) {
taosMemoryFree(bn);
return NULL;
}
return bn; return bn;
} }
void fstBuilderNodeDestroy(FstBuilderNode* node) { void fstBuilderNodeDestroy(FstBuilderNode* node) {
@ -56,6 +61,8 @@ bool fstBuilderNodeEqual(FstBuilderNode* n1, FstBuilderNode* n2) {
return true; return true;
} }
#if 0
FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src) { FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src) {
FstBuilderNode* node = taosMemoryMalloc(sizeof(FstBuilderNode)); FstBuilderNode* node = taosMemoryMalloc(sizeof(FstBuilderNode));
if (node == NULL) { if (node == NULL) {
@ -65,10 +72,18 @@ FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src) {
// //
size_t sz = taosArrayGetSize(src->trans); size_t sz = taosArrayGetSize(src->trans);
SArray* trans = taosArrayInit(sz, sizeof(FstTransition)); SArray* trans = taosArrayInit(sz, sizeof(FstTransition));
if (trans == NULL) {
taosMemoryFree(node);
return NULL;
}
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
FstTransition* tran = taosArrayGet(src->trans, i); FstTransition* tran = taosArrayGet(src->trans, i);
(void)taosArrayPush(trans, tran); if (taosArrayPush(trans, tran) != NULL) {
taosArrayDestroy(trans);
taosMemoryFree(node);
return NULL;
}
} }
node->trans = trans; node->trans = trans;
@ -76,10 +91,12 @@ FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src) {
node->finalOutput = src->finalOutput; node->finalOutput = src->finalOutput;
return node; return node;
} }
#endif
// not destroy src, User's bussiness // not destroy src, User's bussiness
void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) { int32_t fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) {
if (dst == NULL || src == NULL) { if (dst == NULL || src == NULL) {
return; return TSDB_CODE_INVALID_PARA;
} }
dst->isFinal = src->isFinal; dst->isFinal = src->isFinal;
@ -89,10 +106,18 @@ void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) {
taosArrayDestroy(dst->trans); taosArrayDestroy(dst->trans);
size_t sz = taosArrayGetSize(src->trans); size_t sz = taosArrayGetSize(src->trans);
dst->trans = taosArrayInit(sz, sizeof(FstTransition)); dst->trans = taosArrayInit(sz, sizeof(FstTransition));
if (dst->trans == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
FstTransition* trn = taosArrayGet(src->trans, i); FstTransition* trn = taosArrayGet(src->trans, i);
(void)taosArrayPush(dst->trans, trn); if (taosArrayPush(dst->trans, trn) == NULL) {
taosArrayDestroy(dst->trans);
dst->trans = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
return 0;
} }
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile *wrt, CompiledAddr lastAddr, CompiledAddr // bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile *wrt, CompiledAddr lastAddr, CompiledAddr

View File

@ -125,7 +125,7 @@ FstRegistryEntry* fstRegistryGetEntry(FstRegistry* registry, FstBuilderNode* bNo
entry->addr = cell->addr; entry->addr = cell->addr;
return entry; return entry;
} else { } else {
fstBuilderNodeCloneFrom(cell->node, bNode); (void)fstBuilderNodeCloneFrom(cell->node, bNode);
entry->state = NOTFOUND; entry->state = NOTFOUND;
entry->cell = cell; // copy or not entry->cell = cell; // copy or not
} }
@ -145,7 +145,7 @@ FstRegistryEntry* fstRegistryGetEntry(FstRegistry* registry, FstBuilderNode* bNo
return entry; return entry;
} }
// clone from bNode, refactor later // clone from bNode, refactor later
fstBuilderNodeCloneFrom(cell2->node, bNode); (void)fstBuilderNodeCloneFrom(cell2->node, bNode);
fstRegistryCellSwap(registry->table, start, start + 1); fstRegistryCellSwap(registry->table, start, start + 1);
FstRegistryCell* cCell = taosArrayGet(registry->table, start); FstRegistryCell* cCell = taosArrayGet(registry->table, start);
@ -166,7 +166,7 @@ FstRegistryEntry* fstRegistryGetEntry(FstRegistry* registry, FstBuilderNode* bNo
uint64_t last = end - 1; uint64_t last = end - 1;
FstRegistryCell* cell = (FstRegistryCell*)taosArrayGet(registry->table, last); FstRegistryCell* cell = (FstRegistryCell*)taosArrayGet(registry->table, last);
// clone from bNode, refactor later // clone from bNode, refactor later
fstBuilderNodeCloneFrom(cell->node, bNode); (void)fstBuilderNodeCloneFrom(cell->node, bNode);
fstRegistryCellPromote(registry->table, last, start); fstRegistryCellPromote(registry->table, last, start);
FstRegistryCell* cCell = taosArrayGet(registry->table, start); FstRegistryCell* cCell = taosArrayGet(registry->table, start);

View File

@ -49,7 +49,7 @@ static int tfileReaderLoadFst(TFileReader* reader);
static int tfileReaderVerify(TFileReader* reader); static int tfileReaderVerify(TFileReader* reader);
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result); static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
static SArray* tfileGetFileList(const char* path); static int32_t tfileGetFileList(const char* path, SArray** pResult);
static int tfileRmExpireFile(SArray* result); static int tfileRmExpireFile(SArray* result);
static void tfileDestroyFileName(void* elem); static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b); static int tfileCompare(const void* a, const void* b);
@ -99,7 +99,8 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) {
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tcache->capacity = 64; tcache->capacity = 64;
SArray* files = tfileGetFileList(path); SArray* files = NULL;
int32_t code = tfileGetFileList(path, &files);
for (size_t i = 0; i < taosArrayGetSize(files); i++) { for (size_t i = 0; i < taosArrayGetSize(files); i++) {
char* file = taosArrayGetP(files, i); char* file = taosArrayGetP(files, i);
@ -232,7 +233,7 @@ void tfileReaderDestroy(TFileReader* reader) {
} }
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
int ret = 0; int32_t ret = 0;
char* p = tem->colVal; char* p = tem->colVal;
uint64_t sz = tem->nColVal; uint64_t sz = tem->nColVal;
@ -246,6 +247,11 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
tem->suid, tem->colName, tem->colVal, cost); tem->suid, tem->colName, tem->colVal, cost);
ret = tfileReaderLoadTableIds((TFileReader*)reader, (int32_t)offset, tr->total); ret = tfileReaderLoadTableIds((TFileReader*)reader, (int32_t)offset, tr->total);
if (ret != 0) {
fstSliceDestroy(&key);
indexError("faile to search since %s", tstrerror(ret));
return ret;
}
cost = taosGetTimestampUs() - et; cost = taosGetTimestampUs() - et;
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid, indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
tem->colName, tem->colVal, cost); tem->colName, tem->colVal, cost);
@ -255,6 +261,8 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
} }
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
int32_t lino = 0;
int32_t code = 0;
char* p = tem->colVal; char* p = tem->colVal;
uint64_t sz = tem->nColVal; uint64_t sz = tem->nColVal;
@ -265,7 +273,9 @@ static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
FStmSt* st = stmBuilderIntoStm(sb); FStmSt* st = stmBuilderIntoStm(sb);
FStmStRslt* rt = NULL; FStmStRslt* rt = NULL;
while ((rt = stmStNextWith(st, NULL)) != NULL) { while ((rt = stmStNextWith(st, NULL)) != NULL) {
(void)taosArrayPush(offsets, &(rt->out.out)); if (taosArrayPush(offsets, &(rt->out.out)) == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exception);
}
swsResultDestroy(rt); swsResultDestroy(rt);
} }
stmStDestroy(st); stmStDestroy(st);
@ -275,14 +285,16 @@ static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
for (int i = 0; i < taosArrayGetSize(offsets); i++) { for (int i = 0; i < taosArrayGetSize(offsets); i++) {
uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i); uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i);
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
if (ret != 0) { TAOS_CHECK_GOTO(ret, &lino, _exception);
taosArrayDestroy(offsets);
indexError("failed to find target tablelist");
return TSDB_CODE_FILE_CORRUPTED;
}
} }
taosArrayDestroy(offsets); taosArrayDestroy(offsets);
return 0; return 0;
_exception:
stmStDestroy(st);
stmBuilderDestroy(sb);
taosArrayDestroy(offsets);
indexError("failed to searchPrefix since %s, lino:%d", tstrerror(code), lino);
return code;
} }
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) { static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
int ret = 0; int ret = 0;
@ -393,6 +405,12 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
tem->suid, tem->colName, tem->colVal, cost); tem->suid, tem->colName, tem->colVal, cost);
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
if (ret != 0) {
indexError("failed to search json since %s", tstrerror(ret));
taosMemoryFree(p);
fstSliceDestroy(&key);
return ret;
}
cost = taosGetTimestampUs() - et; cost = taosGetTimestampUs() - et;
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, offset: %" PRIu64 indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, offset: %" PRIu64
", size: %d, time cost: %" PRIu64 "us", ", size: %d, time cost: %" PRIu64 "us",
@ -863,14 +881,24 @@ TFileValue* tfileValueCreate(char* val) {
return NULL; return NULL;
} }
tf->colVal = taosStrdup(val); tf->colVal = taosStrdup(val);
if (tf->colVal == NULL) {
taosMemoryFree(tf);
}
tf->tableId = taosArrayInit(32, sizeof(uint64_t)); tf->tableId = taosArrayInit(32, sizeof(uint64_t));
if (tf->tableId == NULL) {
taosMemoryFree(tf->colVal);
taosMemoryFree(tf);
return NULL;
}
return tf; return tf;
} }
int tfileValuePush(TFileValue* tf, uint64_t val) { int32_t tfileValuePush(TFileValue* tf, uint64_t val) {
if (tf == NULL) { if (tf == NULL) {
return -1; return TSDB_CODE_INVALID_PARA;
}
if (taosArrayPush(tf->tableId, &val) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
} }
(void)taosArrayPush(tf->tableId, &val);
return 0; return 0;
} }
void tfileValueDestroy(TFileValue* tf) { void tfileValueDestroy(TFileValue* tf) {
@ -986,8 +1014,10 @@ static int tfileReaderLoadFst(TFileReader* reader) {
return reader->fst != NULL ? 0 : -1; return reader->fst != NULL ? 0 : -1;
} }
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) { static int32_t tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
// TODO(yihao): opt later // TODO(yihao): opt later
int32_t code = 0;
int32_t lino = 0;
IFileCtx* ctx = reader->ctx; IFileCtx* ctx = reader->ctx;
// add block cache // add block cache
char block[4096] = {0}; char block[4096] = {0};
@ -1003,7 +1033,9 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
while (nid > 0) { while (nid > 0) {
int32_t left = block + sizeof(block) - p; int32_t left = block + sizeof(block) - p;
if (left >= sizeof(uint64_t)) { if (left >= sizeof(uint64_t)) {
(void)taosArrayPush(result, (uint64_t*)p); if (taosArrayPush(result, (uint64_t*)p) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
p += sizeof(uint64_t); p += sizeof(uint64_t);
} else { } else {
char buf[sizeof(uint64_t)] = {0}; char buf[sizeof(uint64_t)] = {0};
@ -1014,7 +1046,9 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
nread = ctx->readFrom(ctx, (uint8_t*)block, sizeof(block), offset); nread = ctx->readFrom(ctx, (uint8_t*)block, sizeof(block), offset);
memcpy(buf + left, block, sizeof(uint64_t) - left); memcpy(buf + left, block, sizeof(uint64_t) - left);
(void)taosArrayPush(result, (uint64_t*)buf); if (taosArrayPush(result, (uint64_t*)buf) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
p = block + sizeof(uint64_t) - left; p = block + sizeof(uint64_t) - left;
} }
nid -= 1; nid -= 1;
@ -1059,16 +1093,19 @@ void tfileReaderUnRef(TFileReader* rd) {
} }
} }
static SArray* tfileGetFileList(const char* path) { static int32_t tfileGetFileList(const char* path, SArray** ppResult) {
int32_t code = 0;
char buf[128] = {0}; char buf[128] = {0};
uint64_t suid; uint64_t suid;
int64_t version; int64_t version;
SArray* files = taosArrayInit(4, sizeof(void*)); SArray* files = taosArrayInit(4, sizeof(void*));
if (files == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
TdDirPtr pDir = taosOpenDir(path); TdDirPtr pDir = taosOpenDir(path);
if (NULL == pDir) { if (NULL == pDir) {
taosArrayDestroy(files); TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _exception);
return NULL;
} }
TdDirEntryPtr pDirEntry; TdDirEntryPtr pDirEntry;
while ((pDirEntry = taosReadDir(pDir)) != NULL) { while ((pDirEntry = taosReadDir(pDir)) != NULL) {
@ -1079,15 +1116,29 @@ static SArray* tfileGetFileList(const char* path) {
size_t len = strlen(path) + 1 + strlen(file) + 1; size_t len = strlen(path) + 1 + strlen(file) + 1;
char* buf = taosMemoryCalloc(1, len); char* buf = taosMemoryCalloc(1, len);
if (buf == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
}
sprintf(buf, "%s/%s", path, file); sprintf(buf, "%s/%s", path, file);
(void)taosArrayPush(files, &buf); if (taosArrayPush(files, &buf) == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
}
} }
(void)taosCloseDir(&pDir); (void)taosCloseDir(&pDir);
taosArraySort(files, tfileCompare); taosArraySort(files, tfileCompare);
(void)tfileRmExpireFile(files); (void)tfileRmExpireFile(files);
*ppResult = files;
return 0;
return files; _exception:
(void)taosCloseDir(&pDir);
if (files != NULL) {
taosArrayDestroyEx(files, tfileDestroyFileName);
taosArrayDestroy(files);
}
return code;
} }
static int tfileRmExpireFile(SArray* result) { static int tfileRmExpireFile(SArray* result) {
// TODO(yihao): remove expire tindex after restart // TODO(yihao): remove expire tindex after restart

View File

@ -36,12 +36,16 @@ static FORCE_INLINE int iBinarySearch(SArray *arr, int s, int e, uint64_t k) {
return s; return s;
} }
void iIntersection(SArray *in, SArray *out) { int32_t iIntersection(SArray *in, SArray *out) {
int32_t code = 0;
int32_t sz = (int32_t)taosArrayGetSize(in); int32_t sz = (int32_t)taosArrayGetSize(in);
if (sz <= 0) { if (sz <= 0) {
return; return 0;
} }
MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex)); MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
if (mi == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SArray *t = taosArrayGetP(in, i); SArray *t = taosArrayGetP(in, i);
mi[i].len = (int32_t)taosArrayGetSize(t); mi[i].len = (int32_t)taosArrayGetSize(t);
@ -64,19 +68,25 @@ void iIntersection(SArray *in, SArray *out) {
} }
} }
if (has == true) { if (has == true) {
(void)taosArrayPush(out, &tgt); if (taosArrayPush(out, &tgt) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
} }
} }
taosMemoryFreeClear(mi); taosMemoryFreeClear(mi);
return code;
} }
void iUnion(SArray *in, SArray *out) { int32_t iUnion(SArray *in, SArray *out) {
int32_t code = 0;
int32_t sz = (int32_t)taosArrayGetSize(in); int32_t sz = (int32_t)taosArrayGetSize(in);
if (sz <= 0) { if (sz <= 0) {
return; return 0;
} }
if (sz == 1) { if (sz == 1) {
(void)taosArrayAddAll(out, taosArrayGetP(in, 0)); if (taosArrayAddAll(out, taosArrayGetP(in, 0)) == NULL) {
return; return TSDB_CODE_OUT_OF_MEMORY;
}
} }
MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex)); MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
@ -108,19 +118,23 @@ void iUnion(SArray *in, SArray *out) {
continue; continue;
} }
} }
(void)taosArrayPush(out, &mVal); if (taosArrayPush(out, &mVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
} else { } else {
break; break;
} }
} }
taosMemoryFreeClear(mi); taosMemoryFreeClear(mi);
return 0;
} }
void iExcept(SArray *total, SArray *except) { int32_t iExcept(SArray *total, SArray *except) {
int32_t tsz = (int32_t)taosArrayGetSize(total); int32_t tsz = (int32_t)taosArrayGetSize(total);
int32_t esz = (int32_t)taosArrayGetSize(except); int32_t esz = (int32_t)taosArrayGetSize(except);
if (esz == 0 || tsz == 0) { if (esz == 0 || tsz == 0) {
return; return 0;
} }
int vIdx = 0; int vIdx = 0;
@ -135,6 +149,7 @@ void iExcept(SArray *total, SArray *except) {
} }
taosArrayPopTailBatch(total, tsz - vIdx); taosArrayPopTailBatch(total, tsz - vIdx);
return 0;
} }
int uidCompare(const void *a, const void *b) { int uidCompare(const void *a, const void *b) {
@ -191,7 +206,7 @@ void idxTRsltDestroy(SIdxTRslt *tr) {
taosArrayDestroy(tr->del); taosArrayDestroy(tr->del);
taosMemoryFree(tr); taosMemoryFree(tr);
} }
void idxTRsltMergeTo(SIdxTRslt *tr, SArray *result) { int32_t idxTRsltMergeTo(SIdxTRslt *tr, SArray *result) {
taosArraySort(tr->total, uidCompare); taosArraySort(tr->total, uidCompare);
taosArraySort(tr->add, uidCompare); taosArraySort(tr->add, uidCompare);
taosArraySort(tr->del, uidCompare); taosArraySort(tr->del, uidCompare);
@ -201,10 +216,22 @@ void idxTRsltMergeTo(SIdxTRslt *tr, SArray *result) {
(void)taosArrayAddAll(result, t); (void)taosArrayAddAll(result, t);
} else { } else {
SArray *arrs = taosArrayInit(2, sizeof(void *)); SArray *arrs = taosArrayInit(2, sizeof(void *));
(void)taosArrayPush(arrs, &tr->total); if (arrs == NULL) {
(void)taosArrayPush(arrs, &tr->add); return TSDB_CODE_OUT_OF_MEMORY;
}
if (taosArrayPush(arrs, &tr->total) == NULL) {
taosArrayDestroy(arrs);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (taosArrayPush(arrs, &tr->add) == NULL) {
taosArrayDestroy(arrs);
return TSDB_CODE_OUT_OF_MEMORY;
}
iUnion(arrs, result); iUnion(arrs, result);
taosArrayDestroy(arrs); taosArrayDestroy(arrs);
} }
iExcept(result, tr->del); iExcept(result, tr->del);
return 0;
} }