Merge pull request #26746 from taosdata/fix/refactErrorCodeIdx

Fix/refactErrorCodeIdx
This commit is contained in:
Hongze Cheng 2024-07-27 17:07:39 +08:00 committed by GitHub
commit 805a1387b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 191 additions and 133 deletions

View File

@ -83,7 +83,7 @@ void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery);
* @param type (input, single query type)
* @return error code
*/
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type);
int32_t indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type);
/*
* open index
* @param opt (input, index opt)
@ -91,7 +91,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
* @param index (output, index object)
* @return error code
*/
int indexOpen(SIndexOpts* opt, const char* path, SIndex** index);
int32_t indexOpen(SIndexOpts* opt, const char* path, SIndex** index);
/*
* close index
* @param index (input, index to be closed)
@ -106,7 +106,7 @@ void indexClose(SIndex* index);
* @param uid (input, uid of terms)
* @return error code
*/
int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid);
int32_t indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid);
/*
* delete terms that meet query condition
* @param index (input, index object)
@ -114,7 +114,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid);
* @return error code
*/
int indexDelete(SIndex* index, SIndexMultiTermQuery* query);
int32_t indexDelete(SIndex* index, SIndexMultiTermQuery* query);
/*
* search index
* @param index (input, index object)
@ -122,7 +122,7 @@ int indexDelete(SIndex* index, SIndexMultiTermQuery* query);
* @param result(output, query result)
* @return error code
*/
int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result);
int32_t indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result);
/*
* rebuild index
* @param index (input, index object)
@ -138,7 +138,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result);
* @param index (output, index json object)
* @return error code
*/
int indexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index);
int32_t indexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index);
/*
* close index
* @param index (input, index to be closed)
@ -154,7 +154,7 @@ void indexJsonClose(SIndexJson* index);
* @param uid (input, uid of terms)
* @return error code
*/
int indexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
int32_t indexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
/*
* search index
* @param index (input, index object)
@ -163,7 +163,7 @@ int indexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
* @return error code
*/
int indexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result);
int32_t indexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result);
/*
* @param
* @param

View File

@ -78,11 +78,11 @@ typedef struct IdxFstFile {
CheckSummer summer;
} IdxFstFile;
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len);
int32_t idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len);
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len);
int32_t idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len);
int idxFileFlush(IdxFstFile* write);
int32_t idxFileFlush(IdxFstFile* write);
uint32_t idxFileMaskedCheckSum(IdxFstFile* write);

View File

@ -99,23 +99,24 @@ typedef struct TFileReaderOpt {
TFileCache* tfileCacheCreate(SIndex* idx, const char* path);
void tfileCacheDestroy(TFileCache* tcache);
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key);
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader);
int32_t tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader);
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName);
TFileReader* tfileReaderCreate(IFileCtx* ctx);
void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr);
void tfileReaderRef(TFileReader* reader);
void tfileReaderUnRef(TFileReader* reader);
int32_t tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName, TFileReader** pReader);
int32_t tfileReaderCreate(IFileCtx* ctx, TFileReader** pReader);
void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr);
void tfileReaderRef(TFileReader* reader);
void tfileReaderUnRef(TFileReader* reader);
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type);
void tfileWriterClose(TFileWriter* tw);
TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header);
void tfileWriterDestroy(TFileWriter* tw);
int tfileWriterPut(TFileWriter* tw, void* data, bool order);
int tfileWriterFinish(TFileWriter* tw);
int32_t tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type,
TFileWriter** pWriter);
void tfileWriterClose(TFileWriter* tw);
int32_t tfileWriterCreate(IFileCtx* ctx, TFileHeader* header, TFileWriter** pWriter);
void tfileWriterDestroy(TFileWriter* tw);
int tfileWriterPut(TFileWriter* tw, void* data, bool order);
int tfileWriterFinish(TFileWriter* tw);
//
IndexTFile* idxTFileCreate(SIndex* idx, const char* path);

View File

@ -86,10 +86,10 @@ static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
// static void indexInit();
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
static void idxInterRsltDestroy(SArray* results);
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
static void idxInterRsltDestroy(SArray* results);
static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
static int idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
static int32_t idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
// merge cache and tfile by opera type
static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
@ -106,30 +106,37 @@ static void indexWait(void* idx) {
tsem_wait(&pIdx->sem);
}
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
int ret = TSDB_CODE_SUCCESS;
int32_t indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
taosThreadOnce(&isInit, indexEnvInit);
int code = TSDB_CODE_SUCCESS;
SIndex* idx = taosMemoryCalloc(1, sizeof(SIndex));
if (idx == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
}
idx->lru = taosLRUCacheInit(opts->cacheSize, -1, .5);
if (idx->lru == NULL) {
ret = TSDB_CODE_OUT_OF_MEMORY;
goto END;
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
}
taosLRUCacheSetStrictCapacity(idx->lru, false);
idx->tindex = idxTFileCreate(idx, path);
if (idx->tindex == NULL) {
ret = TSDB_CODE_OUT_OF_MEMORY;
goto END;
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
}
idx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (idx->colObj == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
}
idx->version = 1;
idx->path = taosStrdup(path);
if (idx->path == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
}
taosThreadMutexInit(&idx->mtx, NULL);
tsem_init(&idx->sem, 0, 0);
@ -138,17 +145,18 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
idxAcquireRef(idx->refId);
*index = idx;
return ret;
return code;
END:
if (idx != NULL) {
indexDestroy(idx);
}
*index = NULL;
return ret;
return code;
}
void indexDestroy(void* handle) {
if (handle == NULL) return;
SIndex* idx = handle;
taosThreadMutexDestroy(&idx->mtx);
tsem_destroy(&idx->sem);
@ -202,7 +210,7 @@ void idxReleaseRef(int64_t ref) {
taosReleaseRef(indexRefMgt, ref);
}
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
// TODO(yihao): reduce the lock range
taosThreadMutexLock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
@ -239,7 +247,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
}
return 0;
}
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
int32_t indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
@ -285,7 +293,7 @@ void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
taosArrayDestroy(pQuery->query);
taosMemoryFree(pQuery);
};
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
int32_t indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
SIndexTermQuery q = {.qType = qType, .term = term};
taosArrayPush(pQuery->query, &q);
return 0;
@ -332,7 +340,7 @@ void indexTermDestroy(SIndexTerm* p) {
SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); }
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
int32_t indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
taosArrayPush(terms, &term);
return 0;
}
@ -392,7 +400,7 @@ bool indexJsonIsRebuild(SIndexJson* idx) {
return ((SIdxStatus)atomic_load_8(&idx->status)) == kRebuild ? true : false;
}
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
SIndexTerm* term = query->term;
const char* colName = term->colName;
int32_t nColName = term->nColName;
@ -404,6 +412,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
ICacheKey key = {
.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
indexDebug("r suid:%" PRIu64 ", colName:%s, colType:%d", key.suid, key.colName, key.colType);
int32_t sz = idxSerialCacheKey(&key, buf);
taosThreadMutexLock(&sIdx->mtx);
@ -412,7 +421,11 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
taosThreadMutexUnlock(&sIdx->mtx);
*result = taosArrayInit(4, sizeof(uint64_t));
if (*result == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// TODO: iterator mem and tidex
STermValueType s = kTypeValue;
int64_t st = taosGetTimestampUs();
@ -445,7 +458,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
return 0;
END:
idxTRsltDestroy(tr);
return -1;
return 0;
}
static void idxInterRsltDestroy(SArray* results) {
if (results == NULL) {
@ -460,7 +473,7 @@ static void idxInterRsltDestroy(SArray* results) {
taosArrayDestroy(results);
}
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
// refactor, merge interResults into fResults by oType
for (int i = 0; i < taosArrayGetSize(in); i++) {
SArray* t = taosArrayGetP(in, i);
@ -527,9 +540,9 @@ static void idxDestroyFinalRslt(SArray* result) {
taosArrayDestroy(result);
}
int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
if (sIdx == NULL) {
return -1;
return TSDB_CODE_INVALID_PTR;
}
indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
@ -537,8 +550,9 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
IndexCache* pCache = (IndexCache*)cache;
while (quit && atomic_load_32(&pCache->merging) == 1)
;
do {
} while (quit && atomic_load_32(&pCache->merging) == 1);
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
if (pReader == NULL) {
indexWarn("empty tfile reader found");
@ -568,6 +582,8 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
SIdxTRslt* tr = idxTRsltCreate();
if (tr == NULL) {
}
while (cn == true || tn == true) {
IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;
@ -650,27 +666,30 @@ static int64_t idxGetAvailableVer(SIndex* sIdx, IndexCache* cache) {
tfileReaderUnRef(rd);
return ver;
}
static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
static int32_t idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
int32_t code = 0;
int64_t version = idxGetAvailableVer(sIdx, cache);
indexInfo("file name version: %" PRId64 "", version);
uint8_t colType = cache->type;
TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
if (tw == NULL) {
indexError("failed to open file to write");
return -1;
TFileWriter* tw = NULL;
code = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, cache->type, &tw);
if (code != 0) {
indexError("failed to open file to write since %s", tstrerror(code));
}
int ret = tfileWriterPut(tw, batch, true);
if (ret != 0) {
indexError("failed to write into tindex ");
code = tfileWriterPut(tw, batch, true);
if (code != 0) {
indexError("failed to write into tindex since %s", tstrerror(code));
goto END;
}
tfileWriterClose(tw);
TFileReader* reader = tfileReaderOpen(sIdx, cache->suid, version, cache->colName);
if (reader == NULL) {
return -1;
TFileReader* reader = NULL;
code = tfileReaderOpen(sIdx, cache->suid, version, cache->colName, &reader);
if (code != 0) {
goto END;
}
indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);
@ -680,16 +699,17 @@ static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
taosThreadMutexLock(&tf->mtx);
tfileCachePut(tf->cache, &key, reader);
code = tfileCachePut(tf->cache, &key, reader);
taosThreadMutexUnlock(&tf->mtx);
return ret;
return code;
END:
if (tw != NULL) {
idxFileCtxDestroy(tw->ctx, true);
taosMemoryFree(tw);
}
return -1;
return code;
}
int32_t idxSerialCacheKey(ICacheKey* key, char* buf) {

View File

@ -129,6 +129,10 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt*
_cache_range_compare cmpFn = idxGetCompare(type);
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
if (pCt == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCt->colVal = term->colVal;
pCt->colType = term->colType;
pCt->version = atomic_load_64(&pCache->version);
@ -182,6 +186,10 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
IndexCache* pCache = mem->pCache;
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
if (pCt == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCt->colVal = term->colVal;
pCt->version = atomic_load_64(&pCache->version);
@ -340,7 +348,8 @@ IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8
cache->mem = idxInternalCacheCreate(type);
cache->mem->pCache = cache;
cache->colName = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? taosStrdup(JSON_COLUMN) : taosStrdup(colName);
cache->colName =
IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? taosStrdup(JSON_COLUMN) : taosStrdup(colName);
cache->type = type;
cache->index = idx;
cache->version = 0;

View File

@ -135,7 +135,7 @@ static FORCE_INLINE int32_t sifGetOperParamNum(EOperatorType ty) {
static FORCE_INLINE int32_t sifValidOp(EOperatorType ty) {
if ((ty >= OP_TYPE_ADD && ty <= OP_TYPE_BIT_OR) || (ty == OP_TYPE_IN || ty == OP_TYPE_NOT_IN) ||
(ty == OP_TYPE_LIKE || ty == OP_TYPE_NOT_LIKE || ty == OP_TYPE_MATCH || ty == OP_TYPE_NMATCH)) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
return 0;
}
@ -209,7 +209,7 @@ static FORCE_INLINE int32_t sifGetValueFromNode(SNode *node, char **value) {
static FORCE_INLINE int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
SOperatorNode *nd = (SOperatorNode *)node;
if (nodeType(node) != QUERY_NODE_OPERATOR) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
SColumnNode *l = (SColumnNode *)nd->pLeft;
SValueNode *r = (SValueNode *)nd->pRight;
@ -390,15 +390,7 @@ static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx
SIF_ERR_JRET(sifInitParam(node->pLeft, &paramList[0], ctx));
if (nParam > 1) {
// if (sifNeedConvertCond(node->pLeft, node->pRight)) {
// SIF_ERR_JRET(sifInitParamValByCol(node->pLeft, node->pRight, &paramList[1], ctx));
// } else {
SIF_ERR_JRET(sifInitParam(node->pRight, &paramList[1], ctx));
// }
// if (paramList[0].colValType == TSDB_DATA_TYPE_JSON &&
// ((SOperatorNode *)(node))->opType == OP_TYPE_JSON_CONTAINS) {
// return TSDB_CODE_OUT_OF_MEMORY;
//}
}
*params = paramList;
return TSDB_CODE_SUCCESS;
@ -485,7 +477,7 @@ int32_t sifStr2Num(char *buf, int32_t len, int8_t type, void *val) {
if (IS_SIGNED_NUMERIC_TYPE(type)) {
int64_t v = 0;
if (0 != toInteger(buf, len, 10, &v)) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
if (type == TSDB_DATA_TYPE_BIGINT) {
*(int64_t *)val = v;
@ -505,7 +497,7 @@ int32_t sifStr2Num(char *buf, int32_t len, int8_t type, void *val) {
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
uint64_t v = 0;
if (0 != toUInteger(buf, len, 10, &v)) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
if (type == TSDB_DATA_TYPE_UBIGINT) {
*(uint64_t *)val = v;
@ -517,7 +509,7 @@ int32_t sifStr2Num(char *buf, int32_t len, int8_t type, void *val) {
*(uint16_t *)val = v;
}
} else {
return -1;
return TSDB_CODE_INVALID_PARA;
}
return 0;
}
@ -526,7 +518,7 @@ static int32_t sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typ
int32_t code = 0;
int8_t ltype = left->colValType, rtype = right->colValType;
if (!IS_NUMERIC_TYPE(ltype) || !((IS_NUMERIC_TYPE(rtype)) || rtype == TSDB_DATA_TYPE_VARCHAR)) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
if (ltype == TSDB_DATA_TYPE_FLOAT) {
float f = 0;
@ -661,7 +653,7 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
int8_t useIndex = sifShouldUseIndexBasedOnType(left, right);
if (!useIndex) {
output->status = SFLT_NOT_INDEX;
return -1;
return TSDB_CODE_INVALID_PARA;
}
bool reverse = false, equal = false;
@ -682,7 +674,7 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
if (sifSetFltParam(left, right, &typedata, &param) != 0) {
output->status = SFLT_NOT_INDEX;
return -1;
return TSDB_CODE_INVALID_PARA;
}
ret = left->api.metaFilterTableIds(arg->metaEx, &param, output->result);
@ -823,7 +815,7 @@ static FORCE_INLINE int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxF
}
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
int32_t code = -1;
int32_t code = TSDB_CODE_INVALID_PARA;
if (sifValidOp(node->opType) < 0) {
code = TSDB_CODE_QRY_INVALID_INPUT;
ctx->code = code;

View File

@ -279,7 +279,9 @@ void idxFileDestroy(IdxFstFile* cw) {
taosMemoryFree(cw);
}
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
int32_t idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
int32_t code = 0;
if (write == NULL) {
return 0;
}
@ -288,7 +290,8 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
int nWrite = ctx->write(ctx, buf, len);
ASSERTS(nWrite == len, "index write incomplete data");
if (nWrite != len) {
return -1;
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
write->count += len;
@ -296,7 +299,7 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
return len;
}
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
int32_t idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
if (write == NULL) {
return 0;
}

View File

@ -15,11 +15,11 @@
#include "index.h"
#include "indexInt.h"
int indexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
int32_t indexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
// handle
return indexOpen(opts, path, index);
}
int indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
int32_t indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);
if (p->colType == TSDB_DATA_TYPE_BOOL) {
@ -36,7 +36,7 @@ int indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
return indexPut(index, terms, uid);
}
int indexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
int32_t indexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
SArray *terms = tq->query;
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);

View File

@ -110,11 +110,14 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) {
}
ctx->lru = idx->lru;
TFileReader* reader = tfileReaderCreate(ctx);
if (reader == NULL) {
indexInfo("skip invalid file: %s", file);
TFileReader* reader = NULL;
int32_t code = tfileReaderCreate(ctx, &reader);
if (code != 0) {
indexInfo("skip invalid file: %s since %s", file, tstrerror(code));
continue;
}
reader->lru = idx->lru;
TFileHeader* header = &reader->header;
@ -160,9 +163,12 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
return *reader;
}
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
char buf[128] = {0};
int32_t sz = idxSerialCacheKey(key, buf);
int32_t tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
int32_t code = 0;
char buf[128] = {0};
int32_t sz = idxSerialCacheKey(key, buf);
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
if (p != NULL && *p != NULL) {
TFileReader* oldRdr = *p;
@ -171,39 +177,44 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
oldRdr->remove = true;
tfileReaderUnRef(oldRdr);
}
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
tfileReaderRef(reader);
return;
code = taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
if (code == 0) {
tfileReaderRef(reader);
}
return code;
}
TFileReader* tfileReaderCreate(IFileCtx* ctx) {
int32_t tfileReaderCreate(IFileCtx* ctx, TFileReader** pReader) {
int32_t code = 0;
TFileReader* reader = taosMemoryCalloc(1, sizeof(TFileReader));
if (reader == NULL) {
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
reader->ctx = ctx;
reader->remove = false;
if (0 != tfileReaderVerify(reader)) {
if ((code = tfileReaderVerify(reader)) != 0) {
indexError("invalid tfile, suid:%" PRIu64 ", colName:%s", reader->header.suid, reader->header.colName);
tfileReaderDestroy(reader);
return NULL;
TAOS_CHECK_GOTO(code, NULL, _End);
}
if (0 != tfileReaderLoadHeader(reader)) {
if ((code = tfileReaderLoadHeader(reader)) != 0) {
indexError("failed to load index header, suid:%" PRIu64 ", colName:%s", reader->header.suid,
reader->header.colName);
tfileReaderDestroy(reader);
return NULL;
TAOS_CHECK_GOTO(code, NULL, _End);
}
if (0 != tfileReaderLoadFst(reader)) {
if ((code = tfileReaderLoadFst(reader)) != 0) {
indexError("failed to load index fst, suid:%" PRIu64 ", colName:%s, code:0x%x", reader->header.suid,
reader->header.colName, errno);
tfileReaderDestroy(reader);
return NULL;
TAOS_CHECK_GOTO(code, NULL, _End);
}
return reader;
*pReader = reader;
return code;
_End:
tfileReaderDestroy(reader);
return code;
}
void tfileReaderDestroy(TFileReader* reader) {
if (reader == NULL) {
@ -479,9 +490,10 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt
return TSDB_CODE_SUCCESS;
}
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr) {
int ret = 0;
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
int ret = 0;
if (IDX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
ret = tfSearch[1][qtype](reader, term, tr);
} else {
@ -492,12 +504,15 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr
return ret;
}
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) {
char fullname[256] = {0};
int32_t tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType,
TFileWriter** pWriter) {
int32_t code = 0;
char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version);
IFileCtx* wcx = idxFileCtxCreate(TFILE, fullname, false, 1024 * 1024 * 64);
if (wcx == NULL) {
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
TFileHeader tfh = {0};
@ -508,34 +523,39 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
memcpy(tfh.colName, colName, strlen(colName));
}
return tfileWriterCreate(wcx, &tfh);
return tfileWriterCreate(wcx, &tfh, pWriter);
}
TFileReader* tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName) {
char fullname[256] = {0};
int32_t tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName, TFileReader** pReader) {
int32_t code = 0;
char fullname[256] = {0};
tfileGenFileFullName(fullname, idx->path, suid, colName, version);
IFileCtx* wc = idxFileCtxCreate(TFILE, fullname, true, 1024 * 1024 * 1024);
if (wc == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
return NULL;
code = TAOS_SYSTEM_ERROR(errno);
indexError("failed to open readonly file: %s, reason: %s", fullname, tstrerror(code));
return code;
}
wc->lru = idx->lru;
indexTrace("open read file name:%s, file size: %" PRId64 "", wc->file.buf, wc->file.size);
TFileReader* reader = tfileReaderCreate(wc);
return reader;
return tfileReaderCreate(wc, pReader);
}
TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header) {
int32_t tfileWriterCreate(IFileCtx* ctx, TFileHeader* header, TFileWriter** pWriter) {
int32_t code = 0;
TFileWriter* tw = taosMemoryCalloc(1, sizeof(TFileWriter));
if (tw == NULL) {
indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
indexError("index: %" PRIu64 " failed to alloc TFilerWriter since %s", header->suid, tstrerror(code));
return code;
}
tw->ctx = ctx;
tw->header = *header;
tfileWriteHeader(tw);
return tw;
*pWriter = tw;
return code;
}
int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
@ -545,8 +565,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
int8_t colType = tw->header.colType;
colType = IDX_TYPE_GET_TYPE(colType);
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_VARBINARY ||
colType == TSDB_DATA_TYPE_NCHAR || colType == TSDB_DATA_TYPE_GEOMETRY) {
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_VARBINARY || colType == TSDB_DATA_TYPE_NCHAR ||
colType == TSDB_DATA_TYPE_GEOMETRY) {
fn = tfileStrCompare;
} else {
fn = getComparFunc(colType, 0);
@ -570,6 +590,9 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
int32_t cap = 4 * 1024;
char* buf = taosMemoryCalloc(1, cap);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (size_t i = 0; i < sz; i++) {
TFileValue* v = taosArrayGetP((SArray*)data, i);
@ -584,7 +607,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
char* t = (char*)taosMemoryRealloc(buf, cap);
if (t == NULL) {
taosMemoryFree(buf);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
buf = t;
}
@ -664,7 +687,7 @@ void idxTFileDestroy(IndexTFile* tfile) {
int idxTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) {
int ret = -1;
if (tfile == NULL) {
return ret;
return TSDB_CODE_INVALID_DATA_FMT;
}
int64_t st = taosGetTimestampUs();

View File

@ -160,10 +160,18 @@ int verdataCompare(const void *a, const void *b) {
SIdxTRslt *idxTRsltCreate() {
SIdxTRslt *tr = taosMemoryCalloc(1, sizeof(SIdxTRslt));
if (tr == NULL) {
return NULL;
}
tr->total = taosArrayInit(4, sizeof(uint64_t));
tr->add = taosArrayInit(4, sizeof(uint64_t));
tr->del = taosArrayInit(4, sizeof(uint64_t));
if (tr->total == NULL || tr->add == NULL || tr->del == NULL) {
idxTRsltClear(tr);
tr = NULL;
}
return tr;
}
void idxTRsltClear(SIdxTRslt *tr) {

View File

@ -607,7 +607,8 @@ void validateTFile(char* arg) {
// std::vector<std::thread> threads;
SIndex* index = (SIndex*)taosMemoryCalloc(1, sizeof(SIndex));
index->path = taosStrdup(arg);
TFileReader* reader = tfileReaderOpen(index, 0, 20000000, "tag1");
TFileReader* reader = NULL;
int32_t code = tfileReaderOpen(index, 0, 20000000, "tag1", &reader);
for (int i = 0; i < NUM_OF_THREAD; i++) {
threads[i] = std::thread(fst_get, reader->fst);
@ -626,7 +627,8 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) {
uint64_t suid = atoi(uid);
int version = atoi(ver);
TFileReader* reader = tfileReaderOpen(NULL, suid, version, colName);
TFileReader* reader = NULL;
int32_t code = tfileReaderOpen(NULL, suid, version, colName, &reader);
Iterate* iter = tfileIteratorCreate(reader);
bool tn = iter ? iter->next(iter) : false;

View File

@ -294,7 +294,7 @@ class IndexEnv : public ::testing::Test {
taosRemoveDir(path);
SIndexOpts opts;
opts.cacheSize = 1024 * 1024 * 4;
int ret = indexOpen(&opts, path, &index);
int32_t ret = indexOpen(&opts, path, &index);
assert(ret == 0);
}
virtual void TearDown() { indexClose(index); }
@ -392,13 +392,13 @@ class TFileObj {
IFileCtx* ctx = idxFileCtxCreate(TFILE, path.c_str(), false, 64 * 1024 * 1024);
ctx->lru = taosLRUCacheInit(1024 * 1024 * 4, -1, .5);
writer_ = tfileWriterCreate(ctx, &header);
int32_t code = tfileWriterCreate(ctx, &header, &writer_);
return writer_ != NULL ? true : false;
}
bool InitReader() {
IFileCtx* ctx = idxFileCtxCreate(TFILE, fileName_.c_str(), true, 64 * 1024 * 1024);
ctx->lru = taosLRUCacheInit(1024 * 1024 * 4, -1, .5);
reader_ = tfileReaderCreate(ctx);
int32_t code = tfileReaderCreate(ctx, &reader_);
return reader_ != NULL ? true : false;
}
int Get(SIndexTermQuery* query, SArray* result) {
@ -701,7 +701,7 @@ class IndexObj {
SIndexOpts opts;
opts.cacheSize = 1024 * 1024 * 4;
int ret = indexOpen(&opts, dir.c_str(), &idx);
int32_t ret = indexOpen(&opts, dir.c_str(), &idx);
if (ret != 0) {
// opt
std::cout << "failed to open index: %s" << dir << std::endl;