Merge remote-tracking branch 'origin/3.0' into fix/tsim
This commit is contained in:
commit
883ae19cac
|
@ -28,7 +28,6 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct SIndex SIndex;
|
typedef struct SIndex SIndex;
|
||||||
typedef struct SIndexTerm SIndexTerm;
|
typedef struct SIndexTerm SIndexTerm;
|
||||||
typedef struct SIndexOpts SIndexOpts;
|
|
||||||
typedef struct SIndexMultiTermQuery SIndexMultiTermQuery;
|
typedef struct SIndexMultiTermQuery SIndexMultiTermQuery;
|
||||||
typedef struct SArray SIndexMultiTerm;
|
typedef struct SArray SIndexMultiTerm;
|
||||||
|
|
||||||
|
@ -62,6 +61,9 @@ typedef enum {
|
||||||
QUERY_MAX
|
QUERY_MAX
|
||||||
} EIndexQueryType;
|
} EIndexQueryType;
|
||||||
|
|
||||||
|
typedef struct SIndexOpts {
|
||||||
|
int32_t cacheSize; // MB
|
||||||
|
} SIndexOpts;
|
||||||
/*
|
/*
|
||||||
* create multi query
|
* create multi query
|
||||||
* @param oper (input, relation between querys)
|
* @param oper (input, relation between querys)
|
||||||
|
@ -173,7 +175,7 @@ void indexMultiTermDestroy(SIndexMultiTerm* terms);
|
||||||
* @param:
|
* @param:
|
||||||
* @param:
|
* @param:
|
||||||
*/
|
*/
|
||||||
SIndexOpts* indexOptsCreate();
|
SIndexOpts* indexOptsCreate(int32_t cacheSize);
|
||||||
void indexOptsDestroy(SIndexOpts* opts);
|
void indexOptsDestroy(SIndexOpts* opts);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -101,6 +101,7 @@ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
|
||||||
int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t maxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t maxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
int32_t avgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t stddevScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t stddevScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -314,6 +314,11 @@ int taos_options_imp(TSDB_OPTION option, const char* str);
|
||||||
|
|
||||||
void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
|
void* openTransporter(const char* user, const char* auth, int32_t numOfThreads);
|
||||||
|
|
||||||
|
typedef struct AsyncArg {
|
||||||
|
SRpcMsg msg;
|
||||||
|
SEpSet* pEpset;
|
||||||
|
} AsyncArg;
|
||||||
|
|
||||||
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
|
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType);
|
||||||
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
|
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
|
||||||
|
|
||||||
|
|
|
@ -1266,13 +1266,8 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SchedArg {
|
|
||||||
SRpcMsg msg;
|
|
||||||
SEpSet* pEpset;
|
|
||||||
} SchedArg;
|
|
||||||
|
|
||||||
int32_t doProcessMsgFromServer(void* param) {
|
int32_t doProcessMsgFromServer(void* param) {
|
||||||
SchedArg* arg = (SchedArg*)param;
|
AsyncArg* arg = (AsyncArg*)param;
|
||||||
SRpcMsg* pMsg = &arg->msg;
|
SRpcMsg* pMsg = &arg->msg;
|
||||||
SEpSet* pEpSet = arg->pEpset;
|
SEpSet* pEpSet = arg->pEpset;
|
||||||
|
|
||||||
|
@ -1335,7 +1330,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
|
memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
|
||||||
}
|
}
|
||||||
|
|
||||||
SchedArg* arg = taosMemoryCalloc(1, sizeof(SchedArg));
|
AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
|
||||||
arg->msg = *pMsg;
|
arg->msg = *pMsg;
|
||||||
arg->pEpset = tEpSet;
|
arg->pEpset = tEpSet;
|
||||||
|
|
||||||
|
|
|
@ -99,12 +99,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// open pTagIdx
|
|
||||||
// TODO(yihaoDeng), refactor later
|
|
||||||
char indexFullPath[128] = {0};
|
char indexFullPath[128] = {0};
|
||||||
sprintf(indexFullPath, "%s/%s", pMeta->path, "invert");
|
sprintf(indexFullPath, "%s/%s", pMeta->path, "invert");
|
||||||
taosMkDir(indexFullPath);
|
taosMkDir(indexFullPath);
|
||||||
ret = indexOpen(indexOptsCreate(), indexFullPath, (SIndex **)&pMeta->pTagIvtIdx);
|
|
||||||
|
SIndexOpts opts = {.cacheSize = 8 * 1024 * 1024};
|
||||||
|
ret = indexOpen(&opts, indexFullPath, (SIndex **)&pMeta->pTagIvtIdx);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
|
metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -1994,6 +1994,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getAvgFuncEnv,
|
.getEnvFunc = getAvgFuncEnv,
|
||||||
.initFunc = avgFunctionSetup,
|
.initFunc = avgFunctionSetup,
|
||||||
.processFunc = avgFunction,
|
.processFunc = avgFunction,
|
||||||
|
.sprocessFunc = avgScalarFunction,
|
||||||
.finalizeFunc = avgFinalize,
|
.finalizeFunc = avgFinalize,
|
||||||
.invertFunc = avgInvertFunction,
|
.invertFunc = avgInvertFunction,
|
||||||
.combineFunc = avgCombine,
|
.combineFunc = avgCombine,
|
||||||
|
|
|
@ -27,7 +27,7 @@ extern "C" {
|
||||||
#define DefaultMem 1024 * 1024
|
#define DefaultMem 1024 * 1024
|
||||||
|
|
||||||
static char tmpFile[] = "./index";
|
static char tmpFile[] = "./index";
|
||||||
typedef enum WriterType { TMemory, TFile } WriterType;
|
typedef enum WriterType { TMEMORY, TFILE } WriterType;
|
||||||
|
|
||||||
typedef struct IFileCtx {
|
typedef struct IFileCtx {
|
||||||
int (*write)(struct IFileCtx* ctx, uint8_t* buf, int len);
|
int (*write)(struct IFileCtx* ctx, uint8_t* buf, int len);
|
||||||
|
@ -35,6 +35,8 @@ typedef struct IFileCtx {
|
||||||
int (*flush)(struct IFileCtx* ctx);
|
int (*flush)(struct IFileCtx* ctx);
|
||||||
int (*readFrom)(struct IFileCtx* ctx, uint8_t* buf, int len, int32_t offset);
|
int (*readFrom)(struct IFileCtx* ctx, uint8_t* buf, int len, int32_t offset);
|
||||||
int (*size)(struct IFileCtx* ctx);
|
int (*size)(struct IFileCtx* ctx);
|
||||||
|
|
||||||
|
SLRUCache* lru;
|
||||||
WriterType type;
|
WriterType type;
|
||||||
union {
|
union {
|
||||||
struct {
|
struct {
|
||||||
|
|
|
@ -24,12 +24,9 @@
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
#include "tlrucache.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
#ifdef USE_LUCENE
|
|
||||||
#include <lucene++/Lucene_c.h>
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
@ -61,28 +58,17 @@ struct SIndex {
|
||||||
void* tindex;
|
void* tindex;
|
||||||
SHashObj* colObj; // < field name, field id>
|
SHashObj* colObj; // < field name, field id>
|
||||||
|
|
||||||
int64_t suid; // current super table id, -1 is normal table
|
int64_t suid; // current super table id, -1 is normal table
|
||||||
int32_t cVersion; // current version allocated to cache
|
int32_t cVersion; // current version allocated to cache
|
||||||
|
SLRUCache* lru;
|
||||||
char* path;
|
char* path;
|
||||||
|
|
||||||
int8_t status;
|
int8_t status;
|
||||||
SIndexStat stat;
|
SIndexStat stat;
|
||||||
TdThreadMutex mtx;
|
TdThreadMutex mtx;
|
||||||
tsem_t sem;
|
tsem_t sem;
|
||||||
bool quit;
|
bool quit;
|
||||||
};
|
SIndexOpts opts;
|
||||||
|
|
||||||
struct SIndexOpts {
|
|
||||||
#ifdef USE_LUCENE
|
|
||||||
void* opts;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
int32_t cacheSize; // MB
|
|
||||||
// add cache module later
|
|
||||||
#endif
|
|
||||||
int32_t cacheOpt; // MB
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SIndexMultiTermQuery {
|
struct SIndexMultiTermQuery {
|
||||||
|
|
|
@ -71,6 +71,7 @@ typedef struct TFileReader {
|
||||||
IFileCtx* ctx;
|
IFileCtx* ctx;
|
||||||
TFileHeader header;
|
TFileHeader header;
|
||||||
bool remove;
|
bool remove;
|
||||||
|
void* lru;
|
||||||
} TFileReader;
|
} TFileReader;
|
||||||
|
|
||||||
typedef struct IndexTFile {
|
typedef struct IndexTFile {
|
||||||
|
@ -95,14 +96,14 @@ typedef struct TFileReaderOpt {
|
||||||
} TFileReaderOpt;
|
} TFileReaderOpt;
|
||||||
|
|
||||||
// tfile cache, manage tindex reader
|
// tfile cache, manage tindex reader
|
||||||
TFileCache* tfileCacheCreate(const char* path);
|
TFileCache* tfileCacheCreate(SIndex* idx, const char* path);
|
||||||
void tfileCacheDestroy(TFileCache* tcache);
|
void tfileCacheDestroy(TFileCache* tcache);
|
||||||
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key);
|
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key);
|
||||||
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader);
|
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader);
|
||||||
|
|
||||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
|
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
|
||||||
|
|
||||||
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName);
|
TFileReader* tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName);
|
||||||
TFileReader* tfileReaderCreate(IFileCtx* ctx);
|
TFileReader* tfileReaderCreate(IFileCtx* ctx);
|
||||||
void tfileReaderDestroy(TFileReader* reader);
|
void tfileReaderDestroy(TFileReader* reader);
|
||||||
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr);
|
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTRslt* tr);
|
||||||
|
@ -117,7 +118,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order);
|
||||||
int tfileWriterFinish(TFileWriter* tw);
|
int tfileWriterFinish(TFileWriter* tw);
|
||||||
|
|
||||||
//
|
//
|
||||||
IndexTFile* idxTFileCreate(const char* path);
|
IndexTFile* idxTFileCreate(SIndex* idx, const char* path);
|
||||||
void idxTFileDestroy(IndexTFile* tfile);
|
void idxTFileDestroy(IndexTFile* tfile);
|
||||||
int idxTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
|
int idxTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
|
||||||
int idxTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* tr);
|
int idxTFileSearch(void* tfile, SIndexTermQuery* query, SIdxTRslt* tr);
|
||||||
|
|
|
@ -103,44 +103,59 @@ static void indexWait(void* idx) {
|
||||||
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
||||||
int ret = TSDB_CODE_SUCCESS;
|
int ret = TSDB_CODE_SUCCESS;
|
||||||
taosThreadOnce(&isInit, indexInit);
|
taosThreadOnce(&isInit, indexInit);
|
||||||
SIndex* sIdx = taosMemoryCalloc(1, sizeof(SIndex));
|
SIndex* idx = taosMemoryCalloc(1, sizeof(SIndex));
|
||||||
if (sIdx == NULL) {
|
if (idx == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
sIdx->tindex = idxTFileCreate(path);
|
idx->lru = taosLRUCacheInit(opts->cacheSize, -1, .5);
|
||||||
if (sIdx->tindex == NULL) {
|
if (idx->lru == NULL) {
|
||||||
|
ret = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
taosLRUCacheSetStrictCapacity(idx->lru, false);
|
||||||
|
|
||||||
|
idx->tindex = idxTFileCreate(idx, path);
|
||||||
|
if (idx->tindex == NULL) {
|
||||||
ret = TSDB_CODE_OUT_OF_MEMORY;
|
ret = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
idx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
sIdx->cVersion = 1;
|
idx->cVersion = 1;
|
||||||
sIdx->path = tstrdup(path);
|
idx->path = tstrdup(path);
|
||||||
taosThreadMutexInit(&sIdx->mtx, NULL);
|
taosThreadMutexInit(&idx->mtx, NULL);
|
||||||
tsem_init(&sIdx->sem, 0, 0);
|
tsem_init(&idx->sem, 0, 0);
|
||||||
|
|
||||||
sIdx->refId = idxAddRef(sIdx);
|
idx->refId = idxAddRef(idx);
|
||||||
idxAcquireRef(sIdx->refId);
|
idx->opts = *opts;
|
||||||
|
idxAcquireRef(idx->refId);
|
||||||
|
|
||||||
*index = sIdx;
|
*index = idx;
|
||||||
return ret;
|
return ret;
|
||||||
|
|
||||||
END:
|
END:
|
||||||
if (sIdx != NULL) {
|
if (idx != NULL) {
|
||||||
indexClose(sIdx);
|
indexClose(idx);
|
||||||
}
|
}
|
||||||
*index = NULL;
|
*index = NULL;
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void indexDestroy(void* handle) {
|
void indexDestroy(void* handle) {
|
||||||
SIndex* sIdx = handle;
|
SIndex* idx = handle;
|
||||||
taosThreadMutexDestroy(&sIdx->mtx);
|
taosThreadMutexDestroy(&idx->mtx);
|
||||||
tsem_destroy(&sIdx->sem);
|
tsem_destroy(&idx->sem);
|
||||||
idxTFileDestroy(sIdx->tindex);
|
idxTFileDestroy(idx->tindex);
|
||||||
taosMemoryFree(sIdx->path);
|
taosMemoryFree(idx->path);
|
||||||
taosMemoryFree(sIdx);
|
|
||||||
|
SLRUCache* lru = idx->lru;
|
||||||
|
if (lru != NULL) {
|
||||||
|
taosLRUCacheEraseUnrefEntries(lru);
|
||||||
|
taosLRUCacheCleanup(lru);
|
||||||
|
}
|
||||||
|
idx->lru = NULL;
|
||||||
|
taosMemoryFree(idx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
void indexClose(SIndex* sIdx) {
|
void indexClose(SIndex* sIdx) {
|
||||||
|
@ -159,6 +174,7 @@ void indexClose(SIndex* sIdx) {
|
||||||
taosHashCleanup(sIdx->colObj);
|
taosHashCleanup(sIdx->colObj);
|
||||||
sIdx->colObj = NULL;
|
sIdx->colObj = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
idxReleaseRef(sIdx->refId);
|
idxReleaseRef(sIdx->refId);
|
||||||
idxRemoveRef(sIdx->refId);
|
idxRemoveRef(sIdx->refId);
|
||||||
}
|
}
|
||||||
|
@ -234,8 +250,12 @@ int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result
|
||||||
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { return 1; }
|
int indexDelete(SIndex* index, SIndexMultiTermQuery* query) { return 1; }
|
||||||
// int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
|
// int indexRebuild(SIndex* index, SIndexOpts* opts) { return 0; }
|
||||||
|
|
||||||
SIndexOpts* indexOptsCreate() { return NULL; }
|
SIndexOpts* indexOptsCreate(int32_t cacheSize) {
|
||||||
void indexOptsDestroy(SIndexOpts* opts) { return; }
|
SIndexOpts* opts = taosMemoryCalloc(1, sizeof(SIndexOpts));
|
||||||
|
opts->cacheSize = cacheSize;
|
||||||
|
return opts;
|
||||||
|
}
|
||||||
|
void indexOptsDestroy(SIndexOpts* opts) { return taosMemoryFree(opts); }
|
||||||
/*
|
/*
|
||||||
* @param: oper
|
* @param: oper
|
||||||
*
|
*
|
||||||
|
@ -641,7 +661,7 @@ static int idxGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||||
}
|
}
|
||||||
tfileWriterClose(tw);
|
tfileWriterClose(tw);
|
||||||
|
|
||||||
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
|
TFileReader* reader = tfileReaderOpen(sIdx, cache->suid, version, cache->colName);
|
||||||
if (reader == NULL) {
|
if (reader == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -462,8 +462,8 @@ Iterate* idxCacheIteratorCreate(IndexCache* cache) {
|
||||||
if (cache->imm == NULL) {
|
if (cache->imm == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
Iterate* iiter = taosMemoryCalloc(1, sizeof(Iterate));
|
Iterate* iter = taosMemoryCalloc(1, sizeof(Iterate));
|
||||||
if (iiter == NULL) {
|
if (iter == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
taosThreadMutexLock(&cache->mtx);
|
taosThreadMutexLock(&cache->mtx);
|
||||||
|
@ -471,15 +471,15 @@ Iterate* idxCacheIteratorCreate(IndexCache* cache) {
|
||||||
idxMemRef(cache->imm);
|
idxMemRef(cache->imm);
|
||||||
|
|
||||||
MemTable* tbl = cache->imm;
|
MemTable* tbl = cache->imm;
|
||||||
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
iter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
||||||
iiter->val.colVal = NULL;
|
iter->val.colVal = NULL;
|
||||||
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
|
iter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
|
||||||
iiter->next = idxCacheIteratorNext;
|
iter->next = idxCacheIteratorNext;
|
||||||
iiter->getValue = idxCacheIteratorGetValue;
|
iter->getValue = idxCacheIteratorGetValue;
|
||||||
|
|
||||||
taosThreadMutexUnlock(&cache->mtx);
|
taosThreadMutexUnlock(&cache->mtx);
|
||||||
|
|
||||||
return iiter;
|
return iter;
|
||||||
}
|
}
|
||||||
void idxCacheIteratorDestroy(Iterate* iter) {
|
void idxCacheIteratorDestroy(Iterate* iter) {
|
||||||
if (iter == NULL) {
|
if (iter == NULL) {
|
||||||
|
@ -564,13 +564,13 @@ int idxCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
idxMemUnRef(tbl);
|
idxMemUnRef(tbl);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pCache->mtx);
|
taosThreadMutexUnlock(&pCache->mtx);
|
||||||
|
|
||||||
idxCacheUnRef(pCache);
|
idxCacheUnRef(pCache);
|
||||||
return 0;
|
return 0;
|
||||||
// encode end
|
// encode end
|
||||||
}
|
}
|
||||||
void idxCacheForceToMerge(void* cache) {
|
void idxCacheForceToMerge(void* cache) {
|
||||||
IndexCache* pCache = cache;
|
IndexCache* pCache = cache;
|
||||||
|
|
||||||
idxCacheRef(pCache);
|
idxCacheRef(pCache);
|
||||||
taosThreadMutexLock(&pCache->mtx);
|
taosThreadMutexLock(&pCache->mtx);
|
||||||
|
|
||||||
|
|
|
@ -31,7 +31,7 @@ typedef struct SIFParam {
|
||||||
SHashObj *pFilter;
|
SHashObj *pFilter;
|
||||||
|
|
||||||
SArray *result;
|
SArray *result;
|
||||||
char * condValue;
|
char *condValue;
|
||||||
|
|
||||||
SIdxFltStatus status;
|
SIdxFltStatus status;
|
||||||
uint8_t colValType;
|
uint8_t colValType;
|
||||||
|
@ -45,7 +45,7 @@ typedef struct SIFParam {
|
||||||
|
|
||||||
typedef struct SIFCtx {
|
typedef struct SIFCtx {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
SHashObj * pRes; /* element is SIFParam */
|
SHashObj *pRes; /* element is SIFParam */
|
||||||
bool noExec; // true: just iterate condition tree, and add hint to executor plan
|
bool noExec; // true: just iterate condition tree, and add hint to executor plan
|
||||||
SIndexMetaArg arg;
|
SIndexMetaArg arg;
|
||||||
// SIdxFltStatus st;
|
// SIdxFltStatus st;
|
||||||
|
@ -137,7 +137,7 @@ static int32_t sifGetValueFromNode(SNode *node, char **value) {
|
||||||
// covert data From snode;
|
// covert data From snode;
|
||||||
SValueNode *vn = (SValueNode *)node;
|
SValueNode *vn = (SValueNode *)node;
|
||||||
|
|
||||||
char * pData = nodesGetValueFromNode(vn);
|
char *pData = nodesGetValueFromNode(vn);
|
||||||
SDataType *pType = &vn->node.resType;
|
SDataType *pType = &vn->node.resType;
|
||||||
int32_t type = pType->type;
|
int32_t type = pType->type;
|
||||||
int32_t valLen = 0;
|
int32_t valLen = 0;
|
||||||
|
@ -175,7 +175,7 @@ static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
||||||
SOperatorNode *nd = (SOperatorNode *)node;
|
SOperatorNode *nd = (SOperatorNode *)node;
|
||||||
assert(nodeType(node) == QUERY_NODE_OPERATOR);
|
assert(nodeType(node) == QUERY_NODE_OPERATOR);
|
||||||
SColumnNode *l = (SColumnNode *)nd->pLeft;
|
SColumnNode *l = (SColumnNode *)nd->pLeft;
|
||||||
SValueNode * r = (SValueNode *)nd->pRight;
|
SValueNode *r = (SValueNode *)nd->pRight;
|
||||||
|
|
||||||
param->colId = l->colId;
|
param->colId = l->colId;
|
||||||
param->colValType = l->node.resType.type;
|
param->colValType = l->node.resType.type;
|
||||||
|
@ -357,7 +357,7 @@ static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
|
||||||
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
|
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
|
||||||
SIndexMetaArg * arg = &output->arg;
|
SIndexMetaArg *arg = &output->arg;
|
||||||
EIndexQueryType qtype = 0;
|
EIndexQueryType qtype = 0;
|
||||||
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
|
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
|
||||||
if (left->colValType == TSDB_DATA_TYPE_JSON) {
|
if (left->colValType == TSDB_DATA_TYPE_JSON) {
|
||||||
|
@ -749,7 +749,7 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
|
||||||
|
|
||||||
SFilterInfo *filter = NULL;
|
SFilterInfo *filter = NULL;
|
||||||
|
|
||||||
SArray * output = taosArrayInit(8, sizeof(uint64_t));
|
SArray *output = taosArrayInit(8, sizeof(uint64_t));
|
||||||
SIFParam param = {.arg = *metaArg, .result = output};
|
SIFParam param = {.arg = *metaArg, .result = output};
|
||||||
SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, ¶m));
|
SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, ¶m));
|
||||||
|
|
||||||
|
|
|
@ -772,6 +772,7 @@ void fstBuilderDestroy(FstBuilder* b) {
|
||||||
if (b == NULL) {
|
if (b == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
fstBuilderFinish(b);
|
||||||
|
|
||||||
idxFileDestroy(b->wrt);
|
idxFileDestroy(b->wrt);
|
||||||
fstUnFinishedNodesDestroy(b->unfinished);
|
fstUnFinishedNodesDestroy(b->unfinished);
|
||||||
|
@ -1074,8 +1075,8 @@ FStmStBuilder* fstSearchWithState(Fst* fst, FAutoCtx* ctx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
FstNode* fstGetRoot(Fst* fst) {
|
FstNode* fstGetRoot(Fst* fst) {
|
||||||
CompiledAddr rAddr = fstGetRootAddr(fst);
|
CompiledAddr addr = fstGetRootAddr(fst);
|
||||||
return fstGetNode(fst, rAddr);
|
return fstGetNode(fst, addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
FstNode* fstGetNode(Fst* fst, CompiledAddr addr) {
|
FstNode* fstGetNode(Fst* fst, CompiledAddr addr) {
|
||||||
|
|
|
@ -4,8 +4,7 @@
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
*
|
* * This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
*
|
*
|
||||||
|
@ -14,13 +13,32 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "indexFstFile.h"
|
#include "indexFstFile.h"
|
||||||
|
#include "indexComm.h"
|
||||||
#include "indexFstUtil.h"
|
#include "indexFstUtil.h"
|
||||||
#include "indexInt.h"
|
#include "indexInt.h"
|
||||||
|
#include "indexUtil.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
|
static int32_t kBlockSize = 4096;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t blockId;
|
||||||
|
int32_t nread;
|
||||||
|
char buf[0];
|
||||||
|
} SDataBlock;
|
||||||
|
|
||||||
|
static void deleteDataBlockFromLRU(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
|
||||||
|
|
||||||
|
static void idxGenLRUKey(char* buf, const char* path, int32_t blockId) {
|
||||||
|
char* p = buf;
|
||||||
|
SERIALIZE_STR_VAR_TO_BUF(p, path, strlen(path));
|
||||||
|
SERIALIZE_VAR_TO_BUF(p, '_', char);
|
||||||
|
idxInt2str(blockId, p, 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
|
static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
|
||||||
if (ctx->type == TFile) {
|
if (ctx->type == TFILE) {
|
||||||
assert(len == taosWriteFile(ctx->file.pFile, buf, len));
|
assert(len == taosWriteFile(ctx->file.pFile, buf, len));
|
||||||
} else {
|
} else {
|
||||||
memcpy(ctx->mem.buf + ctx->offset, buf, len);
|
memcpy(ctx->mem.buf + ctx->offset, buf, len);
|
||||||
|
@ -30,7 +48,7 @@ static int idxFileCtxDoWrite(IFileCtx* ctx, uint8_t* buf, int len) {
|
||||||
}
|
}
|
||||||
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
|
static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
|
||||||
int nRead = 0;
|
int nRead = 0;
|
||||||
if (ctx->type == TFile) {
|
if (ctx->type == TFILE) {
|
||||||
#ifdef USE_MMAP
|
#ifdef USE_MMAP
|
||||||
nRead = len < ctx->file.size ? len : ctx->file.size;
|
nRead = len < ctx->file.size ? len : ctx->file.size;
|
||||||
memcpy(buf, ctx->file.ptr, nRead);
|
memcpy(buf, ctx->file.ptr, nRead);
|
||||||
|
@ -45,24 +63,54 @@ static int idxFileCtxDoRead(IFileCtx* ctx, uint8_t* buf, int len) {
|
||||||
return nRead;
|
return nRead;
|
||||||
}
|
}
|
||||||
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
|
static int idxFileCtxDoReadFrom(IFileCtx* ctx, uint8_t* buf, int len, int32_t offset) {
|
||||||
int nRead = 0;
|
int32_t total = 0, nread = 0;
|
||||||
if (ctx->type == TFile) {
|
int32_t blkId = offset / kBlockSize;
|
||||||
// tfLseek(ctx->file.pFile, offset, 0);
|
int32_t blkOffset = offset % kBlockSize;
|
||||||
#ifdef USE_MMAP
|
int32_t blkLeft = kBlockSize - blkOffset;
|
||||||
int32_t last = ctx->file.size - offset;
|
|
||||||
nRead = last >= len ? len : last;
|
do {
|
||||||
memcpy(buf, ctx->file.ptr + offset, nRead);
|
char key[128] = {0};
|
||||||
#else
|
idxGenLRUKey(key, ctx->file.buf, blkId);
|
||||||
nRead = taosPReadFile(ctx->file.pFile, buf, len, offset);
|
LRUHandle* h = taosLRUCacheLookup(ctx->lru, key, strlen(key));
|
||||||
#endif
|
|
||||||
} else {
|
if (h) {
|
||||||
// refactor later
|
SDataBlock* blk = taosLRUCacheValue(ctx->lru, h);
|
||||||
assert(0);
|
nread = TMIN(blkLeft, len);
|
||||||
}
|
memcpy(buf + total, blk->buf + blkOffset, nread);
|
||||||
return nRead;
|
taosLRUCacheRelease(ctx->lru, h, false);
|
||||||
|
} else {
|
||||||
|
int32_t cacheMemSize = sizeof(SDataBlock) + kBlockSize;
|
||||||
|
|
||||||
|
SDataBlock* blk = taosMemoryCalloc(1, cacheMemSize);
|
||||||
|
blk->blockId = blkId;
|
||||||
|
blk->nread = taosPReadFile(ctx->file.pFile, blk->buf, kBlockSize, blkId * kBlockSize);
|
||||||
|
assert(blk->nread <= kBlockSize);
|
||||||
|
nread = TMIN(blkLeft, len);
|
||||||
|
|
||||||
|
if (blk->nread < kBlockSize && blk->nread < len) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
memcpy(buf + total, blk->buf + blkOffset, nread);
|
||||||
|
|
||||||
|
LRUStatus s = taosLRUCacheInsert(ctx->lru, key, strlen(key), blk, cacheMemSize, deleteDataBlockFromLRU, NULL,
|
||||||
|
TAOS_LRU_PRIORITY_LOW);
|
||||||
|
if (s != TAOS_LRU_STATUS_OK) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
total += nread;
|
||||||
|
len -= nread;
|
||||||
|
offset += nread;
|
||||||
|
|
||||||
|
blkId = offset / kBlockSize;
|
||||||
|
blkOffset = offset % kBlockSize;
|
||||||
|
blkLeft = kBlockSize - blkOffset;
|
||||||
|
|
||||||
|
} while (len > 0);
|
||||||
|
return total;
|
||||||
}
|
}
|
||||||
static int idxFileCtxGetSize(IFileCtx* ctx) {
|
static int idxFileCtxGetSize(IFileCtx* ctx) {
|
||||||
if (ctx->type == TFile) {
|
if (ctx->type == TFILE) {
|
||||||
int64_t file_size = 0;
|
int64_t file_size = 0;
|
||||||
taosStatFile(ctx->file.buf, &file_size, NULL);
|
taosStatFile(ctx->file.buf, &file_size, NULL);
|
||||||
return (int)file_size;
|
return (int)file_size;
|
||||||
|
@ -70,7 +118,7 @@ static int idxFileCtxGetSize(IFileCtx* ctx) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
static int idxFileCtxDoFlush(IFileCtx* ctx) {
|
static int idxFileCtxDoFlush(IFileCtx* ctx) {
|
||||||
if (ctx->type == TFile) {
|
if (ctx->type == TFILE) {
|
||||||
taosFsyncFile(ctx->file.pFile);
|
taosFsyncFile(ctx->file.pFile);
|
||||||
} else {
|
} else {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
@ -85,7 +133,7 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx->type = type;
|
ctx->type = type;
|
||||||
if (ctx->type == TFile) {
|
if (ctx->type == TFILE) {
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
ctx->file.readOnly = readOnly;
|
ctx->file.readOnly = readOnly;
|
||||||
memcpy(ctx->file.buf, path, strlen(path));
|
memcpy(ctx->file.buf, path, strlen(path));
|
||||||
|
@ -93,8 +141,6 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
|
||||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||||
taosFtruncateFile(ctx->file.pFile, 0);
|
taosFtruncateFile(ctx->file.pFile, 0);
|
||||||
taosStatFile(path, &ctx->file.size, NULL);
|
taosStatFile(path, &ctx->file.size, NULL);
|
||||||
// ctx->file.size = (int)size;
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
|
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
|
||||||
|
|
||||||
|
@ -109,10 +155,11 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
|
||||||
indexError("failed to open file, error %d", errno);
|
indexError("failed to open file, error %d", errno);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
} else if (ctx->type == TMemory) {
|
} else if (ctx->type == TMEMORY) {
|
||||||
ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
|
ctx->mem.buf = taosMemoryCalloc(1, sizeof(char) * capacity);
|
||||||
ctx->mem.cap = capacity;
|
ctx->mem.cap = capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx->write = idxFileCtxDoWrite;
|
ctx->write = idxFileCtxDoWrite;
|
||||||
ctx->read = idxFileCtxDoRead;
|
ctx->read = idxFileCtxDoRead;
|
||||||
ctx->flush = idxFileCtxDoFlush;
|
ctx->flush = idxFileCtxDoFlush;
|
||||||
|
@ -124,14 +171,14 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int
|
||||||
|
|
||||||
return ctx;
|
return ctx;
|
||||||
END:
|
END:
|
||||||
if (ctx->type == TMemory) {
|
if (ctx->type == TMEMORY) {
|
||||||
taosMemoryFree(ctx->mem.buf);
|
taosMemoryFree(ctx->mem.buf);
|
||||||
}
|
}
|
||||||
taosMemoryFree(ctx);
|
taosMemoryFree(ctx);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
|
void idxFileCtxDestroy(IFileCtx* ctx, bool remove) {
|
||||||
if (ctx->type == TMemory) {
|
if (ctx->type == TMEMORY) {
|
||||||
taosMemoryFree(ctx->mem.buf);
|
taosMemoryFree(ctx->mem.buf);
|
||||||
} else {
|
} else {
|
||||||
ctx->flush(ctx);
|
ctx->flush(ctx);
|
||||||
|
@ -183,6 +230,7 @@ int idxFileWrite(IdxFstFile* write, uint8_t* buf, uint32_t len) {
|
||||||
write->summer = taosCalcChecksum(write->summer, buf, len);
|
write->summer = taosCalcChecksum(write->summer, buf, len);
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
|
int idxFileRead(IdxFstFile* write, uint8_t* buf, uint32_t len) {
|
||||||
if (write == NULL) {
|
if (write == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -90,7 +90,7 @@ static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTRslt
|
||||||
{tfSearchEqual_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON,
|
{tfSearchEqual_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON,
|
||||||
tfSearchLessEqual_JSON, tfSearchGreaterThan_JSON, tfSearchGreaterEqual_JSON, tfSearchRange_JSON}};
|
tfSearchLessEqual_JSON, tfSearchGreaterThan_JSON, tfSearchGreaterEqual_JSON, tfSearchRange_JSON}};
|
||||||
|
|
||||||
TFileCache* tfileCacheCreate(const char* path) {
|
TFileCache* tfileCacheCreate(SIndex* idx, const char* path) {
|
||||||
TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache));
|
TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache));
|
||||||
if (tcache == NULL) {
|
if (tcache == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -103,17 +103,20 @@ TFileCache* tfileCacheCreate(const char* path) {
|
||||||
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
|
for (size_t i = 0; i < taosArrayGetSize(files); i++) {
|
||||||
char* file = taosArrayGetP(files, i);
|
char* file = taosArrayGetP(files, i);
|
||||||
|
|
||||||
IFileCtx* wc = idxFileCtxCreate(TFile, file, true, 1024 * 1024 * 64);
|
IFileCtx* ctx = idxFileCtxCreate(TFILE, file, true, 1024 * 1024 * 64);
|
||||||
if (wc == NULL) {
|
if (ctx == NULL) {
|
||||||
indexError("failed to open index:%s", file);
|
indexError("failed to open index:%s", file);
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
ctx->lru = idx->lru;
|
||||||
|
|
||||||
TFileReader* reader = tfileReaderCreate(wc);
|
TFileReader* reader = tfileReaderCreate(ctx);
|
||||||
if (reader == NULL) {
|
if (reader == NULL) {
|
||||||
indexInfo("skip invalid file: %s", file);
|
indexInfo("skip invalid file: %s", file);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
reader->lru = idx->lru;
|
||||||
|
|
||||||
TFileHeader* header = &reader->header;
|
TFileHeader* header = &reader->header;
|
||||||
ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = (int32_t)strlen(header->colName)};
|
ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = (int32_t)strlen(header->colName)};
|
||||||
|
|
||||||
|
@ -160,9 +163,8 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
|
||||||
return *reader;
|
return *reader;
|
||||||
}
|
}
|
||||||
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
|
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
int32_t sz = idxSerialCacheKey(key, buf);
|
int32_t sz = idxSerialCacheKey(key, buf);
|
||||||
// remove last version index reader
|
|
||||||
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
|
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
|
||||||
if (p != NULL && *p != NULL) {
|
if (p != NULL && *p != NULL) {
|
||||||
TFileReader* oldRdr = *p;
|
TFileReader* oldRdr = *p;
|
||||||
|
@ -493,7 +495,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
|
||||||
char fullname[256] = {0};
|
char fullname[256] = {0};
|
||||||
tfileGenFileFullName(fullname, path, suid, colName, version);
|
tfileGenFileFullName(fullname, path, suid, colName, version);
|
||||||
// indexInfo("open write file name %s", fullname);
|
// indexInfo("open write file name %s", fullname);
|
||||||
IFileCtx* wcx = idxFileCtxCreate(TFile, fullname, false, 1024 * 1024 * 64);
|
IFileCtx* wcx = idxFileCtxCreate(TFILE, fullname, false, 1024 * 1024 * 64);
|
||||||
if (wcx == NULL) {
|
if (wcx == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -506,16 +508,17 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const c
|
||||||
|
|
||||||
return tfileWriterCreate(wcx, &tfh);
|
return tfileWriterCreate(wcx, &tfh);
|
||||||
}
|
}
|
||||||
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName) {
|
TFileReader* tfileReaderOpen(SIndex* idx, uint64_t suid, int64_t version, const char* colName) {
|
||||||
char fullname[256] = {0};
|
char fullname[256] = {0};
|
||||||
tfileGenFileFullName(fullname, path, suid, colName, version);
|
tfileGenFileFullName(fullname, idx->path, suid, colName, version);
|
||||||
|
|
||||||
IFileCtx* wc = idxFileCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
|
IFileCtx* wc = idxFileCtxCreate(TFILE, fullname, true, 1024 * 1024 * 1024);
|
||||||
if (wc == NULL) {
|
if (wc == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
|
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
wc->lru = idx->lru;
|
||||||
indexTrace("open read file name:%s, file size: %" PRId64 "", wc->file.buf, wc->file.size);
|
indexTrace("open read file name:%s, file size: %" PRId64 "", wc->file.buf, wc->file.size);
|
||||||
|
|
||||||
TFileReader* reader = tfileReaderCreate(wc);
|
TFileReader* reader = tfileReaderCreate(wc);
|
||||||
|
@ -598,17 +601,11 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
||||||
indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
|
indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
|
||||||
(int)taosArrayGetSize(v->tableId));
|
(int)taosArrayGetSize(v->tableId));
|
||||||
} else {
|
} else {
|
||||||
// indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
|
indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
|
||||||
// (int)taosArrayGetSize(v->tableId));
|
(int)taosArrayGetSize(v->tableId));
|
||||||
|
|
||||||
// indexInfo("tfile write data size: %d", tw->ctx->size(tw->ctx));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fstBuilderFinish(tw->fb);
|
|
||||||
fstBuilderDestroy(tw->fb);
|
fstBuilderDestroy(tw->fb);
|
||||||
tw->fb = NULL;
|
|
||||||
|
|
||||||
tfileWriteFooter(tw);
|
tfileWriteFooter(tw);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -627,8 +624,8 @@ void tfileWriterDestroy(TFileWriter* tw) {
|
||||||
taosMemoryFree(tw);
|
taosMemoryFree(tw);
|
||||||
}
|
}
|
||||||
|
|
||||||
IndexTFile* idxTFileCreate(const char* path) {
|
IndexTFile* idxTFileCreate(SIndex* idx, const char* path) {
|
||||||
TFileCache* cache = tfileCacheCreate(path);
|
TFileCache* cache = tfileCacheCreate(idx, path);
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -859,18 +856,6 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
// if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
// FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
|
|
||||||
// if (fstBuilderInsert(write->fb, key, tval->offset)) {
|
|
||||||
// fstSliceDestroy(&key);
|
|
||||||
// return 0;
|
|
||||||
// }
|
|
||||||
// fstSliceDestroy(&key);
|
|
||||||
// return -1;
|
|
||||||
//} else {
|
|
||||||
// // handle other type later
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
static int tfileWriteFooter(TFileWriter* write) {
|
static int tfileWriteFooter(TFileWriter* write) {
|
||||||
char buf[sizeof(FILE_MAGIC_NUMBER) + 1] = {0};
|
char buf[sizeof(FILE_MAGIC_NUMBER) + 1] = {0};
|
||||||
|
@ -887,6 +872,7 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
|
||||||
char buf[TFILE_HEADER_SIZE] = {0};
|
char buf[TFILE_HEADER_SIZE] = {0};
|
||||||
|
|
||||||
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
|
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
|
||||||
|
|
||||||
if (nread == -1) {
|
if (nread == -1) {
|
||||||
indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno,
|
indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno,
|
||||||
reader->ctx->file.buf);
|
reader->ctx->file.buf);
|
||||||
|
@ -914,7 +900,7 @@ static int tfileReaderLoadFst(TFileReader* reader) {
|
||||||
int64_t cost = taosGetTimestampUs() - ts;
|
int64_t cost = taosGetTimestampUs() - ts;
|
||||||
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %" PRId64 ", time cost: %" PRId64
|
indexInfo("nread = %d, and fst offset=%d, fst size: %d, filename: %s, file size: %" PRId64 ", time cost: %" PRId64
|
||||||
"us",
|
"us",
|
||||||
nread, reader->header.fstOffset, fstSize, ctx->file.buf, ctx->file.size, cost);
|
nread, reader->header.fstOffset, fstSize, ctx->file.buf, size, cost);
|
||||||
// we assuse fst size less than FST_MAX_SIZE
|
// we assuse fst size less than FST_MAX_SIZE
|
||||||
assert(nread > 0 && nread <= fstSize);
|
assert(nread > 0 && nread <= fstSize);
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,7 @@ class FstWriter {
|
||||||
public:
|
public:
|
||||||
FstWriter() {
|
FstWriter() {
|
||||||
taosRemoveFile(fileName.c_str());
|
taosRemoveFile(fileName.c_str());
|
||||||
_wc = idxFileCtxCreate(TFile, fileName.c_str(), false, 64 * 1024 * 1024);
|
_wc = idxFileCtxCreate(TFILE, fileName.c_str(), false, 64 * 1024 * 1024);
|
||||||
_b = fstBuilderCreate(_wc, 0);
|
_b = fstBuilderCreate(_wc, 0);
|
||||||
}
|
}
|
||||||
bool Put(const std::string& key, uint64_t val) {
|
bool Put(const std::string& key, uint64_t val) {
|
||||||
|
@ -34,7 +34,7 @@ class FstWriter {
|
||||||
return ok;
|
return ok;
|
||||||
}
|
}
|
||||||
~FstWriter() {
|
~FstWriter() {
|
||||||
fstBuilderFinish(_b);
|
// fstBuilderFinish(_b);
|
||||||
fstBuilderDestroy(_b);
|
fstBuilderDestroy(_b);
|
||||||
|
|
||||||
idxFileCtxDestroy(_wc, false);
|
idxFileCtxDestroy(_wc, false);
|
||||||
|
@ -48,7 +48,7 @@ class FstWriter {
|
||||||
class FstReadMemory {
|
class FstReadMemory {
|
||||||
public:
|
public:
|
||||||
FstReadMemory(int32_t size, const std::string& fileName = TD_TMP_DIR_PATH "tindex.tindex") {
|
FstReadMemory(int32_t size, const std::string& fileName = TD_TMP_DIR_PATH "tindex.tindex") {
|
||||||
_wc = idxFileCtxCreate(TFile, fileName.c_str(), true, 64 * 1024);
|
_wc = idxFileCtxCreate(TFILE, fileName.c_str(), true, 64 * 1024);
|
||||||
_w = idxFileCreate(_wc);
|
_w = idxFileCreate(_wc);
|
||||||
_size = size;
|
_size = size;
|
||||||
memset((void*)&_s, 0, sizeof(_s));
|
memset((void*)&_s, 0, sizeof(_s));
|
||||||
|
@ -598,7 +598,9 @@ void fst_get(Fst* fst) {
|
||||||
void validateTFile(char* arg) {
|
void validateTFile(char* arg) {
|
||||||
std::thread threads[NUM_OF_THREAD];
|
std::thread threads[NUM_OF_THREAD];
|
||||||
// std::vector<std::thread> threads;
|
// std::vector<std::thread> threads;
|
||||||
TFileReader* reader = tfileReaderOpen(arg, 0, 20000000, "tag1");
|
SIndex* index = (SIndex*)taosMemoryCalloc(1, sizeof(SIndex));
|
||||||
|
index->path = strdup(arg);
|
||||||
|
TFileReader* reader = tfileReaderOpen(index, 0, 20000000, "tag1");
|
||||||
|
|
||||||
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||||
threads[i] = std::thread(fst_get, reader->fst);
|
threads[i] = std::thread(fst_get, reader->fst);
|
||||||
|
@ -617,7 +619,7 @@ void iterTFileReader(char* path, char* uid, char* colName, char* ver) {
|
||||||
uint64_t suid = atoi(uid);
|
uint64_t suid = atoi(uid);
|
||||||
int version = atoi(ver);
|
int version = atoi(ver);
|
||||||
|
|
||||||
TFileReader* reader = tfileReaderOpen(path, suid, version, colName);
|
TFileReader* reader = tfileReaderOpen(NULL, suid, version, colName);
|
||||||
|
|
||||||
Iterate* iter = tfileIteratorCreate(reader);
|
Iterate* iter = tfileIteratorCreate(reader);
|
||||||
bool tn = iter ? iter->next(iter) : false;
|
bool tn = iter ? iter->next(iter) : false;
|
||||||
|
|
|
@ -39,7 +39,7 @@ static void EnvCleanup() {}
|
||||||
class FstWriter {
|
class FstWriter {
|
||||||
public:
|
public:
|
||||||
FstWriter() {
|
FstWriter() {
|
||||||
_wc = idxFileCtxCreate(TFile, tindex, false, 64 * 1024 * 1024);
|
_wc = idxFileCtxCreate(TFILE, tindex, false, 64 * 1024 * 1024);
|
||||||
_b = fstBuilderCreate(_wc, 0);
|
_b = fstBuilderCreate(_wc, 0);
|
||||||
}
|
}
|
||||||
bool Put(const std::string& key, uint64_t val) {
|
bool Put(const std::string& key, uint64_t val) {
|
||||||
|
@ -54,7 +54,6 @@ class FstWriter {
|
||||||
return ok;
|
return ok;
|
||||||
}
|
}
|
||||||
~FstWriter() {
|
~FstWriter() {
|
||||||
fstBuilderFinish(_b);
|
|
||||||
fstBuilderDestroy(_b);
|
fstBuilderDestroy(_b);
|
||||||
|
|
||||||
idxFileCtxDestroy(_wc, false);
|
idxFileCtxDestroy(_wc, false);
|
||||||
|
@ -68,7 +67,7 @@ class FstWriter {
|
||||||
class FstReadMemory {
|
class FstReadMemory {
|
||||||
public:
|
public:
|
||||||
FstReadMemory(size_t size) {
|
FstReadMemory(size_t size) {
|
||||||
_wc = idxFileCtxCreate(TFile, tindex, true, 64 * 1024);
|
_wc = idxFileCtxCreate(TFILE, tindex, true, 64 * 1024);
|
||||||
_w = idxFileCreate(_wc);
|
_w = idxFileCreate(_wc);
|
||||||
_size = size;
|
_size = size;
|
||||||
memset((void*)&_s, 0, sizeof(_s));
|
memset((void*)&_s, 0, sizeof(_s));
|
||||||
|
|
|
@ -50,7 +50,7 @@ class DebugInfo {
|
||||||
class FstWriter {
|
class FstWriter {
|
||||||
public:
|
public:
|
||||||
FstWriter() {
|
FstWriter() {
|
||||||
_wc = idxFileCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", false, 64 * 1024 * 1024);
|
_wc = idxFileCtxCreate(TFILE, TD_TMP_DIR_PATH "tindex", false, 64 * 1024 * 1024);
|
||||||
_b = fstBuilderCreate(NULL, 0);
|
_b = fstBuilderCreate(NULL, 0);
|
||||||
}
|
}
|
||||||
bool Put(const std::string& key, uint64_t val) {
|
bool Put(const std::string& key, uint64_t val) {
|
||||||
|
@ -60,7 +60,7 @@ class FstWriter {
|
||||||
return ok;
|
return ok;
|
||||||
}
|
}
|
||||||
~FstWriter() {
|
~FstWriter() {
|
||||||
fstBuilderFinish(_b);
|
// fstBuilderFinish(_b);
|
||||||
fstBuilderDestroy(_b);
|
fstBuilderDestroy(_b);
|
||||||
|
|
||||||
idxFileCtxDestroy(_wc, false);
|
idxFileCtxDestroy(_wc, false);
|
||||||
|
@ -74,7 +74,7 @@ class FstWriter {
|
||||||
class FstReadMemory {
|
class FstReadMemory {
|
||||||
public:
|
public:
|
||||||
FstReadMemory(size_t size) {
|
FstReadMemory(size_t size) {
|
||||||
_wc = idxFileCtxCreate(TFile, TD_TMP_DIR_PATH "tindex", true, 64 * 1024);
|
_wc = idxFileCtxCreate(TFILE, TD_TMP_DIR_PATH "tindex", true, 64 * 1024);
|
||||||
_w = idxFileCreate(_wc);
|
_w = idxFileCreate(_wc);
|
||||||
_size = size;
|
_size = size;
|
||||||
memset((void*)&_s, 0, sizeof(_s));
|
memset((void*)&_s, 0, sizeof(_s));
|
||||||
|
@ -292,14 +292,12 @@ class IndexEnv : public ::testing::Test {
|
||||||
virtual void SetUp() {
|
virtual void SetUp() {
|
||||||
initLog();
|
initLog();
|
||||||
taosRemoveDir(path);
|
taosRemoveDir(path);
|
||||||
opts = indexOptsCreate();
|
SIndexOpts opts;
|
||||||
int ret = indexOpen(opts, path, &index);
|
opts.cacheSize = 1024 * 1024 * 4;
|
||||||
|
int ret = indexOpen(&opts, path, &index);
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
}
|
}
|
||||||
virtual void TearDown() {
|
virtual void TearDown() { indexClose(index); }
|
||||||
indexClose(index);
|
|
||||||
indexOptsDestroy(opts);
|
|
||||||
}
|
|
||||||
|
|
||||||
const char* path = TD_TMP_DIR_PATH "tindex";
|
const char* path = TD_TMP_DIR_PATH "tindex";
|
||||||
SIndexOpts* opts;
|
SIndexOpts* opts;
|
||||||
|
@ -391,13 +389,15 @@ class TFileObj {
|
||||||
|
|
||||||
fileName_ = path;
|
fileName_ = path;
|
||||||
|
|
||||||
IFileCtx* ctx = idxFileCtxCreate(TFile, path.c_str(), false, 64 * 1024 * 1024);
|
IFileCtx* ctx = idxFileCtxCreate(TFILE, path.c_str(), false, 64 * 1024 * 1024);
|
||||||
|
ctx->lru = taosLRUCacheInit(1024 * 1024 * 4, -1, .5);
|
||||||
|
|
||||||
writer_ = tfileWriterCreate(ctx, &header);
|
writer_ = tfileWriterCreate(ctx, &header);
|
||||||
return writer_ != NULL ? true : false;
|
return writer_ != NULL ? true : false;
|
||||||
}
|
}
|
||||||
bool InitReader() {
|
bool InitReader() {
|
||||||
IFileCtx* ctx = idxFileCtxCreate(TFile, fileName_.c_str(), true, 64 * 1024 * 1024);
|
IFileCtx* ctx = idxFileCtxCreate(TFILE, fileName_.c_str(), true, 64 * 1024 * 1024);
|
||||||
|
ctx->lru = taosLRUCacheInit(1024 * 1024 * 4, -1, .5);
|
||||||
reader_ = tfileReaderCreate(ctx);
|
reader_ = tfileReaderCreate(ctx);
|
||||||
return reader_ != NULL ? true : false;
|
return reader_ != NULL ? true : false;
|
||||||
}
|
}
|
||||||
|
@ -657,7 +657,7 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexTermQuery query = {term, QUERY_TERM};
|
SIndexTermQuery query = {term, QUERY_TERM};
|
||||||
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
||||||
STermValueType valType;
|
STermValueType valType;
|
||||||
|
@ -672,7 +672,7 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
{
|
{
|
||||||
std::string colVal("v2");
|
std::string colVal("v2");
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexTermQuery query = {term, QUERY_TERM};
|
SIndexTermQuery query = {term, QUERY_TERM};
|
||||||
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
SArray* ret = (SArray*)taosArrayInit(4, sizeof(suid));
|
||||||
STermValueType valType;
|
STermValueType valType;
|
||||||
|
@ -698,6 +698,9 @@ class IndexObj {
|
||||||
taosMkDir(dir.c_str());
|
taosMkDir(dir.c_str());
|
||||||
}
|
}
|
||||||
taosMkDir(dir.c_str());
|
taosMkDir(dir.c_str());
|
||||||
|
SIndexOpts opts;
|
||||||
|
opts.cacheSize = 1024 * 1024 * 4;
|
||||||
|
|
||||||
int ret = indexOpen(&opts, dir.c_str(), &idx);
|
int ret = indexOpen(&opts, dir.c_str(), &idx);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
// opt
|
// opt
|
||||||
|
@ -707,7 +710,7 @@ class IndexObj {
|
||||||
}
|
}
|
||||||
void Del(const std::string& colName, const std::string& colVal, uint64_t uid) {
|
void Del(const std::string& colName, const std::string& colVal, uint64_t uid) {
|
||||||
SIndexTerm* term = indexTermCreateT(0, DEL_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, DEL_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
Put(terms, uid);
|
Put(terms, uid);
|
||||||
|
@ -716,7 +719,7 @@ class IndexObj {
|
||||||
int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world",
|
int WriteMillonData(const std::string& colName, const std::string& colVal = "Hello world",
|
||||||
size_t numOfTable = 100 * 10000) {
|
size_t numOfTable = 100 * 10000) {
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
for (size_t i = 0; i < numOfTable; i++) {
|
for (size_t i = 0; i < numOfTable; i++) {
|
||||||
|
@ -738,7 +741,7 @@ class IndexObj {
|
||||||
tColVal[taosRand() % colValSize] = 'a' + k % 26;
|
tColVal[taosRand() % colValSize] = 'a' + k % 26;
|
||||||
}
|
}
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
tColVal.c_str(), tColVal.size());
|
tColVal.c_str(), tColVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
for (size_t j = 0; j < skip; j++) {
|
for (size_t j = 0; j < skip; j++) {
|
||||||
|
@ -774,7 +777,7 @@ class IndexObj {
|
||||||
int SearchOne(const std::string& colName, const std::string& colVal) {
|
int SearchOne(const std::string& colName, const std::string& colVal) {
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
||||||
|
|
||||||
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
@ -796,7 +799,7 @@ class IndexObj {
|
||||||
int SearchOneTarget(const std::string& colName, const std::string& colVal, uint64_t val) {
|
int SearchOneTarget(const std::string& colName, const std::string& colVal, uint64_t val) {
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
||||||
|
|
||||||
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
@ -821,7 +824,7 @@ class IndexObj {
|
||||||
void PutOne(const std::string& colName, const std::string& colVal) {
|
void PutOne(const std::string& colName, const std::string& colVal) {
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
Put(terms, 10);
|
Put(terms, 10);
|
||||||
indexMultiTermDestroy(terms);
|
indexMultiTermDestroy(terms);
|
||||||
|
@ -829,7 +832,7 @@ class IndexObj {
|
||||||
void PutOneTarge(const std::string& colName, const std::string& colVal, uint64_t val) {
|
void PutOneTarge(const std::string& colName, const std::string& colVal, uint64_t val) {
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
Put(terms, val);
|
Put(terms, val);
|
||||||
indexMultiTermDestroy(terms);
|
indexMultiTermDestroy(terms);
|
||||||
|
@ -845,10 +848,10 @@ class IndexObj {
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
SIndexOpts opts;
|
SIndexOpts* opts;
|
||||||
SIndex* idx;
|
SIndex* idx;
|
||||||
int numOfWrite;
|
int numOfWrite;
|
||||||
int numOfRead;
|
int numOfRead;
|
||||||
};
|
};
|
||||||
|
|
||||||
class IndexEnv2 : public ::testing::Test {
|
class IndexEnv2 : public ::testing::Test {
|
||||||
|
@ -875,7 +878,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
std::string colName("tag1"), colVal("Hello");
|
std::string colName("tag1"), colVal("Hello");
|
||||||
|
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
for (size_t i = 0; i < targetSize; i++) {
|
for (size_t i = 0; i < targetSize; i++) {
|
||||||
|
@ -890,7 +893,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
std::string colName("tag1"), colVal("hello");
|
std::string colName("tag1"), colVal("hello");
|
||||||
|
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
for (size_t i = 0; i < size; i++) {
|
for (size_t i = 0; i < size; i++) {
|
||||||
|
@ -905,7 +908,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
std::string colName("tag1"), colVal("Hello");
|
std::string colName("tag1"), colVal("Hello");
|
||||||
|
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
for (size_t i = size * 3; i < size * 4; i++) {
|
for (size_t i = size * 3; i < size * 4; i++) {
|
||||||
|
@ -920,7 +923,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
std::string colName("tag1"), colVal("Hello");
|
std::string colName("tag1"), colVal("Hello");
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
indexMultiTermQueryAdd(mq, term, QUERY_TERM);
|
||||||
|
|
||||||
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
@ -943,7 +946,7 @@ TEST_F(IndexEnv2, testEmptyIndexOpen) {
|
||||||
std::string colName("tag1"), colVal("Hello");
|
std::string colName("tag1"), colVal("Hello");
|
||||||
|
|
||||||
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
for (size_t i = 0; i < targetSize; i++) {
|
for (size_t i = 0; i < targetSize; i++) {
|
||||||
|
|
|
@ -54,13 +54,12 @@ class JsonEnv : public ::testing::Test {
|
||||||
printf("set up\n");
|
printf("set up\n");
|
||||||
|
|
||||||
initLog();
|
initLog();
|
||||||
opts = indexOptsCreate();
|
opts = indexOptsCreate(1024 * 1024 * 4);
|
||||||
int ret = indexJsonOpen(opts, dir.c_str(), &index);
|
int ret = indexJsonOpen(opts, dir.c_str(), &index);
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
}
|
}
|
||||||
virtual void TearDown() {
|
virtual void TearDown() {
|
||||||
indexJsonClose(index);
|
indexJsonClose(index);
|
||||||
indexOptsDestroy(opts);
|
|
||||||
printf("destory\n");
|
printf("destory\n");
|
||||||
taosMsleep(1000);
|
taosMsleep(1000);
|
||||||
}
|
}
|
||||||
|
@ -71,7 +70,7 @@ class JsonEnv : public ::testing::Test {
|
||||||
static void WriteData(SIndexJson* index, const std::string& colName, int8_t dtype, void* data, int dlen, int tableId,
|
static void WriteData(SIndexJson* index, const std::string& colName, int8_t dtype, void* data, int dlen, int tableId,
|
||||||
int8_t operType = ADD_VALUE) {
|
int8_t operType = ADD_VALUE) {
|
||||||
SIndexTerm* term = indexTermCreateT(1, (SIndexOperOnColumn)operType, dtype, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(1, (SIndexOperOnColumn)operType, dtype, colName.c_str(), colName.size(),
|
||||||
(const char*)data, dlen);
|
(const char*)data, dlen);
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
indexJsonPut(index, terms, (int64_t)tableId);
|
indexJsonPut(index, terms, (int64_t)tableId);
|
||||||
|
@ -82,7 +81,7 @@ static void WriteData(SIndexJson* index, const std::string& colName, int8_t dtyp
|
||||||
static void delData(SIndexJson* index, const std::string& colName, int8_t dtype, void* data, int dlen, int tableId,
|
static void delData(SIndexJson* index, const std::string& colName, int8_t dtype, void* data, int dlen, int tableId,
|
||||||
int8_t operType = DEL_VALUE) {
|
int8_t operType = DEL_VALUE) {
|
||||||
SIndexTerm* term = indexTermCreateT(1, (SIndexOperOnColumn)operType, dtype, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(1, (SIndexOperOnColumn)operType, dtype, colName.c_str(), colName.size(),
|
||||||
(const char*)data, dlen);
|
(const char*)data, dlen);
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
indexJsonPut(index, terms, (int64_t)tableId);
|
indexJsonPut(index, terms, (int64_t)tableId);
|
||||||
|
@ -108,7 +107,7 @@ TEST_F(JsonEnv, testWrite) {
|
||||||
std::string colVal("ab");
|
std::string colVal("ab");
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
indexJsonPut(index, terms, i);
|
indexJsonPut(index, terms, i);
|
||||||
|
@ -147,7 +146,7 @@ TEST_F(JsonEnv, testWrite) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
|
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
|
||||||
|
@ -205,7 +204,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
|
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
|
||||||
|
@ -220,7 +219,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
|
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
|
||||||
|
@ -235,7 +234,7 @@ TEST_F(JsonEnv, testWriteMillonData) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
|
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
|
||||||
|
@ -305,7 +304,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
|
||||||
int val = 15;
|
int val = 15;
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(val));
|
(const char*)&val, sizeof(val));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
|
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
|
||||||
|
@ -319,7 +318,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(val));
|
(const char*)&val, sizeof(val));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
|
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
|
||||||
|
@ -334,7 +333,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(int));
|
(const char*)&val, sizeof(int));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
|
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
|
||||||
|
@ -349,7 +348,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(val));
|
(const char*)&val, sizeof(val));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
|
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
|
||||||
|
@ -364,7 +363,7 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(val));
|
(const char*)&val, sizeof(val));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
|
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
|
||||||
|
@ -407,7 +406,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(val));
|
(const char*)&val, sizeof(val));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
|
indexMultiTermQueryAdd(mq, q, QUERY_TERM);
|
||||||
|
@ -421,7 +420,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(int));
|
(const char*)&val, sizeof(int));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
|
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
|
||||||
|
@ -436,7 +435,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(val));
|
(const char*)&val, sizeof(val));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
|
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
|
||||||
|
@ -450,7 +449,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(val));
|
(const char*)&val, sizeof(val));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
|
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
|
||||||
|
@ -464,7 +463,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(val));
|
(const char*)&val, sizeof(val));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
|
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
|
||||||
|
@ -493,7 +492,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(val));
|
(const char*)&val, sizeof(val));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
|
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
|
||||||
|
@ -521,7 +520,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
|
||||||
|
|
||||||
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
|
||||||
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
SIndexTerm* q = indexTermCreateT(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
|
||||||
(const char*)&val, sizeof(val));
|
(const char*)&val, sizeof(val));
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = taosArrayInit(1, sizeof(uint64_t));
|
||||||
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
|
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
|
||||||
|
|
|
@ -1177,6 +1177,29 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p
|
||||||
"%s is only supported in single table query", pFunc->functionName);
|
"%s is only supported in single table query", pFunc->functionName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool isStar(SNode* pNode) {
|
||||||
|
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' == ((SColumnNode*)pNode)->tableAlias[0]) &&
|
||||||
|
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool isTableStar(SNode* pNode) {
|
||||||
|
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' != ((SColumnNode*)pNode)->tableAlias[0]) &&
|
||||||
|
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateMultiResFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||||
|
if (!fmIsMultiResFunc(pFunc->funcId)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
if (SQL_CLAUSE_SELECT != pCxt->currClause ) {
|
||||||
|
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
if (isStar(pPara) || isTableStar(pPara)) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
|
||||||
|
"%s(*) is only supported in SELECTed list", pFunc->functionName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
|
static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
|
||||||
if (NULL != pCurrStmt && QUERY_NODE_SELECT_STMT == nodeType(pCurrStmt)) {
|
if (NULL != pCurrStmt && QUERY_NODE_SELECT_STMT == nodeType(pCurrStmt)) {
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pCurrStmt;
|
SSelectStmt* pSelect = (SSelectStmt*)pCurrStmt;
|
||||||
|
@ -1311,6 +1334,9 @@ static int32_t translateNoramlFunction(STranslateContext* pCxt, SFunctionNode* p
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = translateRepeatScanFunc(pCxt, pFunc);
|
code = translateRepeatScanFunc(pCxt, pFunc);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = translateMultiResFunc(pCxt, pFunc);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
setFuncClassification(pCxt->pCurrStmt, pFunc);
|
setFuncClassification(pCxt->pCurrStmt, pFunc);
|
||||||
}
|
}
|
||||||
|
@ -1908,16 +1934,6 @@ static int32_t createTableAllCols(STranslateContext* pCxt, SColumnNode* pCol, bo
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isStar(SNode* pNode) {
|
|
||||||
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' == ((SColumnNode*)pNode)->tableAlias[0]) &&
|
|
||||||
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool isTableStar(SNode* pNode) {
|
|
||||||
return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' != ((SColumnNode*)pNode)->tableAlias[0]) &&
|
|
||||||
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t createMultiResFuncsParas(STranslateContext* pCxt, SNodeList* pSrcParas, SNodeList** pOutput) {
|
static int32_t createMultiResFuncsParas(STranslateContext* pCxt, SNodeList* pSrcParas, SNodeList** pOutput) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
|
|
@ -1919,6 +1919,113 @@ int32_t maxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *
|
||||||
return doMinMaxScalarFunction(pInput, inputNum, pOutput, false);
|
return doMinMaxScalarFunction(pInput, inputNum, pOutput, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t avgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
SColumnInfoData *pInputData = pInput->columnData;
|
||||||
|
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||||
|
|
||||||
|
int32_t type = GET_PARAM_TYPE(pInput);
|
||||||
|
int64_t count = 0;
|
||||||
|
bool hasNull = false;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
|
if (colDataIsNull_s(pInputData, i)) {
|
||||||
|
hasNull = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch(type) {
|
||||||
|
case TSDB_DATA_TYPE_TINYINT: {
|
||||||
|
int8_t *in = (int8_t *)pInputData->pData;
|
||||||
|
int64_t *out = (int64_t *)pOutputData->pData;
|
||||||
|
*out += in[i];
|
||||||
|
count++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT: {
|
||||||
|
int16_t *in = (int16_t *)pInputData->pData;
|
||||||
|
int64_t *out = (int64_t *)pOutputData->pData;
|
||||||
|
*out += in[i];
|
||||||
|
count++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_INT: {
|
||||||
|
int32_t *in = (int32_t *)pInputData->pData;
|
||||||
|
int64_t *out = (int64_t *)pOutputData->pData;
|
||||||
|
*out += in[i];
|
||||||
|
count++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
|
int64_t *in = (int64_t *)pInputData->pData;
|
||||||
|
int64_t *out = (int64_t *)pOutputData->pData;
|
||||||
|
*out += in[i];
|
||||||
|
count++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT: {
|
||||||
|
uint8_t *in = (uint8_t *)pInputData->pData;
|
||||||
|
uint64_t *out = (uint64_t *)pOutputData->pData;
|
||||||
|
*out += in[i];
|
||||||
|
count++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT: {
|
||||||
|
uint16_t *in = (uint16_t *)pInputData->pData;
|
||||||
|
uint64_t *out = (uint64_t *)pOutputData->pData;
|
||||||
|
*out += in[i];
|
||||||
|
count++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_UINT: {
|
||||||
|
uint32_t *in = (uint32_t *)pInputData->pData;
|
||||||
|
uint64_t *out = (uint64_t *)pOutputData->pData;
|
||||||
|
*out += in[i];
|
||||||
|
count++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT: {
|
||||||
|
uint64_t *in = (uint64_t *)pInputData->pData;
|
||||||
|
uint64_t *out = (uint64_t *)pOutputData->pData;
|
||||||
|
*out += in[i];
|
||||||
|
count++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
|
float *in = (float *)pInputData->pData;
|
||||||
|
float *out = (float *)pOutputData->pData;
|
||||||
|
*out += in[i];
|
||||||
|
count++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
double *in = (double *)pInputData->pData;
|
||||||
|
double *out = (double *)pOutputData->pData;
|
||||||
|
*out += in[i];
|
||||||
|
count++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasNull) {
|
||||||
|
colDataAppendNULL(pOutputData, 0);
|
||||||
|
} else {
|
||||||
|
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||||
|
int64_t *out = (int64_t *)pOutputData->pData;
|
||||||
|
*(double *)out = *out / (double)count;
|
||||||
|
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||||
|
uint64_t *out = (uint64_t *)pOutputData->pData;
|
||||||
|
*(double *)out = *out / (double)count;
|
||||||
|
} else if (IS_FLOAT_TYPE(type)) {
|
||||||
|
double *out = (double *)pOutputData->pData;
|
||||||
|
*(double *)out = *out / (double)count;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutput->numOfRows = 1;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t stddevScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t stddevScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
SColumnInfoData *pInputData = pInput->columnData;
|
SColumnInfoData *pInputData = pInput->columnData;
|
||||||
SColumnInfoData *pOutputData = pOutput->columnData;
|
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||||
|
@ -2031,3 +2138,4 @@ int32_t stddevScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
|
||||||
pOutput->numOfRows = 1;
|
pOutput->numOfRows = 1;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -771,6 +771,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
|
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
|
||||||
if (exh == NULL) {
|
if (exh == NULL) {
|
||||||
tDebug("%" PRId64 " already release", refId);
|
tDebug("%" PRId64 " already release", refId);
|
||||||
|
destroyCmsg(pMsg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ class TDTestCase:
|
||||||
tdLog.debug("start to execute %s" % __file__)
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
tdSql.init(conn.cursor())
|
tdSql.init(conn.cursor())
|
||||||
self.dbname = 'db'
|
self.dbname = 'db'
|
||||||
|
self.delaytime = 10
|
||||||
def get_database_info(self):
|
def get_database_info(self):
|
||||||
tdSql.query('select database()')
|
tdSql.query('select database()')
|
||||||
tdSql.checkData(0,0,None)
|
tdSql.checkData(0,0,None)
|
||||||
|
@ -43,14 +44,15 @@ class TDTestCase:
|
||||||
def get_server_status(self):
|
def get_server_status(self):
|
||||||
tdSql.query('select server_status()')
|
tdSql.query('select server_status()')
|
||||||
tdSql.checkData(0,0,1)
|
tdSql.checkData(0,0,1)
|
||||||
tdDnodes.stoptaosd(1)
|
#!for bug
|
||||||
|
# tdDnodes.stoptaosd(1)
|
||||||
|
# sleep(self.delaytime)
|
||||||
|
# tdSql.error('select server_status()')
|
||||||
|
|
||||||
tdSql.query('select server_status()')
|
|
||||||
print(tdSql.queryResult)
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.get_database_info()
|
self.get_database_info()
|
||||||
self.check_version()
|
self.check_version()
|
||||||
# self.get_server_status()
|
self.get_server_status()
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
tdLog.success("%s successfully executed" % __file__)
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
Loading…
Reference in New Issue