refactor index

This commit is contained in:
Yihao Deng 2024-07-23 06:59:09 +00:00
parent e708c1fbb4
commit ce5f9fa9f3
10 changed files with 177 additions and 113 deletions

View File

@ -83,7 +83,7 @@ void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery);
* @param type (input, single query type)
* @return error code
*/
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type);
int32_t indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type);
/*
* open index
* @param opt (input, index opt)
@ -91,7 +91,7 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
* @param index (output, index object)
* @return error code
*/
int indexOpen(SIndexOpts* opt, const char* path, SIndex** index);
int32_t indexOpen(SIndexOpts* opt, const char* path, SIndex** index);
/*
* close index
* @param index (input, index to be closed)
@ -106,7 +106,7 @@ void indexClose(SIndex* index);
* @param uid (input, uid of terms)
* @return error code
*/
int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid);
int32_t indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid);
/*
* delete terms that meet query condition
* @param index (input, index object)
@ -114,7 +114,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid);
* @return error code
*/
int indexDelete(SIndex* index, SIndexMultiTermQuery* query);
int32_t indexDelete(SIndex* index, SIndexMultiTermQuery* query);
/*
* search index
* @param index (input, index object)
@ -122,7 +122,7 @@ int indexDelete(SIndex* index, SIndexMultiTermQuery* query);
* @param result(output, query result)
* @return error code
*/
int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result);
int32_t indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result);
/*
* rebuild index
* @param index (input, index object)
@ -138,7 +138,7 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result);
* @param index (output, index json object)
* @return error code
*/
int indexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index);
int32_t indexJsonOpen(SIndexJsonOpts* opts, const char* path, SIndexJson** index);
/*
* close index
* @param index (input, index to be closed)
@ -154,7 +154,7 @@ void indexJsonClose(SIndexJson* index);
* @param uid (input, uid of terms)
* @return error code
*/
int indexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
int32_t indexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
/*
* search index
* @param index (input, index object)
@ -163,7 +163,7 @@ int indexJsonPut(SIndexJson* index, SIndexJsonMultiTerm* terms, uint64_t uid);
* @return error code
*/
int indexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result);
int32_t indexJsonSearch(SIndexJson* index, SIndexJsonMultiTermQuery* query, SArray* result);
/*
* @param
* @param

View File

@ -78,11 +78,11 @@ typedef struct IdxFstFile {
CheckSummer summer;
} IdxFstFile;
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len);
int32_t idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len);
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len);
int32_t idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len);
int idxFileFlush(IdxFstFile* write);
int32_t idxFileFlush(IdxFstFile* write);
uint32_t idxFileMaskedCheckSum(IdxFstFile* write);

View File

@ -99,20 +99,21 @@ typedef struct TFileReaderOpt {
TFileCache* tfileCacheCreate(SIndex* idx, const char* path);
void tfileCacheDestroy(TFileCache* tcache);
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key);
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader);
int32_t tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader);
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName);
TFileReader* tfileReaderCreate(IFileCtx* ctx);
int32_t tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName, TFileReader** pReader);
int32_t tfileReaderCreate(IFileCtx* ctx, TFileReader** pReader);
void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr);
void tfileReaderRef(TFileReader* reader);
void tfileReaderUnRef(TFileReader* reader);
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type);
int32_t tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type,
TFileWriter** pWriter);
void tfileWriterClose(TFileWriter* tw);
TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header);
int32_t tfileWriterCreate(IFileCtx* ctx, TFileHeader* header, TFileWriter** pWriter);
void tfileWriterDestroy(TFileWriter* tw);
int tfileWriterPut(TFileWriter* tw, void* data, bool order);
int tfileWriterFinish(TFileWriter* tw);

View File

@ -87,9 +87,9 @@ static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
static void idxInterRsltDestroy(SArray* results);
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
static int idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
static int32_t idxGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
// merge cache and tfile by opera type
static void idxMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTRslt* helper);
@ -106,30 +106,37 @@ static void indexWait(void* idx) {
tsem_wait(&pIdx->sem);
}
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
int ret = TSDB_CODE_SUCCESS;
int32_t indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
taosThreadOnce(&isInit, indexEnvInit);
int code = TSDB_CODE_SUCCESS;
SIndex* idx = taosMemoryCalloc(1, sizeof(SIndex));
if (idx == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
}
idx->lru = taosLRUCacheInit(opts->cacheSize, -1, .5);
if (idx->lru == NULL) {
ret = TSDB_CODE_OUT_OF_MEMORY;
goto END;
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
}
taosLRUCacheSetStrictCapacity(idx->lru, false);
idx->tindex = idxTFileCreate(idx, path);
if (idx->tindex == NULL) {
ret = TSDB_CODE_OUT_OF_MEMORY;
goto END;
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
}
idx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (idx->colObj == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
}
idx->version = 1;
idx->path = taosStrdup(path);
if (idx->path == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, END);
}
taosThreadMutexInit(&idx->mtx, NULL);
tsem_init(&idx->sem, 0, 0);
@ -138,17 +145,18 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
idxAcquireRef(idx->refId);
*index = idx;
return ret;
return code;
END:
if (idx != NULL) {
indexDestroy(idx);
}
*index = NULL;
return ret;
return code;
}
void indexDestroy(void* handle) {
if (handle == NULL) return;
SIndex* idx = handle;
taosThreadMutexDestroy(&idx->mtx);
tsem_destroy(&idx->sem);
@ -202,7 +210,7 @@ void idxReleaseRef(int64_t ref) {
taosReleaseRef(indexRefMgt, ref);
}
int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
int32_t indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
// TODO(yihao): reduce the lock range
taosThreadMutexLock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
@ -239,7 +247,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
}
return 0;
}
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
int32_t indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
EIndexOperatorType opera = multiQuerys->opera; // relation of querys
SArray* iRslts = taosArrayInit(4, POINTER_BYTES);
@ -285,7 +293,7 @@ void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
taosArrayDestroy(pQuery->query);
taosMemoryFree(pQuery);
};
int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
int32_t indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType qType) {
SIndexTermQuery q = {.qType = qType, .term = term};
taosArrayPush(pQuery->query, &q);
return 0;
@ -332,7 +340,7 @@ void indexTermDestroy(SIndexTerm* p) {
SIndexMultiTerm* indexMultiTermCreate() { return taosArrayInit(4, sizeof(SIndexTerm*)); }
int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
int32_t indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term) {
taosArrayPush(terms, &term);
return 0;
}
@ -392,7 +400,7 @@ bool indexJsonIsRebuild(SIndexJson* idx) {
return ((SIdxStatus)atomic_load_8(&idx->status)) == kRebuild ? true : false;
}
static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
static int32_t idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result) {
SIndexTerm* term = query->term;
const char* colName = term->colName;
int32_t nColName = term->nColName;
@ -404,6 +412,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
ICacheKey key = {
.suid = term->suid, .colName = term->colName, .nColName = strlen(term->colName), .colType = term->colType};
indexDebug("r suid:%" PRIu64 ", colName:%s, colType:%d", key.suid, key.colName, key.colType);
int32_t sz = idxSerialCacheKey(&key, buf);
taosThreadMutexLock(&sIdx->mtx);
@ -412,7 +421,11 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
taosThreadMutexUnlock(&sIdx->mtx);
*result = taosArrayInit(4, sizeof(uint64_t));
if (*result == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
// TODO: iterator mem and tidex
STermValueType s = kTypeValue;
int64_t st = taosGetTimestampUs();
@ -445,7 +458,7 @@ static int idxTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result)
return 0;
END:
idxTRsltDestroy(tr);
return -1;
return 0;
}
static void idxInterRsltDestroy(SArray* results) {
if (results == NULL) {
@ -460,7 +473,7 @@ static void idxInterRsltDestroy(SArray* results) {
taosArrayDestroy(results);
}
static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
static int32_t idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
// refactor, merge interResults into fResults by oType
for (int i = 0; i < taosArrayGetSize(in); i++) {
SArray* t = taosArrayGetP(in, i);
@ -527,9 +540,9 @@ static void idxDestroyFinalRslt(SArray* result) {
taosArrayDestroy(result);
}
int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
int32_t idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
if (sIdx == NULL) {
return -1;
return TSDB_CODE_INVALID_PTR;
}
indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
@ -537,8 +550,9 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
IndexCache* pCache = (IndexCache*)cache;
while (quit && atomic_load_32(&pCache->merging) == 1)
;
do {
} while (quit && atomic_load_32(&pCache->merging) == 1);
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
if (pReader == NULL) {
indexWarn("empty tfile reader found");
@ -568,6 +582,8 @@ int idxFlushCacheToTFile(SIndex* sIdx, void* cache, bool quit) {
bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
SIdxTRslt* tr = idxTRsltCreate();
if (tr == NULL) {
}
while (cn == true || tn == true) {
IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;
@ -650,27 +666,30 @@ static int64_t idxGetAvailableVer(SIndex* sIdx, IndexCache* cache) {
tfileReaderUnRef(rd);
return ver;
}
static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
static int32_t idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
int32_t code = 0;
int64_t version = idxGetAvailableVer(sIdx, cache);
indexInfo("file name version: %" PRId64 "", version);
uint8_t colType = cache->type;
TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
if (tw == NULL) {
indexError("failed to open file to write");
return -1;
TFileWriter* tw = NULL;
code = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, cache->type, &tw);
if (code != 0) {
indexError("failed to open file to write since %s", tstrerror(code));
}
int ret = tfileWriterPut(tw, batch, true);
if (ret != 0) {
indexError("failed to write into tindex ");
code = tfileWriterPut(tw, batch, true);
if (code != 0) {
indexError("failed to write into tindex since %s", tstrerror(code));
goto END;
}
tfileWriterClose(tw);
TFileReader* reader = tfileReaderOpen(sIdx, cache->suid, version, cache->colName);
if (reader == NULL) {
return -1;
TFileReader* reader = NULL;
code = tfileReaderOpen(sIdx, cache->suid, version, cache->colName, &reader);
if (code != 0) {
goto END;
}
indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);
@ -680,16 +699,17 @@ static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
taosThreadMutexLock(&tf->mtx);
tfileCachePut(tf->cache, &key, reader);
code = tfileCachePut(tf->cache, &key, reader);
taosThreadMutexUnlock(&tf->mtx);
return ret;
return code;
END:
if (tw != NULL) {
idxFileCtxDestroy(tw->ctx, true);
taosMemoryFree(tw);
}
return -1;
return code;
}
int32_t idxSerialCacheKey(ICacheKey* key, char* buf) {

View File

@ -129,6 +129,10 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTRslt*
_cache_range_compare cmpFn = idxGetCompare(type);
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
if (pCt == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCt->colVal = term->colVal;
pCt->colType = term->colType;
pCt->version = atomic_load_64(&pCache->version);
@ -182,6 +186,10 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTRslt* tr
IndexCache* pCache = mem->pCache;
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
if (pCt == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCt->colVal = term->colVal;
pCt->version = atomic_load_64(&pCache->version);
@ -340,7 +348,8 @@ IndexCache* idxCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8
cache->mem = idxInternalCacheCreate(type);
cache->mem->pCache = cache;
cache->colName = IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? taosStrdup(JSON_COLUMN) : taosStrdup(colName);
cache->colName =
IDX_TYPE_CONTAIN_EXTERN_TYPE(type, TSDB_DATA_TYPE_JSON) ? taosStrdup(JSON_COLUMN) : taosStrdup(colName);
cache->type = type;
cache->index = idx;
cache->version = 0;

View File

@ -279,7 +279,9 @@ void idxFileDestroy(IdxFstFile* cw) {
taosMemoryFree(cw);
}
int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
int32_t idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
int32_t code = 0;
if (write == NULL) {
return 0;
}
@ -288,7 +290,8 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
int nWrite = ctx->write(ctx, buf, len);
ASSERTS(nWrite == len, "index write incomplete data");
if (nWrite != len) {
return -1;
code = TAOS_SYSTEM_ERROR(errno);
return code;
}
write->count += len;
@ -296,7 +299,7 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
return len;
}
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
int32_t idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
if (write == NULL) {
return 0;
}

View File

@ -15,11 +15,11 @@
#include "index.h"
#include "indexInt.h"
int indexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
int32_t indexJsonOpen(SIndexJsonOpts *opts, const char *path, SIndexJson **index) {
// handle
return indexOpen(opts, path, index);
}
int indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
int32_t indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);
if (p->colType == TSDB_DATA_TYPE_BOOL) {
@ -36,7 +36,7 @@ int indexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
return indexPut(index, terms, uid);
}
int indexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
int32_t indexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
SArray *terms = tq->query;
for (int i = 0; i < taosArrayGetSize(terms); i++) {
SIndexJsonTerm *p = taosArrayGetP(terms, i);

View File

@ -110,11 +110,14 @@ TFileCache* tfileCacheCreate(SIndex* idx, const char* path) {
}
ctx->lru = idx->lru;
TFileReader* reader = tfileReaderCreate(ctx);
if (reader == NULL) {
indexInfo("skip invalid file: %s", file);
TFileReader* reader = NULL;
int32_t code = tfileReaderCreate(ctx, &reader);
if (code != 0) {
indexInfo("skip invalid file: %s since %s", file, tstrerror(code));
continue;
}
reader->lru = idx->lru;
TFileHeader* header = &reader->header;
@ -160,9 +163,12 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
return *reader;
}
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
int32_t tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
int32_t code = 0;
char buf[128] = {0};
int32_t sz = idxSerialCacheKey(key, buf);
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
if (p != NULL && *p != NULL) {
TFileReader* oldRdr = *p;
@ -171,39 +177,44 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
oldRdr->remove = true;
tfileReaderUnRef(oldRdr);
}
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
code = taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
if (code == 0) {
tfileReaderRef(reader);
return;
}
return code;
}
TFileReader* tfileReaderCreate(IFileCtx* ctx) {
int32_t tfileReaderCreate(IFileCtx* ctx, TFileReader** pReader) {
int32_t code = 0;
TFileReader* reader = taosMemoryCalloc(1, sizeof(TFileReader));
if (reader == NULL) {
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
reader->ctx = ctx;
reader->remove = false;
if (0 != tfileReaderVerify(reader)) {
if ((code = tfileReaderVerify(reader)) != 0) {
indexError("invalid tfile, suid:%" PRIu64 ", colName:%s", reader->header.suid, reader->header.colName);
tfileReaderDestroy(reader);
return NULL;
TAOS_CHECK_GOTO(code, NULL, _End);
}
if (0 != tfileReaderLoadHeader(reader)) {
if ((code = tfileReaderLoadHeader(reader)) != 0) {
indexError("failed to load index header, suid:%" PRIu64 ", colName:%s", reader->header.suid,
reader->header.colName);
tfileReaderDestroy(reader);
return NULL;
TAOS_CHECK_GOTO(code, NULL, _End);
}
if (0 != tfileReaderLoadFst(reader)) {
if ((code = tfileReaderLoadFst(reader)) != 0) {
indexError("failed to load index fst, suid:%" PRIu64 ", colName:%s, code:0x%x", reader->header.suid,
reader->header.colName, errno);
tfileReaderDestroy(reader);
return NULL;
TAOS_CHECK_GOTO(code, NULL, _End);
}
return reader;
*pReader = reader;
return code;
_End:
tfileReaderDestroy(reader);
return code;
}
void tfileReaderDestroy(TFileReader* reader) {
if (reader == NULL) {
@ -479,9 +490,10 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTRslt
return TSDB_CODE_SUCCESS;
}
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr) {
int ret = 0;
SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType;
int ret = 0;
if (IDX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
ret = tfSearch[1][qtype](reader, term, tr);
} else {
@ -492,12 +504,15 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr
return ret;
}
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) {
int32_t tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType,
TFileWriter** pWriter) {
int32_t code = 0;
char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version);
IFileCtx* wcx = idxFileCtxCreate(TFILE, fullname, false, 1024 * 1024 * 64);
if (wcx == NULL) {
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
TFileHeader tfh = {0};
@ -508,34 +523,39 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
memcpy(tfh.colName, colName, strlen(colName));
}
return tfileWriterCreate(wcx, &tfh);
return tfileWriterCreate(wcx, &tfh, pWriter);
}
TFileReader* tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName) {
int32_t tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName, TFileReader** pReader) {
int32_t code = 0;
char fullname[256] = {0};
tfileGenFileFullName(fullname, idx->path, suid, colName, version);
IFileCtx* wc = idxFileCtxCreate(TFILE, fullname, true, 1024 * 1024 * 1024);
if (wc == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
return NULL;
code = TAOS_SYSTEM_ERROR(errno);
indexError("failed to open readonly file: %s, reason: %s", fullname, tstrerror(code));
return code;
}
wc->lru = idx->lru;
indexTrace("open read file name:%s, file size: %" PRId64 "", wc->file.buf, wc->file.size);
TFileReader* reader = tfileReaderCreate(wc);
return reader;
return tfileReaderCreate(wc, pReader);
}
TFileWriter* tfileWriterCreate(IFileCtx* ctx, TFileHeader* header) {
int32_t tfileWriterCreate(IFileCtx* ctx, TFileHeader* header, TFileWriter** pWriter) {
int32_t code = 0;
TFileWriter* tw = taosMemoryCalloc(1, sizeof(TFileWriter));
if (tw == NULL) {
indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
indexError("index: %" PRIu64 " failed to alloc TFilerWriter since %s", header->suid, tstrerror(code));
return code;
}
tw->ctx = ctx;
tw->header = *header;
tfileWriteHeader(tw);
return tw;
*pWriter = tw;
return code;
}
int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
@ -545,8 +565,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
int8_t colType = tw->header.colType;
colType = IDX_TYPE_GET_TYPE(colType);
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_VARBINARY ||
colType == TSDB_DATA_TYPE_NCHAR || colType == TSDB_DATA_TYPE_GEOMETRY) {
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_VARBINARY || colType == TSDB_DATA_TYPE_NCHAR ||
colType == TSDB_DATA_TYPE_GEOMETRY) {
fn = tfileStrCompare;
} else {
fn = getComparFunc(colType, 0);
@ -570,6 +590,9 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
int32_t cap = 4 * 1024;
char* buf = taosMemoryCalloc(1, cap);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (size_t i = 0; i < sz; i++) {
TFileValue* v = taosArrayGetP((SArray*)data, i);
@ -584,7 +607,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
char* t = (char*)taosMemoryRealloc(buf, cap);
if (t == NULL) {
taosMemoryFree(buf);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
buf = t;
}
@ -664,7 +687,7 @@ void idxTFileDestroy(IndexTFile* tfile) {
int idxTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* result) {
int ret = -1;
if (tfile == NULL) {
return ret;
return TSDB_CODE_INVALID_DATA_FMT;
}
int64_t st = taosGetTimestampUs();

View File

@ -160,10 +160,18 @@ int verdataCompare(const void *a, const void *b) {
SIdxTRslt *idxTRsltCreate() {
SIdxTRslt *tr = taosMemoryCalloc(1, sizeof(SIdxTRslt));
if (tr == NULL) {
return NULL;
}
tr->total = taosArrayInit(4, sizeof(uint64_t));
tr->add = taosArrayInit(4, sizeof(uint64_t));
tr->del = taosArrayInit(4, sizeof(uint64_t));
if (tr->total == NULL || tr->add == NULL || tr->del == NULL) {
idxTRsltClear(tr);
tr = NULL;
}
return tr;
}
void idxTRsltClear(SIdxTRslt *tr) {

View File

@ -294,7 +294,7 @@ class IndexEnv : public ::testing::Test {
taosRemoveDir(path);
SIndexOpts opts;
opts.cacheSize = 1024 * 1024 * 4;
int ret = indexOpen(&opts, path, &index);
int32_t ret = indexOpen(&opts, path, &index);
assert(ret == 0);
}
virtual void TearDown() { indexClose(index); }
@ -398,7 +398,7 @@ class TFileObj {
bool InitReader() {
IFileCtx* ctx = idxFileCtxCreate(TFILE, fileName_.c_str(), true, 64 * 1024 * 1024);
ctx->lru = taosLRUCacheInit(1024 * 1024 * 4, -1, .5);
reader_ = tfileReaderCreate(ctx);
int32_t code = tfileReaderCreate(ctx, &reader_);
return reader_ != NULL ? true : false;
}
int Get(SIndexTermQuery* query, SArray* result) {
@ -701,7 +701,7 @@ class IndexObj {
SIndexOpts opts;
opts.cacheSize = 1024 * 1024 * 4;
int ret = indexOpen(&opts, dir.c_str(), &idx);
int32_t ret = indexOpen(&opts, dir.c_str(), &idx);
if (ret != 0) {
// opt
std::cout << "failed to open index: %s" << dir << std::endl;