commit
4256e6516e
|
@ -44,7 +44,7 @@ FstBuilderNode* fstBuilderNodeDefault();
|
||||||
|
|
||||||
FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src);
|
FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src);
|
||||||
|
|
||||||
void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src);
|
int32_t fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src);
|
||||||
|
|
||||||
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile' *wrt,
|
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile' *wrt,
|
||||||
// CompiledAddr lastAddr, CompiledAddr startAddr);
|
// CompiledAddr lastAddr, CompiledAddr startAddr);
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
#include "indexFst.h"
|
#include "indexFst.h"
|
||||||
#include "indexFstFile.h"
|
#include "indexFstFile.h"
|
||||||
#include "indexInt.h"
|
#include "indexInt.h"
|
||||||
#include "indexTfile.h"
|
//#include "indexTfile.h"
|
||||||
#include "indexUtil.h"
|
#include "indexUtil.h"
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
|
|
||||||
|
@ -129,8 +129,8 @@ void tfileIteratorDestroy(Iterate* iterator);
|
||||||
|
|
||||||
TFileValue* tfileValueCreate(char* val);
|
TFileValue* tfileValueCreate(char* val);
|
||||||
|
|
||||||
int tfileValuePush(TFileValue* tf, uint64_t val);
|
int32_t tfileValuePush(TFileValue* tf, uint64_t val);
|
||||||
void tfileValueDestroy(TFileValue* tf);
|
void tfileValueDestroy(TFileValue* tf);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,17 +46,17 @@ extern "C" {
|
||||||
buf += len; \
|
buf += len; \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
|
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
|
||||||
{ \
|
{ \
|
||||||
bool f = false; \
|
bool f = false; \
|
||||||
for (int i = 0; i < taosArrayGetSize(src); i++) { \
|
for (int i = 0; i < taosArrayGetSize(src); i++) { \
|
||||||
if (*(uint64_t *)taosArrayGet(src, i) == tgt) { \
|
if (*(uint64_t *)taosArrayGet(src, i) == tgt) { \
|
||||||
f = true; \
|
f = true; \
|
||||||
} \
|
} \
|
||||||
} \
|
} \
|
||||||
if (f == false) { \
|
if (f == false) { \
|
||||||
(void)taosArrayPush(dst, &tgt); \
|
if (taosArrayPush(dst, &tgt) == NULL) code = TSDB_CODE_OUT_OF_MEMORY; \
|
||||||
} \
|
} \
|
||||||
}
|
}
|
||||||
|
|
||||||
/* multi sorted result intersection
|
/* multi sorted result intersection
|
||||||
|
@ -65,7 +65,7 @@ extern "C" {
|
||||||
* [1, 4, 5]
|
* [1, 4, 5]
|
||||||
* output:[4, 5]
|
* output:[4, 5]
|
||||||
*/
|
*/
|
||||||
void iIntersection(SArray *in, SArray *out);
|
int32_t iIntersection(SArray *in, SArray *out);
|
||||||
|
|
||||||
/* multi sorted result union
|
/* multi sorted result union
|
||||||
* input: [1, 2, 4, 5]
|
* input: [1, 2, 4, 5]
|
||||||
|
@ -73,7 +73,7 @@ void iIntersection(SArray *in, SArray *out);
|
||||||
* [1, 4, 5]
|
* [1, 4, 5]
|
||||||
* output:[1, 2, 3, 4, 5]
|
* output:[1, 2, 3, 4, 5]
|
||||||
*/
|
*/
|
||||||
void iUnion(SArray *in, SArray *out);
|
int32_t iUnion(SArray *in, SArray *out);
|
||||||
|
|
||||||
/* see example
|
/* see example
|
||||||
* total: [1, 2, 4, 5, 7, 8]
|
* total: [1, 2, 4, 5, 7, 8]
|
||||||
|
@ -81,7 +81,7 @@ void iUnion(SArray *in, SArray *out);
|
||||||
* return: [1, 2, 7, 8] saved in total
|
* return: [1, 2, 7, 8] saved in total
|
||||||
*/
|
*/
|
||||||
|
|
||||||
void iExcept(SArray *total, SArray *except);
|
int32_t iExcept(SArray *total, SArray *except);
|
||||||
|
|
||||||
int uidCompare(const void *a, const void *b);
|
int uidCompare(const void *a, const void *b);
|
||||||
|
|
||||||
|
@ -107,7 +107,7 @@ void idxTRsltClear(SIdxTRslt *tr);
|
||||||
|
|
||||||
void idxTRsltDestroy(SIdxTRslt *tr);
|
void idxTRsltDestroy(SIdxTRslt *tr);
|
||||||
|
|
||||||
void idxTRsltMergeTo(SIdxTRslt *tr, SArray *out);
|
int32_t idxTRsltMergeTo(SIdxTRslt *tr, SArray *out);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,7 +92,7 @@ static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray
|
||||||
static int32_t idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
|
static int32_t idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
|
||||||
|
|
||||||
// merge cache and tfile by opera type
|
// merge cache and tfile by opera type
|
||||||
static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
|
static int32_t idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
|
||||||
|
|
||||||
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
|
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
|
||||||
// int32_t indexSerialKey(ICacheKey* key, char* buf);
|
// int32_t indexSerialKey(ICacheKey* key, char* buf);
|
||||||
|
@ -212,6 +212,7 @@ void idxReleaseRef(int64_t ref) {
|
||||||
|
|
||||||
int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
// TODO(yihao): reduce the lock range
|
// TODO(yihao): reduce the lock range
|
||||||
|
int32_t code = 0;
|
||||||
(void)taosThreadMutexLock(&index->mtx);
|
(void)taosThreadMutexLock(&index->mtx);
|
||||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||||
|
@ -223,11 +224,19 @@ int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
IndexCache* pCache = idxCacheCreate(index, p->suid, p->colName, p->colType);
|
IndexCache* pCache = idxCacheCreate(index, p->suid, p->colName, p->colType);
|
||||||
(void)taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
|
code = taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
|
||||||
|
if (code != 0) {
|
||||||
|
idxCacheDestroy(pCache);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(void)taosThreadMutexUnlock(&index->mtx);
|
(void)taosThreadMutexUnlock(&index->mtx);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||||
|
|
||||||
|
@ -247,15 +256,27 @@ int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int32_t indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
|
int32_t indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
|
||||||
|
int32_t code = 0;
|
||||||
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
|
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
|
||||||
|
|
||||||
SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
|
SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
|
||||||
int nQuery = taosArrayGetSize(multiQuerys->query);
|
if (iRslts == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
int nQuery = taosArrayGetSize(multiQuerys->query);
|
||||||
for (size_t i = 0; i < nQuery; i++) {
|
for (size_t i = 0; i < nQuery; i++) {
|
||||||
SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i);
|
SIndexTermQuery* qterm = taosArrayGet(multiQuerys->query, i);
|
||||||
SArray* trslt = NULL;
|
SArray* trslt = NULL;
|
||||||
(void)idxTermSearch(index, qterm, &trslt);
|
code = idxTermSearch(index, qterm, &trslt);
|
||||||
(void)taosArrayPush(iRslts, (void*)&trslt);
|
if (code != 0) {
|
||||||
|
idxInterRsltDestroy(iRslts);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
if (taosArrayPush(iRslts, (void*)&trslt) == NULL) {
|
||||||
|
idxInterRsltDestroy(iRslts);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
(void)idxMergeFinalResults(iRslts, opera, result);
|
(void)idxMergeFinalResults(iRslts, opera, result);
|
||||||
idxInterRsltDestroy(iRslts);
|
idxInterRsltDestroy(iRslts);
|
||||||
|
@ -267,6 +288,9 @@ int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { return 1; }
|
||||||
|
|
||||||
SIndexOpts* indexOptsCreate(int32_t cacheSize) {
|
SIndexOpts* indexOptsCreate(int32_t cacheSize) {
|
||||||
SIndexOpts* opts = taosMemoryCalloc(1, sizeof(SIndexOpts));
|
SIndexOpts* opts = taosMemoryCalloc(1, sizeof(SIndexOpts));
|
||||||
|
if (opts == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
opts->cacheSize = cacheSize;
|
opts->cacheSize = cacheSize;
|
||||||
return opts;
|
return opts;
|
||||||
}
|
}
|
||||||
|
@ -295,7 +319,7 @@ void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
|
||||||
int32_t indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
|
int32_t indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
|
||||||
SIndexTermQuery q = {.qType = qType, .term = term};
|
SIndexTermQuery q = {.qType = qType, .term = term};
|
||||||
if (taosArrayPush(pQuery->query, &q) == NULL) {
|
if (taosArrayPush(pQuery->query, &q) == NULL) {
|
||||||
return terrno;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -362,7 +386,9 @@ void indexTermDestroy(SIndexTerm* p) {
|
||||||
SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); }
|
SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); }
|
||||||
|
|
||||||
int32_t indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
|
int32_t indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
|
||||||
(void)taosArrayPush(terms, &term);
|
if (taosArrayPush(terms, &term) == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
|
void indexMultiTermDestroy(SIndexMultiTerm* terms) {
|
||||||
|
@ -422,6 +448,7 @@ bool indexJsonIsRebuild(SIndexJson* idx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
|
static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
|
||||||
|
int32_t code = 0;
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
const char* colName = term->colName;
|
const char* colName = term->colName;
|
||||||
int32_t nColName = term->nColName;
|
int32_t nColName = term->nColName;
|
||||||
|
@ -452,6 +479,10 @@ static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** resu
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
SIdxTRslt* tr = idxTRsltCreate();
|
SIdxTRslt* tr = idxTRsltCreate();
|
||||||
|
if (tr == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
|
||||||
|
}
|
||||||
|
|
||||||
if (0 == idxCacheSearch(cache, query, tr, &s)) {
|
if (0 == idxCacheSearch(cache, query, tr, &s)) {
|
||||||
if (s == kTypeDeletion) {
|
if (s == kTypeDeletion) {
|
||||||
indexInfo("col: %s already drop by", term->colName);
|
indexInfo("col: %s already drop by", term->colName);
|
||||||
|
@ -473,13 +504,14 @@ static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** resu
|
||||||
int64_t cost = taosGetTimestampUs() - st;
|
int64_t cost = taosGetTimestampUs() - st;
|
||||||
indexInfo("search cost: %" PRIu64 "us", cost);
|
indexInfo("search cost: %" PRIu64 "us", cost);
|
||||||
|
|
||||||
idxTRsltMergeTo(tr, *result);
|
code = idxTRsltMergeTo(tr, *result);
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, END);
|
||||||
|
|
||||||
idxTRsltDestroy(tr);
|
idxTRsltDestroy(tr);
|
||||||
return 0;
|
return 0;
|
||||||
END:
|
END:
|
||||||
idxTRsltDestroy(tr);
|
idxTRsltDestroy(tr);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
static void idxInterRsltDestroy(SArray* results) {
|
static void idxInterRsltDestroy(SArray* results) {
|
||||||
if (results == NULL) {
|
if (results == NULL) {
|
||||||
|
@ -503,9 +535,9 @@ static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray
|
||||||
}
|
}
|
||||||
|
|
||||||
if (oType == MUST) {
|
if (oType == MUST) {
|
||||||
iIntersection(in, out);
|
return iIntersection(in, out);
|
||||||
} else if (oType == SHOULD) {
|
} else if (oType == SHOULD) {
|
||||||
iUnion(in, out);
|
return iUnion(in, out);
|
||||||
} else if (oType == NOT) {
|
} else if (oType == NOT) {
|
||||||
// just one column index, enhance later
|
// just one column index, enhance later
|
||||||
// taosArrayAddAll(fResults, interResults);
|
// taosArrayAddAll(fResults, interResults);
|
||||||
|
@ -514,30 +546,53 @@ static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) {
|
static int32_t idxMayMergeTempToFinalRslt(SArray* result, TFileValue* tfv, SIdxTRslt* tr) {
|
||||||
|
int32_t code = 0;
|
||||||
int32_t sz = taosArrayGetSize(result);
|
int32_t sz = taosArrayGetSize(result);
|
||||||
if (sz > 0) {
|
if (sz > 0) {
|
||||||
TFileValue* lv = taosArrayGetP(result, sz - 1);
|
TFileValue* lv = taosArrayGetP(result, sz - 1);
|
||||||
if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) {
|
if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) {
|
||||||
idxTRsltMergeTo(tr, lv->tableId);
|
code = idxTRsltMergeTo(tr, lv->tableId);
|
||||||
|
if (code != 0) {
|
||||||
|
indexFatal("failed to merge result since %s", tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
idxTRsltClear(tr);
|
idxTRsltClear(tr);
|
||||||
|
|
||||||
(void)taosArrayPush(result, &tfv);
|
if (taosArrayPush(result, &tfv) == NULL) {
|
||||||
|
indexFatal("failed to merge result since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
|
}
|
||||||
} else if (tfv == NULL) {
|
} else if (tfv == NULL) {
|
||||||
// handle last iterator
|
// handle last iterator
|
||||||
idxTRsltMergeTo(tr, lv->tableId);
|
code = idxTRsltMergeTo(tr, lv->tableId);
|
||||||
|
if (code != 0) {
|
||||||
|
indexFatal("failed to merge result since %s", tstrerror(code));
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tfileValueDestroy(tfv);
|
tfileValueDestroy(tfv);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
(void)taosArrayPush(result, &tfv);
|
if (taosArrayPush(result, &tfv) == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) {
|
static int32_t idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTRslt* tr) {
|
||||||
|
int32_t code = 0;
|
||||||
char* colVal = (cv != NULL) ? cv->colVal : tv->colVal;
|
char* colVal = (cv != NULL) ? cv->colVal : tv->colVal;
|
||||||
TFileValue* tfv = tfileValueCreate(colVal);
|
TFileValue* tfv = tfileValueCreate(colVal);
|
||||||
|
if (tfv == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
idxMayMergeTempToFinalRslt(result, tfv, tr);
|
code = idxMayMergeTempToFinalRslt(result, tfv, tr);
|
||||||
|
if (code != 0) {
|
||||||
|
tfileValueDestroy(tfv);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
tfv = NULL;
|
||||||
|
|
||||||
if (cv != NULL) {
|
if (cv != NULL) {
|
||||||
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
|
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
|
||||||
|
@ -549,8 +604,11 @@ static void idxMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (tv != NULL) {
|
if (tv != NULL) {
|
||||||
(void)taosArrayAddAll(tr->total, tv->val);
|
if (taosArrayAddAll(tr->total, tv->val) == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
static void idxDestroyFinalRslt(SArray* result) {
|
static void idxDestroyFinalRslt(SArray* result) {
|
||||||
int32_t sz = result ? taosArrayGetSize(result) : 0;
|
int32_t sz = result ? taosArrayGetSize(result) : 0;
|
||||||
|
@ -562,6 +620,7 @@ static void idxDestroyFinalRslt(SArray* result) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
|
int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
|
||||||
|
int32_t code = 0;
|
||||||
if (sIdx == NULL) {
|
if (sIdx == NULL) {
|
||||||
return TSDB_CODE_INVALID_PTR;
|
return TSDB_CODE_INVALID_PTR;
|
||||||
}
|
}
|
||||||
|
@ -598,12 +657,16 @@ int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1024, sizeof(void*));
|
SArray* result = taosArrayInit(1024, sizeof(void*));
|
||||||
|
if (result == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
|
}
|
||||||
|
|
||||||
bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
|
bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
|
||||||
bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
|
bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
|
||||||
|
|
||||||
SIdxTRslt* tr = idxTRsltCreate();
|
SIdxTRslt* tr = idxTRsltCreate();
|
||||||
if (tr == NULL) {
|
if (tr == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
}
|
}
|
||||||
while (cn == true || tn == true) {
|
while (cn == true || tn == true) {
|
||||||
IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
|
IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
|
||||||
|
@ -618,27 +681,42 @@ int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
|
||||||
comp = 1;
|
comp = 1;
|
||||||
}
|
}
|
||||||
if (comp == 0) {
|
if (comp == 0) {
|
||||||
idxMergeCacheAndTFile(result, cv, tv, tr);
|
code = idxMergeCacheAndTFile(result, cv, tv, tr);
|
||||||
|
if (code != 0) {
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, _exception);
|
||||||
|
}
|
||||||
|
|
||||||
cn = cacheIter->next(cacheIter);
|
cn = cacheIter->next(cacheIter);
|
||||||
tn = tfileIter->next(tfileIter);
|
tn = tfileIter->next(tfileIter);
|
||||||
} else if (comp < 0) {
|
} else if (comp < 0) {
|
||||||
idxMergeCacheAndTFile(result, cv, NULL, tr);
|
code = idxMergeCacheAndTFile(result, cv, NULL, tr);
|
||||||
|
if (code != 0) {
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, _exception);
|
||||||
|
}
|
||||||
cn = cacheIter->next(cacheIter);
|
cn = cacheIter->next(cacheIter);
|
||||||
} else {
|
} else {
|
||||||
idxMergeCacheAndTFile(result, NULL, tv, tr);
|
code = idxMergeCacheAndTFile(result, NULL, tv, tr);
|
||||||
|
if (code != 0) {
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, _exception);
|
||||||
|
}
|
||||||
tn = tfileIter->next(tfileIter);
|
tn = tfileIter->next(tfileIter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
idxMayMergeTempToFinalRslt(result, NULL, tr);
|
if ((code = idxMayMergeTempToFinalRslt(result, NULL, tr)) != 0) {
|
||||||
|
idxTRsltDestroy(tr);
|
||||||
|
TAOS_CHECK_GOTO(code, NULL, _exception);
|
||||||
|
}
|
||||||
idxTRsltDestroy(tr);
|
idxTRsltDestroy(tr);
|
||||||
|
|
||||||
int ret = idxGenTFile(sIdx, pCache, result);
|
code = idxGenTFile(sIdx, pCache, result);
|
||||||
if (ret != 0) {
|
if (code != 0) {
|
||||||
indexError("failed to merge");
|
indexError("failed to merge since %s", tstrerror(code));
|
||||||
} else {
|
} else {
|
||||||
int64_t cost = taosGetTimestampUs() - st;
|
int64_t cost = taosGetTimestampUs() - st;
|
||||||
indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
|
indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_exception:
|
||||||
idxDestroyFinalRslt(result);
|
idxDestroyFinalRslt(result);
|
||||||
|
|
||||||
idxCacheDestroyImm(pCache);
|
idxCacheDestroyImm(pCache);
|
||||||
|
@ -654,8 +732,11 @@ int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
|
||||||
idxPost(sIdx);
|
idxPost(sIdx);
|
||||||
}
|
}
|
||||||
idxReleaseRef(sIdx->refId);
|
idxReleaseRef(sIdx->refId);
|
||||||
|
if (code != 0) {
|
||||||
|
indexError("failed to merge since %s", tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return code;
|
||||||
}
|
}
|
||||||
void iterateValueDestroy(IterateValue* value, bool destroy) {
|
void iterateValueDestroy(IterateValue* value, bool destroy) {
|
||||||
if (destroy) {
|
if (destroy) {
|
||||||
|
|
|
@ -75,6 +75,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
int32_t code = 0;
|
||||||
MemTable* mem = cache;
|
MemTable* mem = cache;
|
||||||
IndexCache* pCache = mem->pCache;
|
IndexCache* pCache = mem->pCache;
|
||||||
|
|
||||||
|
@ -98,6 +99,10 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe
|
||||||
} else if (c->operaType == DEL_VALUE) {
|
} else if (c->operaType == DEL_VALUE) {
|
||||||
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
|
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -105,7 +110,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTRslt* tr, STe
|
||||||
|
|
||||||
taosMemoryFree(pCt);
|
taosMemoryFree(pCt);
|
||||||
(void)tSkipListDestroyIter(iter);
|
(void)tSkipListDestroyIter(iter);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
|
static int32_t cacheSearchPrefix(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
|
||||||
// impl later
|
// impl later
|
||||||
|
@ -123,6 +128,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt*
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
MemTable* mem = cache;
|
MemTable* mem = cache;
|
||||||
IndexCache* pCache = mem->pCache;
|
IndexCache* pCache = mem->pCache;
|
||||||
|
@ -148,7 +154,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt*
|
||||||
}
|
}
|
||||||
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
TExeCond cond = cmpFn(c->colVal, pCt->colVal, pCt->colType);
|
TExeCond cond = cmpFn(c->colVal, pCt->colVal, pCt->colType);
|
||||||
if (terrno != TSDB_CODE_SUCCESS) {
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _return;
|
goto _return;
|
||||||
|
@ -156,11 +162,14 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt*
|
||||||
if (cond == MATCH) {
|
if (cond == MATCH) {
|
||||||
if (c->operaType == ADD_VALUE) {
|
if (c->operaType == ADD_VALUE) {
|
||||||
INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
|
INDEX_MERGE_ADD_DEL(tr->del, tr->add, c->uid)
|
||||||
// taosArrayPush(result, &c->uid);
|
|
||||||
*s = kTypeValue;
|
*s = kTypeValue;
|
||||||
} else if (c->operaType == DEL_VALUE) {
|
} else if (c->operaType == DEL_VALUE) {
|
||||||
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
|
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
} else if (cond == CONTINUE) {
|
} else if (cond == CONTINUE) {
|
||||||
continue;
|
continue;
|
||||||
} else if (cond == BREAK) {
|
} else if (cond == BREAK) {
|
||||||
|
@ -190,6 +199,8 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
MemTable* mem = cache;
|
MemTable* mem = cache;
|
||||||
IndexCache* pCache = mem->pCache;
|
IndexCache* pCache = mem->pCache;
|
||||||
|
|
||||||
|
@ -223,6 +234,10 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
|
||||||
} else if (c->operaType == DEL_VALUE) {
|
} else if (c->operaType == DEL_VALUE) {
|
||||||
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
|
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -231,9 +246,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
|
||||||
taosMemoryFree(pCt);
|
taosMemoryFree(pCt);
|
||||||
taosMemoryFree(exBuf);
|
taosMemoryFree(exBuf);
|
||||||
(void)tSkipListDestroyIter(iter);
|
(void)tSkipListDestroyIter(iter);
|
||||||
return 0;
|
return code;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
|
static int32_t cacheSearchSuffix_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr, STermValueType* s) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -338,6 +351,10 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTR
|
||||||
} else if (c->operaType == DEL_VALUE) {
|
} else if (c->operaType == DEL_VALUE) {
|
||||||
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
|
INDEX_MERGE_ADD_DEL(tr->add, tr->del, c->uid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
} else if (cond == CONTINUE) {
|
} else if (cond == CONTINUE) {
|
||||||
continue;
|
continue;
|
||||||
} else if (cond == BREAK) {
|
} else if (cond == BREAK) {
|
||||||
|
@ -800,7 +817,13 @@ static bool idxCacheIteratorNext(Iterate* itera) {
|
||||||
iv->type = ct->operaType;
|
iv->type = ct->operaType;
|
||||||
iv->ver = ct->version;
|
iv->ver = ct->version;
|
||||||
iv->colVal = taosStrdup(ct->colVal);
|
iv->colVal = taosStrdup(ct->colVal);
|
||||||
(void)taosArrayPush(iv->val, &ct->uid);
|
if (iv->colVal == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (taosArrayPush(iv->val, &ct->uid) == NULL) {
|
||||||
|
taosMemoryFree(iv->colVal);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return next;
|
return next;
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,9 @@ void fstUnFinishedNodesPushEmpty(FstUnFinishedNodes* nodes, bool isFinal) {
|
||||||
node->trans = taosArrayInit(16, sizeof(FstTransition));
|
node->trans = taosArrayInit(16, sizeof(FstTransition));
|
||||||
|
|
||||||
FstBuilderNodeUnfinished un = {.node = node, .last = NULL};
|
FstBuilderNodeUnfinished un = {.node = node, .last = NULL};
|
||||||
(void)taosArrayPush(nodes->stack, &un);
|
if (taosArrayPush(nodes->stack, &un) == NULL) {
|
||||||
|
fstBuilderNodeDestroy(node);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
FstBuilderNode* fstUnFinishedNodesPopRoot(FstUnFinishedNodes* nodes) {
|
FstBuilderNode* fstUnFinishedNodesPopRoot(FstUnFinishedNodes* nodes) {
|
||||||
FstBuilderNodeUnfinished* un = taosArrayPop(nodes->stack);
|
FstBuilderNodeUnfinished* un = taosArrayPop(nodes->stack);
|
||||||
|
@ -120,7 +122,10 @@ void fstUnFinishedNodesAddSuffix(FstUnFinishedNodes* nodes, FstSlice bs, Output
|
||||||
FstLastTransition* trn = fstLastTransitionCreate(data[i], 0);
|
FstLastTransition* trn = fstLastTransitionCreate(data[i], 0);
|
||||||
|
|
||||||
FstBuilderNodeUnfinished un = {.node = n, .last = trn};
|
FstBuilderNodeUnfinished un = {.node = n, .last = trn};
|
||||||
(void)taosArrayPush(nodes->stack, &un);
|
if (taosArrayPush(nodes->stack, &un) == NULL) {
|
||||||
|
fstBuilderNodeDestroy(n);
|
||||||
|
taosMemoryFree(trn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
fstUnFinishedNodesPushEmpty(nodes, true);
|
fstUnFinishedNodesPushEmpty(nodes, true);
|
||||||
}
|
}
|
||||||
|
@ -892,7 +897,9 @@ void fstBuilderNodeUnfinishedLastCompiled(FstBuilderNodeUnfinished* unNode, Comp
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
FstTransition t = {.inp = trn->inp, .out = trn->out, .addr = addr};
|
FstTransition t = {.inp = trn->inp, .out = trn->out, .addr = addr};
|
||||||
(void)taosArrayPush(unNode->node->trans, &t);
|
if (taosArrayPush(unNode->node->trans, &t) == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
fstLastTransitionDestroy(trn);
|
fstLastTransitionDestroy(trn);
|
||||||
unNode->last = NULL;
|
unNode->last = NULL;
|
||||||
return;
|
return;
|
||||||
|
@ -997,7 +1004,12 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
||||||
uint8_t* data = fstSliceData(b, &len);
|
uint8_t* data = fstSliceData(b, &len);
|
||||||
|
|
||||||
SArray* nodes = (SArray*)taosArrayInit(len, sizeof(FstNode*));
|
SArray* nodes = (SArray*)taosArrayInit(len, sizeof(FstNode*));
|
||||||
(void)taosArrayPush(nodes, &root);
|
if (nodes == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (taosArrayPush(nodes, &root) == NULL) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
for (uint32_t i = 0; i < len; i++) {
|
for (uint32_t i = 0; i < len; i++) {
|
||||||
uint8_t inp = data[i];
|
uint8_t inp = data[i];
|
||||||
Output res = 0;
|
Output res = 0;
|
||||||
|
@ -1009,7 +1021,9 @@ bool fstGet(Fst* fst, FstSlice* b, Output* out) {
|
||||||
(void)fstNodeGetTransitionAt(root, res, &trn);
|
(void)fstNodeGetTransitionAt(root, res, &trn);
|
||||||
tOut += trn.out;
|
tOut += trn.out;
|
||||||
root = fstGetNode(fst, trn.addr);
|
root = fstGetNode(fst, trn.addr);
|
||||||
(void)taosArrayPush(nodes, &root);
|
if (taosArrayPush(nodes, &root) == NULL) {
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (!FST_NODE_IS_FINAL(root)) {
|
if (!FST_NODE_IS_FINAL(root)) {
|
||||||
goto _return;
|
goto _return;
|
||||||
|
@ -1156,7 +1170,9 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) {
|
||||||
.trans = 0,
|
.trans = 0,
|
||||||
.out = {.null = false, .out = 0},
|
.out = {.null = false, .out = 0},
|
||||||
.autState = automFuncs[aut->type].start(aut)}; // auto.start callback
|
.autState = automFuncs[aut->type].start(aut)}; // auto.start callback
|
||||||
(void)taosArrayPush(sws->stack, &s);
|
if (taosArrayPush(sws->stack, &s) == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
FstSlice* key = NULL;
|
FstSlice* key = NULL;
|
||||||
|
@ -1185,12 +1201,16 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) {
|
||||||
(void)fstNodeGetTransitionAt(node, res, &trn);
|
(void)fstNodeGetTransitionAt(node, res, &trn);
|
||||||
void* preState = autState;
|
void* preState = autState;
|
||||||
autState = automFuncs[aut->type].accept(aut, preState, b);
|
autState = automFuncs[aut->type].accept(aut, preState, b);
|
||||||
(void)taosArrayPush(sws->inp, &b);
|
if (taosArrayPush(sws->inp, &b) == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
FstStreamState s = {.node = node, .trans = res + 1, .out = {.null = false, .out = out}, .autState = preState};
|
FstStreamState s = {.node = node, .trans = res + 1, .out = {.null = false, .out = out}, .autState = preState};
|
||||||
node = NULL;
|
node = NULL;
|
||||||
|
|
||||||
(void)taosArrayPush(sws->stack, &s);
|
if (taosArrayPush(sws->stack, &s) == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
out += trn.out;
|
out += trn.out;
|
||||||
node = fstGetNode(sws->fst, trn.addr);
|
node = fstGetNode(sws->fst, trn.addr);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1209,7 +1229,9 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) {
|
||||||
}
|
}
|
||||||
|
|
||||||
FstStreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState};
|
FstStreamState s = {.node = node, .trans = i, .out = {.null = false, .out = out}, .autState = autState};
|
||||||
(void)taosArrayPush(sws->stack, &s);
|
if (taosArrayPush(sws->stack, &s) == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
taosMemoryFree(trans);
|
taosMemoryFree(trans);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1230,7 +1252,9 @@ bool stmStSeekMin(FStmSt* sws, FstBoundWithData* min) {
|
||||||
(void)fstNodeGetTransitionAt(n, trans - 1, &trn);
|
(void)fstNodeGetTransitionAt(n, trans - 1, &trn);
|
||||||
FstStreamState s = {
|
FstStreamState s = {
|
||||||
.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState};
|
.node = fstGetNode(sws->fst, trn.addr), .trans = 0, .out = {.null = false, .out = out}, .autState = autState};
|
||||||
(void)taosArrayPush(sws->stack, &s);
|
if (taosArrayPush(sws->stack, &s) == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
@ -1273,8 +1297,14 @@ FStmStRslt* stmStNextWith(FStmSt* sws, streamCallback__fn callback) {
|
||||||
bool isMatch = automFuncs[aut->type].isMatch(aut, nextState);
|
bool isMatch = automFuncs[aut->type].isMatch(aut, nextState);
|
||||||
|
|
||||||
FstNode* nextNode = fstGetNode(sws->fst, trn.addr);
|
FstNode* nextNode = fstGetNode(sws->fst, trn.addr);
|
||||||
(void)taosArrayPush(nodes, &nextNode);
|
if (taosArrayPush(nodes, &nextNode) == NULL) {
|
||||||
(void)taosArrayPush(sws->inp, &(trn.inp));
|
taosArrayDestroy(nodes);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
if (taosArrayPush(sws->inp, &(trn.inp)) == NULL) {
|
||||||
|
taosArrayDestroy(nodes);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (FST_NODE_IS_FINAL(nextNode)) {
|
if (FST_NODE_IS_FINAL(nextNode)) {
|
||||||
void* eofState = automFuncs[aut->type].acceptEof(aut, nextState);
|
void* eofState = automFuncs[aut->type].acceptEof(aut, nextState);
|
||||||
|
@ -1283,10 +1313,16 @@ FStmStRslt* stmStNextWith(FStmSt* sws, streamCallback__fn callback) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
FstStreamState s1 = {.node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState};
|
FstStreamState s1 = {.node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState};
|
||||||
(void)taosArrayPush(sws->stack, &s1);
|
if (taosArrayPush(sws->stack, &s1) == NULL) {
|
||||||
|
taosArrayDestroy(nodes);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
FstStreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState};
|
FstStreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState};
|
||||||
(void)taosArrayPush(sws->stack, &s2);
|
if (taosArrayPush(sws->stack, &s2) == NULL) {
|
||||||
|
taosArrayDestroy(nodes);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t isz = taosArrayGetSize(sws->inp);
|
int32_t isz = taosArrayGetSize(sws->inp);
|
||||||
uint8_t* buf = (uint8_t*)taosMemoryMalloc(isz * sizeof(uint8_t));
|
uint8_t* buf = (uint8_t*)taosMemoryMalloc(isz * sizeof(uint8_t));
|
||||||
|
@ -1357,11 +1393,18 @@ FStmBuilder* stmBuilderCreate(Fst* fst, FAutoCtx* aut) {
|
||||||
b->aut = aut;
|
b->aut = aut;
|
||||||
b->min = fstBoundStateCreate(Unbounded, NULL);
|
b->min = fstBoundStateCreate(Unbounded, NULL);
|
||||||
b->max = fstBoundStateCreate(Unbounded, NULL);
|
b->max = fstBoundStateCreate(Unbounded, NULL);
|
||||||
|
|
||||||
|
if (b->min == NULL || b->max == NULL) {
|
||||||
|
stmBuilderDestroy(b);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
void stmBuilderDestroy(FStmBuilder* b) {
|
void stmBuilderDestroy(FStmBuilder* b) {
|
||||||
fstSliceDestroy(&b->min->data);
|
if (b->min) fstSliceDestroy(&b->min->data);
|
||||||
fstSliceDestroy(&b->max->data);
|
if (b->max) fstSliceDestroy(&b->max->data);
|
||||||
|
|
||||||
taosMemoryFreeClear(b->min);
|
taosMemoryFreeClear(b->min);
|
||||||
taosMemoryFreeClear(b->max);
|
taosMemoryFreeClear(b->max);
|
||||||
taosMemoryFree(b);
|
taosMemoryFree(b);
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "indexFstDfa.h"
|
#include "indexFstDfa.h"
|
||||||
|
#include "indexInt.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
|
|
||||||
const static uint32_t STATE_LIMIT = 1000;
|
const static uint32_t STATE_LIMIT = 1000;
|
||||||
|
@ -68,23 +69,41 @@ FstDfa *dfaBuilderBuild(FstDfaBuilder *builder) {
|
||||||
uint32_t sz = taosArrayGetSize(builder->dfa->insts);
|
uint32_t sz = taosArrayGetSize(builder->dfa->insts);
|
||||||
FstSparseSet *cur = sparSetCreate(sz);
|
FstSparseSet *cur = sparSetCreate(sz);
|
||||||
FstSparseSet *nxt = sparSetCreate(sz);
|
FstSparseSet *nxt = sparSetCreate(sz);
|
||||||
|
if (cur == NULL || nxt == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
dfaAdd(builder->dfa, cur, 0);
|
dfaAdd(builder->dfa, cur, 0);
|
||||||
|
|
||||||
uint32_t result;
|
uint32_t result;
|
||||||
SArray *states = taosArrayInit(0, sizeof(uint32_t));
|
SArray *states = taosArrayInit(0, sizeof(uint32_t));
|
||||||
|
if (states == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
if (dfaBuilderCacheState(builder, cur, &result)) {
|
if (dfaBuilderCacheState(builder, cur, &result)) {
|
||||||
(void)taosArrayPush(states, &result);
|
if (taosArrayPush(states, &result) == NULL) {
|
||||||
|
goto _exception;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
SHashObj *seen = taosHashInit(12, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
SHashObj *seen = taosHashInit(12, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||||
|
if (seen == NULL) {
|
||||||
|
goto _exception;
|
||||||
|
}
|
||||||
|
|
||||||
while (taosArrayGetSize(states) != 0) {
|
while (taosArrayGetSize(states) != 0) {
|
||||||
result = *(uint32_t *)taosArrayPop(states);
|
result = *(uint32_t *)taosArrayPop(states);
|
||||||
for (int i = 0; i < 256; i++) {
|
for (int i = 0; i < 256; i++) {
|
||||||
uint32_t ns, dummpy = 0;
|
uint32_t ns, dummpy = 0;
|
||||||
if (dfaBuilderRunState(builder, cur, nxt, result, i, &ns)) {
|
if (dfaBuilderRunState(builder, cur, nxt, result, i, &ns)) {
|
||||||
if (taosHashGet(seen, &ns, sizeof(ns)) == NULL) {
|
if (taosHashGet(seen, &ns, sizeof(ns)) == NULL) {
|
||||||
(void)taosHashPut(seen, &ns, sizeof(ns), &dummpy, sizeof(dummpy));
|
if (taosHashPut(seen, &ns, sizeof(ns), &dummpy, sizeof(dummpy)) != 0) {
|
||||||
(void)taosArrayPush(states, &ns);
|
goto _exception;
|
||||||
|
}
|
||||||
|
if (taosArrayPush(states, &ns) == NULL) {
|
||||||
|
goto _exception;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (taosArrayGetSize(builder->dfa->states) > STATE_LIMIT) {
|
if (taosArrayGetSize(builder->dfa->states) > STATE_LIMIT) {
|
||||||
|
@ -96,6 +115,11 @@ FstDfa *dfaBuilderBuild(FstDfaBuilder *builder) {
|
||||||
taosArrayDestroy(states);
|
taosArrayDestroy(states);
|
||||||
taosHashCleanup(seen);
|
taosHashCleanup(seen);
|
||||||
return builder->dfa;
|
return builder->dfa;
|
||||||
|
_exception:
|
||||||
|
taosArrayDestroy(states);
|
||||||
|
taosHashCleanup(seen);
|
||||||
|
indexError("failed to build dfa since %s", tstrerror(terrno));
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool dfaBuilderRunState(FstDfaBuilder *builder, FstSparseSet *cur, FstSparseSet *next, uint32_t state, uint8_t byte,
|
bool dfaBuilderRunState(FstDfaBuilder *builder, FstSparseSet *cur, FstSparseSet *next, uint32_t state, uint8_t byte,
|
||||||
|
@ -122,8 +146,13 @@ bool dfaBuilderRunState(FstDfaBuilder *builder, FstSparseSet *cur, FstSparseSet
|
||||||
}
|
}
|
||||||
|
|
||||||
bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *result) {
|
bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *result) {
|
||||||
|
int32_t code = 0;
|
||||||
SArray *tinsts = taosArrayInit(4, sizeof(uint32_t));
|
SArray *tinsts = taosArrayInit(4, sizeof(uint32_t));
|
||||||
bool isMatch = false;
|
if (tinsts == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exception;
|
||||||
|
}
|
||||||
|
bool isMatch = false;
|
||||||
|
|
||||||
for (int i = 0; i < sparSetLen(set); i++) {
|
for (int i = 0; i < sparSetLen(set); i++) {
|
||||||
int32_t ip;
|
int32_t ip;
|
||||||
|
@ -133,10 +162,16 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r
|
||||||
if (inst->ty == JUMP || inst->ty == SPLIT) {
|
if (inst->ty == JUMP || inst->ty == SPLIT) {
|
||||||
continue;
|
continue;
|
||||||
} else if (inst->ty == RANGE) {
|
} else if (inst->ty == RANGE) {
|
||||||
(void)taosArrayPush(tinsts, &ip);
|
if (taosArrayPush(tinsts, &ip) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exception;
|
||||||
|
}
|
||||||
} else if (inst->ty == MATCH) {
|
} else if (inst->ty == MATCH) {
|
||||||
isMatch = true;
|
isMatch = true;
|
||||||
(void)taosArrayPush(tinsts, &ip);
|
if (taosArrayPush(tinsts, &ip) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exception;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (taosArrayGetSize(tinsts) == 0) {
|
if (taosArrayGetSize(tinsts) == 0) {
|
||||||
|
@ -149,13 +184,23 @@ bool dfaBuilderCacheState(FstDfaBuilder *builder, FstSparseSet *set, uint32_t *r
|
||||||
taosArrayDestroy(tinsts);
|
taosArrayDestroy(tinsts);
|
||||||
} else {
|
} else {
|
||||||
DfaState st = {.insts = tinsts, .isMatch = isMatch};
|
DfaState st = {.insts = tinsts, .isMatch = isMatch};
|
||||||
(void)taosArrayPush(builder->dfa->states, &st);
|
if (taosArrayPush(builder->dfa->states, &st) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exception;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(builder->dfa->states) - 1;
|
int32_t sz = taosArrayGetSize(builder->dfa->states) - 1;
|
||||||
(void)taosHashPut(builder->cache, &tinsts, sizeof(POINTER_BYTES), &sz, sizeof(sz));
|
if ((code = taosHashPut(builder->cache, &tinsts, sizeof(POINTER_BYTES), &sz, sizeof(sz))) != 0) {
|
||||||
|
goto _exception;
|
||||||
|
}
|
||||||
|
|
||||||
*result = sz;
|
*result = sz;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
_exception:
|
||||||
|
indexError("failed to create dfa state, code:%d", code);
|
||||||
|
taosArrayDestroy(tinsts);
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
FstDfa *dfaCreate(SArray *insts, SArray *states) {
|
FstDfa *dfaCreate(SArray *insts, SArray *states) {
|
||||||
|
|
|
@ -16,9 +16,14 @@
|
||||||
|
|
||||||
FstBuilderNode* fstBuilderNodeDefault() {
|
FstBuilderNode* fstBuilderNodeDefault() {
|
||||||
FstBuilderNode* bn = taosMemoryMalloc(sizeof(FstBuilderNode));
|
FstBuilderNode* bn = taosMemoryMalloc(sizeof(FstBuilderNode));
|
||||||
|
if (bn == NULL) return NULL;
|
||||||
bn->isFinal = false;
|
bn->isFinal = false;
|
||||||
bn->finalOutput = 0;
|
bn->finalOutput = 0;
|
||||||
bn->trans = taosArrayInit(16, sizeof(FstTransition));
|
bn->trans = taosArrayInit(16, sizeof(FstTransition));
|
||||||
|
if (bn->trans == NULL) {
|
||||||
|
taosMemoryFree(bn);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
return bn;
|
return bn;
|
||||||
}
|
}
|
||||||
void fstBuilderNodeDestroy(FstBuilderNode* node) {
|
void fstBuilderNodeDestroy(FstBuilderNode* node) {
|
||||||
|
@ -56,30 +61,11 @@ bool fstBuilderNodeEqual(FstBuilderNode* n1, FstBuilderNode* n2) {
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
FstBuilderNode* fstBuilderNodeClone(FstBuilderNode* src) {
|
|
||||||
FstBuilderNode* node = taosMemoryMalloc(sizeof(FstBuilderNode));
|
|
||||||
if (node == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
size_t sz = taosArrayGetSize(src->trans);
|
|
||||||
SArray* trans = taosArrayInit(sz, sizeof(FstTransition));
|
|
||||||
|
|
||||||
for (size_t i = 0; i < sz; i++) {
|
|
||||||
FstTransition* tran = taosArrayGet(src->trans, i);
|
|
||||||
(void)taosArrayPush(trans, tran);
|
|
||||||
}
|
|
||||||
|
|
||||||
node->trans = trans;
|
|
||||||
node->isFinal = src->isFinal;
|
|
||||||
node->finalOutput = src->finalOutput;
|
|
||||||
return node;
|
|
||||||
}
|
|
||||||
// not destroy src, User's bussiness
|
// not destroy src, User's bussiness
|
||||||
void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) {
|
int32_t fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) {
|
||||||
if (dst == NULL || src == NULL) {
|
if (dst == NULL || src == NULL) {
|
||||||
return;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
dst->isFinal = src->isFinal;
|
dst->isFinal = src->isFinal;
|
||||||
|
@ -89,10 +75,18 @@ void fstBuilderNodeCloneFrom(FstBuilderNode* dst, FstBuilderNode* src) {
|
||||||
taosArrayDestroy(dst->trans);
|
taosArrayDestroy(dst->trans);
|
||||||
size_t sz = taosArrayGetSize(src->trans);
|
size_t sz = taosArrayGetSize(src->trans);
|
||||||
dst->trans = taosArrayInit(sz, sizeof(FstTransition));
|
dst->trans = taosArrayInit(sz, sizeof(FstTransition));
|
||||||
|
if (dst->trans == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
for (size_t i = 0; i < sz; i++) {
|
for (size_t i = 0; i < sz; i++) {
|
||||||
FstTransition* trn = taosArrayGet(src->trans, i);
|
FstTransition* trn = taosArrayGet(src->trans, i);
|
||||||
(void)taosArrayPush(dst->trans, trn);
|
if (taosArrayPush(dst->trans, trn) == NULL) {
|
||||||
|
taosArrayDestroy(dst->trans);
|
||||||
|
dst->trans = NULL;
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile *wrt, CompiledAddr lastAddr, CompiledAddr
|
// bool fstBuilderNodeCompileTo(FstBuilderNode *b, IdxFile *wrt, CompiledAddr lastAddr, CompiledAddr
|
||||||
|
|
|
@ -39,7 +39,12 @@ FstRegex *regexCreate(const char *str) {
|
||||||
|
|
||||||
for (int i = 0; i < strlen(str); i++) {
|
for (int i = 0; i < strlen(str); i++) {
|
||||||
uint8_t v = str[i];
|
uint8_t v = str[i];
|
||||||
(void)taosArrayPush(insts, &v);
|
if (taosArrayPush(insts, &v) == NULL) {
|
||||||
|
taosArrayDestroy(insts);
|
||||||
|
taosMemoryFree(regex->orig);
|
||||||
|
taosMemoryFree(regex);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
FstDfaBuilder *builder = dfaBuilderCreate(insts);
|
FstDfaBuilder *builder = dfaBuilderCreate(insts);
|
||||||
regex->dfa = dfaBuilderBuild(builder);
|
regex->dfa = dfaBuilderBuild(builder);
|
||||||
|
|
|
@ -84,7 +84,10 @@ FstRegistry* fstRegistryCreate(uint64_t tableSize, uint64_t mruSize) {
|
||||||
|
|
||||||
for (uint64_t i = 0; i < nCells; i++) {
|
for (uint64_t i = 0; i < nCells; i++) {
|
||||||
FstRegistryCell cell = {.addr = NONE_ADDRESS, .node = fstBuilderNodeDefault()};
|
FstRegistryCell cell = {.addr = NONE_ADDRESS, .node = fstBuilderNodeDefault()};
|
||||||
(void)taosArrayPush(tb, &cell);
|
if (taosArrayPush(tb, &cell) == NULL) {
|
||||||
|
fstRegistryDestroy(registry);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
registry->table = tb;
|
registry->table = tb;
|
||||||
|
@ -125,7 +128,7 @@ FstRegistryEntry* fstRegistryGetEntry(FstRegistry* registry, FstBuilderNode* bNo
|
||||||
entry->addr = cell->addr;
|
entry->addr = cell->addr;
|
||||||
return entry;
|
return entry;
|
||||||
} else {
|
} else {
|
||||||
fstBuilderNodeCloneFrom(cell->node, bNode);
|
(void)fstBuilderNodeCloneFrom(cell->node, bNode);
|
||||||
entry->state = NOTFOUND;
|
entry->state = NOTFOUND;
|
||||||
entry->cell = cell; // copy or not
|
entry->cell = cell; // copy or not
|
||||||
}
|
}
|
||||||
|
@ -145,7 +148,7 @@ FstRegistryEntry* fstRegistryGetEntry(FstRegistry* registry, FstBuilderNode* bNo
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
// clone from bNode, refactor later
|
// clone from bNode, refactor later
|
||||||
fstBuilderNodeCloneFrom(cell2->node, bNode);
|
(void)fstBuilderNodeCloneFrom(cell2->node, bNode);
|
||||||
|
|
||||||
fstRegistryCellSwap(registry->table, start, start + 1);
|
fstRegistryCellSwap(registry->table, start, start + 1);
|
||||||
FstRegistryCell* cCell = taosArrayGet(registry->table, start);
|
FstRegistryCell* cCell = taosArrayGet(registry->table, start);
|
||||||
|
@ -166,7 +169,7 @@ FstRegistryEntry* fstRegistryGetEntry(FstRegistry* registry, FstBuilderNode* bNo
|
||||||
uint64_t last = end - 1;
|
uint64_t last = end - 1;
|
||||||
FstRegistryCell* cell = (FstRegistryCell*)taosArrayGet(registry->table, last);
|
FstRegistryCell* cell = (FstRegistryCell*)taosArrayGet(registry->table, last);
|
||||||
// clone from bNode, refactor later
|
// clone from bNode, refactor later
|
||||||
fstBuilderNodeCloneFrom(cell->node, bNode);
|
(void)fstBuilderNodeCloneFrom(cell->node, bNode);
|
||||||
|
|
||||||
fstRegistryCellPromote(registry->table, last, start);
|
fstRegistryCellPromote(registry->table, last, start);
|
||||||
FstRegistryCell* cCell = taosArrayGet(registry->table, start);
|
FstRegistryCell* cCell = taosArrayGet(registry->table, start);
|
||||||
|
|
|
@ -49,7 +49,7 @@ static int tfileReaderLoadFst(TFileReader* reader);
|
||||||
static int tfileReaderVerify(TFileReader* reader);
|
static int tfileReaderVerify(TFileReader* reader);
|
||||||
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
|
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
|
||||||
|
|
||||||
static SArray* tfileGetFileList(const char* path);
|
static int32_t tfileGetFileList(const char* path, SArray** pResult);
|
||||||
static int tfileRmExpireFile(SArray* result);
|
static int tfileRmExpireFile(SArray* result);
|
||||||
static void tfileDestroyFileName(void* elem);
|
static void tfileDestroyFileName(void* elem);
|
||||||
static int tfileCompare(const void* a, const void* b);
|
static int tfileCompare(const void* a, const void* b);
|
||||||
|
@ -97,9 +97,15 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
tcache->tableCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
|
if (tcache->tableCache == NULL) {
|
||||||
|
indexError("failed to open table cache since%s", tstrerror(terrno));
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
|
|
||||||
tcache->capacity = 64;
|
tcache->capacity = 64;
|
||||||
|
|
||||||
SArray* files = tfileGetFileList(path);
|
SArray* files = NULL;
|
||||||
|
int32_t code = tfileGetFileList(path, &files);
|
||||||
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
|
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
|
||||||
char* file = taosArrayGetP(files, i);
|
char* file = taosArrayGetP(files, i);
|
||||||
|
|
||||||
|
@ -125,7 +131,11 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) {
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
int32_t sz = idxSerialCacheKey(&key, buf);
|
int32_t sz = idxSerialCacheKey(&key, buf);
|
||||||
(void)taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
|
code = taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
|
||||||
|
if (code != 0) {
|
||||||
|
tfileReaderDestroy(reader);
|
||||||
|
goto End;
|
||||||
|
}
|
||||||
tfileReaderRef(reader);
|
tfileReaderRef(reader);
|
||||||
}
|
}
|
||||||
taosArrayDestroyEx(files, tfileDestroyFileName);
|
taosArrayDestroyEx(files, tfileDestroyFileName);
|
||||||
|
@ -163,6 +173,7 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
|
||||||
|
|
||||||
return *reader;
|
return *reader;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
|
int32_t tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -172,16 +183,18 @@ int32_t tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
|
||||||
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
|
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
|
||||||
if (p != NULL && *p != NULL) {
|
if (p != NULL && *p != NULL) {
|
||||||
TFileReader* oldRdr = *p;
|
TFileReader* oldRdr = *p;
|
||||||
(void)taosHashRemove(tcache->tableCache, buf, sz);
|
if ((code = taosHashRemove(tcache->tableCache, buf, sz)) != 0) {
|
||||||
indexInfo("found %s, should remove file %s", buf, oldRdr->ctx->file.buf);
|
indexError("failed to remove old reader from cache since %s, suid:%" PRIu64 ", colName:%s", tstrerror(code),
|
||||||
oldRdr->remove = true;
|
oldRdr->header.suid, oldRdr->header.colName);
|
||||||
tfileReaderUnRef(oldRdr);
|
} else {
|
||||||
|
indexInfo("found %s, should remove file %s", buf, oldRdr->ctx->file.buf);
|
||||||
|
oldRdr->remove = true;
|
||||||
|
tfileReaderUnRef(oldRdr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
|
code = taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
|
||||||
if (code == 0) {
|
tfileReaderRef(reader);
|
||||||
tfileReaderRef(reader);
|
|
||||||
}
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t tfileReaderCreate(IFileCtx* ctx, TFileReader** pReader) {
|
int32_t tfileReaderCreate(IFileCtx* ctx, TFileReader** pReader) {
|
||||||
|
@ -232,7 +245,7 @@ void tfileReaderDestroy(TFileReader* reader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
||||||
int ret = 0;
|
int32_t ret = 0;
|
||||||
char* p = tem->colVal;
|
char* p = tem->colVal;
|
||||||
uint64_t sz = tem->nColVal;
|
uint64_t sz = tem->nColVal;
|
||||||
|
|
||||||
|
@ -246,6 +259,11 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
||||||
tem->suid, tem->colName, tem->colVal, cost);
|
tem->suid, tem->colName, tem->colVal, cost);
|
||||||
|
|
||||||
ret = tfileReaderLoadTableIds((TFileReader*)reader, (int32_t)offset, tr->total);
|
ret = tfileReaderLoadTableIds((TFileReader*)reader, (int32_t)offset, tr->total);
|
||||||
|
if (ret != 0) {
|
||||||
|
fstSliceDestroy(&key);
|
||||||
|
indexError("faile to search since %s", tstrerror(ret));
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
cost = taosGetTimestampUs() - et;
|
cost = taosGetTimestampUs() - et;
|
||||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
|
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid,
|
||||||
tem->colName, tem->colVal, cost);
|
tem->colName, tem->colVal, cost);
|
||||||
|
@ -255,17 +273,29 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
||||||
|
int32_t lino = 0;
|
||||||
|
int32_t code = 0;
|
||||||
char* p = tem->colVal;
|
char* p = tem->colVal;
|
||||||
uint64_t sz = tem->nColVal;
|
uint64_t sz = tem->nColVal;
|
||||||
|
|
||||||
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
|
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
|
||||||
|
if (offsets == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
FAutoCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
|
||||||
|
if (ctx == NULL) {
|
||||||
|
taosArrayDestroy(offsets);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
FAutoCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX);
|
|
||||||
FStmBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
|
FStmBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
|
||||||
FStmSt* st = stmBuilderIntoStm(sb);
|
FStmSt* st = stmBuilderIntoStm(sb);
|
||||||
FStmStRslt* rt = NULL;
|
FStmStRslt* rt = NULL;
|
||||||
while ((rt = stmStNextWith(st, NULL)) != NULL) {
|
while ((rt = stmStNextWith(st, NULL)) != NULL) {
|
||||||
(void)taosArrayPush(offsets, &(rt->out.out));
|
if (taosArrayPush(offsets, &(rt->out.out)) == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exception);
|
||||||
|
}
|
||||||
swsResultDestroy(rt);
|
swsResultDestroy(rt);
|
||||||
}
|
}
|
||||||
stmStDestroy(st);
|
stmStDestroy(st);
|
||||||
|
@ -275,14 +305,16 @@ static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
||||||
for (int i = 0; i < taosArrayGetSize(offsets); i++) {
|
for (int i = 0; i < taosArrayGetSize(offsets); i++) {
|
||||||
uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i);
|
uint64_t offset = *(uint64_t*)taosArrayGet(offsets, i);
|
||||||
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
|
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
|
||||||
if (ret != 0) {
|
TAOS_CHECK_GOTO(ret, &lino, _exception);
|
||||||
taosArrayDestroy(offsets);
|
|
||||||
indexError("failed to find target tablelist");
|
|
||||||
return TSDB_CODE_FILE_CORRUPTED;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
taosArrayDestroy(offsets);
|
taosArrayDestroy(offsets);
|
||||||
return 0;
|
return 0;
|
||||||
|
_exception:
|
||||||
|
stmStDestroy(st);
|
||||||
|
stmBuilderDestroy(sb);
|
||||||
|
taosArrayDestroy(offsets);
|
||||||
|
indexError("failed to searchPrefix since %s, lino:%d", tstrerror(code), lino);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
@ -393,6 +425,12 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTRslt* tr) {
|
||||||
tem->suid, tem->colName, tem->colVal, cost);
|
tem->suid, tem->colName, tem->colVal, cost);
|
||||||
|
|
||||||
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
|
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
|
||||||
|
if (ret != 0) {
|
||||||
|
indexError("failed to search json since %s", tstrerror(ret));
|
||||||
|
taosMemoryFree(p);
|
||||||
|
fstSliceDestroy(&key);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
cost = taosGetTimestampUs() - et;
|
cost = taosGetTimestampUs() - et;
|
||||||
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, offset: %" PRIu64
|
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, offset: %" PRIu64
|
||||||
", size: %d, time cost: %" PRIu64 "us",
|
", size: %d, time cost: %" PRIu64 "us",
|
||||||
|
@ -863,14 +901,24 @@ TFileValue* tfileValueCreate(char* val) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
tf->colVal = taosStrdup(val);
|
tf->colVal = taosStrdup(val);
|
||||||
|
if (tf->colVal == NULL) {
|
||||||
|
taosMemoryFree(tf);
|
||||||
|
}
|
||||||
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
|
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
|
||||||
|
if (tf->tableId == NULL) {
|
||||||
|
taosMemoryFree(tf->colVal);
|
||||||
|
taosMemoryFree(tf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
return tf;
|
return tf;
|
||||||
}
|
}
|
||||||
int tfileValuePush(TFileValue* tf, uint64_t val) {
|
int32_t tfileValuePush(TFileValue* tf, uint64_t val) {
|
||||||
if (tf == NULL) {
|
if (tf == NULL) {
|
||||||
return -1;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
}
|
||||||
|
if (taosArrayPush(tf->tableId, &val) == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
(void)taosArrayPush(tf->tableId, &val);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
void tfileValueDestroy(TFileValue* tf) {
|
void tfileValueDestroy(TFileValue* tf) {
|
||||||
|
@ -986,8 +1034,10 @@ static int tfileReaderLoadFst(TFileReader* reader) {
|
||||||
|
|
||||||
return reader->fst != NULL ? 0 : -1;
|
return reader->fst != NULL ? 0 : -1;
|
||||||
}
|
}
|
||||||
static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
|
static int32_t tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
|
||||||
// TODO(yihao): opt later
|
// TODO(yihao): opt later
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
IFileCtx* ctx = reader->ctx;
|
IFileCtx* ctx = reader->ctx;
|
||||||
// add block cache
|
// add block cache
|
||||||
char block[4096] = {0};
|
char block[4096] = {0};
|
||||||
|
@ -1003,7 +1053,9 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
|
||||||
while (nid > 0) {
|
while (nid > 0) {
|
||||||
int32_t left = block + sizeof(block) - p;
|
int32_t left = block + sizeof(block) - p;
|
||||||
if (left >= sizeof(uint64_t)) {
|
if (left >= sizeof(uint64_t)) {
|
||||||
(void)taosArrayPush(result, (uint64_t*)p);
|
if (taosArrayPush(result, (uint64_t*)p) == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
p += sizeof(uint64_t);
|
p += sizeof(uint64_t);
|
||||||
} else {
|
} else {
|
||||||
char buf[sizeof(uint64_t)] = {0};
|
char buf[sizeof(uint64_t)] = {0};
|
||||||
|
@ -1014,7 +1066,9 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
|
||||||
nread = ctx->readFrom(ctx, (uint8_t*)block, sizeof(block), offset);
|
nread = ctx->readFrom(ctx, (uint8_t*)block, sizeof(block), offset);
|
||||||
memcpy(buf + left, block, sizeof(uint64_t) - left);
|
memcpy(buf + left, block, sizeof(uint64_t) - left);
|
||||||
|
|
||||||
(void)taosArrayPush(result, (uint64_t*)buf);
|
if (taosArrayPush(result, (uint64_t*)buf) == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
p = block + sizeof(uint64_t) - left;
|
p = block + sizeof(uint64_t) - left;
|
||||||
}
|
}
|
||||||
nid -= 1;
|
nid -= 1;
|
||||||
|
@ -1059,16 +1113,19 @@ void tfileReaderUnRef(TFileReader* rd) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SArray* tfileGetFileList(const char* path) {
|
static int32_t tfileGetFileList(const char* path, SArray** ppResult) {
|
||||||
|
int32_t code = 0;
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
int64_t version;
|
int64_t version;
|
||||||
SArray* files = taosArrayInit(4, sizeof(void*));
|
SArray* files = taosArrayInit(4, sizeof(void*));
|
||||||
|
if (files == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
TdDirPtr pDir = taosOpenDir(path);
|
TdDirPtr pDir = taosOpenDir(path);
|
||||||
if (NULL == pDir) {
|
if (NULL == pDir) {
|
||||||
taosArrayDestroy(files);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _exception);
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
TdDirEntryPtr pDirEntry;
|
TdDirEntryPtr pDirEntry;
|
||||||
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
|
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
|
||||||
|
@ -1079,15 +1136,29 @@ static SArray* tfileGetFileList(const char* path) {
|
||||||
|
|
||||||
size_t len = strlen(path) + 1 + strlen(file) + 1;
|
size_t len = strlen(path) + 1 + strlen(file) + 1;
|
||||||
char* buf = taosMemoryCalloc(1, len);
|
char* buf = taosMemoryCalloc(1, len);
|
||||||
|
if (buf == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
|
}
|
||||||
|
|
||||||
sprintf(buf, "%s/%s", path, file);
|
sprintf(buf, "%s/%s", path, file);
|
||||||
(void)taosArrayPush(files, &buf);
|
if (taosArrayPush(files, &buf) == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
(void)taosCloseDir(&pDir);
|
(void)taosCloseDir(&pDir);
|
||||||
|
|
||||||
taosArraySort(files, tfileCompare);
|
taosArraySort(files, tfileCompare);
|
||||||
(void)tfileRmExpireFile(files);
|
(void)tfileRmExpireFile(files);
|
||||||
|
*ppResult = files;
|
||||||
|
return 0;
|
||||||
|
|
||||||
return files;
|
_exception:
|
||||||
|
(void)taosCloseDir(&pDir);
|
||||||
|
if (files != NULL) {
|
||||||
|
taosArrayDestroyEx(files, tfileDestroyFileName);
|
||||||
|
taosArrayDestroy(files);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
static int tfileRmExpireFile(SArray* result) {
|
static int tfileRmExpireFile(SArray* result) {
|
||||||
// TODO(yihao): remove expire tindex after restart
|
// TODO(yihao): remove expire tindex after restart
|
||||||
|
|
|
@ -36,12 +36,16 @@ static FORCE_INLINE int iBinarySearch(SArray *arr, int s, int e, uint64_t k) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
void iIntersection(SArray *in, SArray *out) {
|
int32_t iIntersection(SArray *in, SArray *out) {
|
||||||
|
int32_t code = 0;
|
||||||
int32_t sz = (int32_t)taosArrayGetSize(in);
|
int32_t sz = (int32_t)taosArrayGetSize(in);
|
||||||
if (sz <= 0) {
|
if (sz <= 0) {
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
|
MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
|
||||||
|
if (mi == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
SArray *t = taosArrayGetP(in, i);
|
SArray *t = taosArrayGetP(in, i);
|
||||||
mi[i].len = (int32_t)taosArrayGetSize(t);
|
mi[i].len = (int32_t)taosArrayGetSize(t);
|
||||||
|
@ -64,19 +68,25 @@ void iIntersection(SArray *in, SArray *out) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (has == true) {
|
if (has == true) {
|
||||||
(void)taosArrayPush(out, &tgt);
|
if (taosArrayPush(out, &tgt) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(mi);
|
taosMemoryFreeClear(mi);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
void iUnion(SArray *in, SArray *out) {
|
int32_t iUnion(SArray *in, SArray *out) {
|
||||||
|
int32_t code = 0;
|
||||||
int32_t sz = (int32_t)taosArrayGetSize(in);
|
int32_t sz = (int32_t)taosArrayGetSize(in);
|
||||||
if (sz <= 0) {
|
if (sz <= 0) {
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
if (sz == 1) {
|
if (sz == 1) {
|
||||||
(void)taosArrayAddAll(out, taosArrayGetP(in, 0));
|
if (taosArrayAddAll(out, taosArrayGetP(in, 0)) == NULL) {
|
||||||
return;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
|
MergeIndex *mi = taosMemoryCalloc(sz, sizeof(MergeIndex));
|
||||||
|
@ -108,19 +118,23 @@ void iUnion(SArray *in, SArray *out) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(void)taosArrayPush(out, &mVal);
|
if (taosArrayPush(out, &mVal) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(mi);
|
taosMemoryFreeClear(mi);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void iExcept(SArray *total, SArray *except) {
|
int32_t iExcept(SArray *total, SArray *except) {
|
||||||
int32_t tsz = (int32_t)taosArrayGetSize(total);
|
int32_t tsz = (int32_t)taosArrayGetSize(total);
|
||||||
int32_t esz = (int32_t)taosArrayGetSize(except);
|
int32_t esz = (int32_t)taosArrayGetSize(except);
|
||||||
if (esz == 0 || tsz == 0) {
|
if (esz == 0 || tsz == 0) {
|
||||||
return;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vIdx = 0;
|
int vIdx = 0;
|
||||||
|
@ -135,6 +149,7 @@ void iExcept(SArray *total, SArray *except) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPopTailBatch(total, tsz - vIdx);
|
taosArrayPopTailBatch(total, tsz - vIdx);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int uidCompare(const void *a, const void *b) {
|
int uidCompare(const void *a, const void *b) {
|
||||||
|
@ -191,20 +206,37 @@ void idxTRsltDestroy(SIdxTRslt *tr) {
|
||||||
taosArrayDestroy(tr->del);
|
taosArrayDestroy(tr->del);
|
||||||
taosMemoryFree(tr);
|
taosMemoryFree(tr);
|
||||||
}
|
}
|
||||||
void idxTRsltMergeTo(SIdxTRslt *tr, SArray *result) {
|
int32_t idxTRsltMergeTo(SIdxTRslt *tr, SArray *result) {
|
||||||
|
int32_t code = 0;
|
||||||
taosArraySort(tr->total, uidCompare);
|
taosArraySort(tr->total, uidCompare);
|
||||||
taosArraySort(tr->add, uidCompare);
|
taosArraySort(tr->add, uidCompare);
|
||||||
taosArraySort(tr->del, uidCompare);
|
taosArraySort(tr->del, uidCompare);
|
||||||
|
|
||||||
if (taosArrayGetSize(tr->total) == 0 || taosArrayGetSize(tr->add) == 0) {
|
if (taosArrayGetSize(tr->total) == 0 || taosArrayGetSize(tr->add) == 0) {
|
||||||
SArray *t = taosArrayGetSize(tr->total) == 0 ? tr->add : tr->total;
|
SArray *t = taosArrayGetSize(tr->total) == 0 ? tr->add : tr->total;
|
||||||
(void)taosArrayAddAll(result, t);
|
if (taosArrayAddAll(result, t) == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
SArray *arrs = taosArrayInit(2, sizeof(void *));
|
SArray *arrs = taosArrayInit(2, sizeof(void *));
|
||||||
(void)taosArrayPush(arrs, &tr->total);
|
if (arrs == NULL) {
|
||||||
(void)taosArrayPush(arrs, &tr->add);
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
iUnion(arrs, result);
|
}
|
||||||
|
|
||||||
|
if (taosArrayPush(arrs, &tr->total) == NULL) {
|
||||||
|
taosArrayDestroy(arrs);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosArrayPush(arrs, &tr->add) == NULL) {
|
||||||
|
taosArrayDestroy(arrs);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
code = iUnion(arrs, result);
|
||||||
taosArrayDestroy(arrs);
|
taosArrayDestroy(arrs);
|
||||||
}
|
}
|
||||||
iExcept(result, tr->del);
|
if (code == 0) {
|
||||||
|
code = iExcept(result, tr->del);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1096,21 +1096,23 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
|
||||||
* replication is finished
|
* replication is finished
|
||||||
*/
|
*/
|
||||||
int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
||||||
|
int32_t code = 0;
|
||||||
STaskDbWrapper* pBackend = arg;
|
STaskDbWrapper* pBackend = arg;
|
||||||
|
SArray * chkpDel = NULL, *chkpDup = NULL;
|
||||||
(void)taosThreadRwlockWrlock(&pBackend->chkpDirLock);
|
(void)taosThreadRwlockWrlock(&pBackend->chkpDirLock);
|
||||||
|
|
||||||
(void)taosArrayPush(pBackend->chkpSaved, &chkpId);
|
if (taosArrayPush(pBackend->chkpSaved, &chkpId) == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
SArray* chkpDel = taosArrayInit(8, sizeof(int64_t));
|
|
||||||
if (chkpDel == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* chkpDup = taosArrayInit(8, sizeof(int64_t));
|
chkpDel = taosArrayInit(8, sizeof(int64_t));
|
||||||
|
if (chkpDel == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
chkpDup = taosArrayInit(8, sizeof(int64_t));
|
||||||
if (chkpDup == NULL) {
|
if (chkpDup == NULL) {
|
||||||
taosArrayDestroy(chkpDel);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t firsId = 0;
|
int64_t firsId = 0;
|
||||||
|
@ -1120,9 +1122,13 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
||||||
for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) {
|
for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) {
|
||||||
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
|
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
|
||||||
if (id >= firsId) {
|
if (id >= firsId) {
|
||||||
(void)taosArrayPush(chkpDup, &id);
|
if (taosArrayPush(chkpDup, &id) == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
(void)taosArrayPush(chkpDel, &id);
|
if (taosArrayPush(chkpDel, &id) == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1131,13 +1137,18 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
||||||
|
|
||||||
for (int i = 0; i < dsz; i++) {
|
for (int i = 0; i < dsz; i++) {
|
||||||
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
|
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
|
||||||
(void)taosArrayPush(chkpDel, &id);
|
if (taosArrayPush(chkpDel, &id) == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
|
for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
|
||||||
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
|
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
|
||||||
(void)taosArrayPush(chkpDup, &id);
|
if (taosArrayPush(chkpDup, &id) == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pBackend->chkpSaved);
|
taosArrayDestroy(pBackend->chkpSaved);
|
||||||
pBackend->chkpSaved = chkpDup;
|
pBackend->chkpSaved = chkpDup;
|
||||||
|
|
||||||
|
@ -1155,6 +1166,11 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(chkpDel);
|
taosArrayDestroy(chkpDel);
|
||||||
return 0;
|
return 0;
|
||||||
|
_exception:
|
||||||
|
taosArrayDestroy(chkpDup);
|
||||||
|
taosArrayDestroy(chkpDel);
|
||||||
|
(void)taosThreadRwlockUnlock(&pBackend->chkpDirLock);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef BUILD_NO_CALL
|
#ifdef BUILD_NO_CALL
|
||||||
|
@ -1288,7 +1304,9 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
|
||||||
|
|
||||||
int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId);
|
int ret = sscanf(taosGetDirEntryName(de), "checkpoint%" PRId64 "", &checkpointId);
|
||||||
if (ret == 1) {
|
if (ret == 1) {
|
||||||
(void)taosArrayPush(pBackend->chkpSaved, &checkpointId);
|
if (taosArrayPush(pBackend->chkpSaved, &checkpointId) == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
|
@ -1300,13 +1318,21 @@ int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) {
|
||||||
(void)taosCloseDir(&pDir);
|
(void)taosCloseDir(&pDir);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
_exception:
|
||||||
|
taosMemoryFree(pChkpDir);
|
||||||
|
(void)taosCloseDir(&pDir);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
|
int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
|
||||||
|
int32_t code = 0;
|
||||||
SArray* pHandle = taosArrayInit(8, POINTER_BYTES);
|
SArray* pHandle = taosArrayInit(8, POINTER_BYTES);
|
||||||
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
|
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
|
||||||
if (pBackend->pCf[i]) {
|
if (pBackend->pCf[i]) {
|
||||||
rocksdb_column_family_handle_t* p = pBackend->pCf[i];
|
rocksdb_column_family_handle_t* p = pBackend->pCf[i];
|
||||||
(void)taosArrayPush(pHandle, &p);
|
if (taosArrayPush(pHandle, &p) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _exception;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int32_t nCf = taosArrayGetSize(pHandle);
|
int32_t nCf = taosArrayGetSize(pHandle);
|
||||||
|
@ -1316,13 +1342,20 @@ int32_t chkpGetAllDbCfHandle2(STaskDbWrapper* pBackend, rocksdb_column_family_ha
|
||||||
}
|
}
|
||||||
|
|
||||||
rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
|
rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
|
||||||
|
if (ppCf == NULL) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception);
|
||||||
|
}
|
||||||
for (int i = 0; i < nCf; i++) {
|
for (int i = 0; i < nCf; i++) {
|
||||||
ppCf[i] = taosArrayGetP(pHandle, i);
|
ppCf[i] = taosArrayGetP(pHandle, i);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pHandle);
|
taosArrayDestroy(pHandle);
|
||||||
|
|
||||||
*ppHandle = ppCf;
|
*ppHandle = ppCf;
|
||||||
return nCf;
|
return nCf;
|
||||||
|
_exception:
|
||||||
|
taosArrayDestroy(pHandle);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
|
int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
|
||||||
|
@ -2435,7 +2468,9 @@ void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) {
|
||||||
|
|
||||||
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
|
void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) {
|
||||||
(void)taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
|
(void)taosThreadRwlockWrlock(&pTaskDb->chkpDirLock);
|
||||||
(void)taosArrayPush(pTaskDb->chkpInUse, &chkp);
|
if (taosArrayPush(pTaskDb->chkpInUse, &chkp) == NULL) {
|
||||||
|
stError("failed to push chkp: %" PRIi64 " into inuse", chkp);
|
||||||
|
}
|
||||||
taosArraySort(pTaskDb->chkpInUse, chkpIdComp);
|
taosArraySort(pTaskDb->chkpInUse, chkpIdComp);
|
||||||
(void)taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
|
(void)taosThreadRwlockUnlock(&pTaskDb->chkpDirLock);
|
||||||
}
|
}
|
||||||
|
@ -4271,7 +4306,10 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
|
||||||
if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
|
if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
|
||||||
int64_t checkPoint = 0;
|
int64_t checkPoint = 0;
|
||||||
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
|
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
|
||||||
(void)taosArrayPush(result, &checkPoint);
|
if (taosArrayPush(result, &checkPoint) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
@ -4487,7 +4525,10 @@ int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
(void)strncpy(fname, name, len);
|
(void)strncpy(fname, name, len);
|
||||||
(void)taosArrayPush(diff, &fname);
|
if (taosArrayPush(diff, &fname) == NULL) {
|
||||||
|
taosMemoryFree(fname);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pIter = taosHashIterate(p2, pIter);
|
pIter = taosHashIterate(p2, pIter);
|
||||||
}
|
}
|
||||||
|
@ -4646,7 +4687,11 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)strncpy(fname, name, len);
|
(void)strncpy(fname, name, len);
|
||||||
(void)taosArrayPush(p->pAdd, &fname);
|
if (taosArrayPush(p->pAdd, &fname) == NULL) {
|
||||||
|
taosMemoryFree(fname);
|
||||||
|
(void)taosThreadRwlockUnlock(&p->rwLock);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
|
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
|
||||||
}
|
}
|
||||||
|
@ -4850,7 +4895,11 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname, SArray* list) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
(void)taosArrayPush(list, &p);
|
if (taosArrayPush(list, &p) == NULL) {
|
||||||
|
taosMemoryFree(p);
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _ERROR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// copy current file to dst dir
|
// copy current file to dst dir
|
||||||
|
|
Loading…
Reference in New Issue