Merge pull request #13319 from taosdata/enh/supportTagFlt
enh: support tag filter
This commit is contained in:
commit
0accd674ad
|
@ -194,6 +194,7 @@ void indexInit();
|
||||||
/* index filter */
|
/* index filter */
|
||||||
typedef struct SIndexMetaArg {
|
typedef struct SIndexMetaArg {
|
||||||
void* metaHandle;
|
void* metaHandle;
|
||||||
|
void* metaEx;
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
} SIndexMetaArg;
|
} SIndexMetaArg;
|
||||||
|
|
||||||
|
|
|
@ -80,6 +80,18 @@ int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
|
||||||
int32_t metaReadNext(SMetaReader *pReader);
|
int32_t metaReadNext(SMetaReader *pReader);
|
||||||
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid);
|
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid);
|
||||||
|
|
||||||
|
typedef struct SMetaFltParam {
|
||||||
|
tb_uid_t suid;
|
||||||
|
int16_t cid;
|
||||||
|
int16_t type;
|
||||||
|
char * val;
|
||||||
|
bool reverse;
|
||||||
|
int (*filterFunc)(void *a, void *b, int16_t type);
|
||||||
|
|
||||||
|
} SMetaFltParam;
|
||||||
|
|
||||||
|
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *results);
|
||||||
|
|
||||||
#if 1 // refact APIs below (TODO)
|
#if 1 // refact APIs below (TODO)
|
||||||
typedef SVCreateTbReq STbCfg;
|
typedef SVCreateTbReq STbCfg;
|
||||||
typedef SVCreateTSmaReq SSmaCfg;
|
typedef SVCreateTSmaReq SSmaCfg;
|
||||||
|
|
|
@ -16,8 +16,8 @@
|
||||||
#ifndef _TD_VNODE_META_H_
|
#ifndef _TD_VNODE_META_H_
|
||||||
#define _TD_VNODE_META_H_
|
#define _TD_VNODE_META_H_
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
|
||||||
#include "index.h"
|
#include "index.h"
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -45,8 +45,6 @@ int32_t metaULock(SMeta* pMeta);
|
||||||
int metaEncodeEntry(SEncoder* pCoder, const SMetaEntry* pME);
|
int metaEncodeEntry(SEncoder* pCoder, const SMetaEntry* pME);
|
||||||
int metaDecodeEntry(SDecoder* pCoder, SMetaEntry* pME);
|
int metaDecodeEntry(SDecoder* pCoder, SMetaEntry* pME);
|
||||||
|
|
||||||
// metaTable ==================
|
|
||||||
|
|
||||||
// metaQuery ==================
|
// metaQuery ==================
|
||||||
int metaGetTableEntryByVersion(SMetaReader* pReader, int64_t version, tb_uid_t uid);
|
int metaGetTableEntryByVersion(SMetaReader* pReader, int64_t version, tb_uid_t uid);
|
||||||
|
|
||||||
|
@ -118,6 +116,10 @@ typedef struct {
|
||||||
int64_t smaUid;
|
int64_t smaUid;
|
||||||
} SSmaIdxKey;
|
} SSmaIdxKey;
|
||||||
|
|
||||||
|
// metaTable ==================
|
||||||
|
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int8_t type, tb_uid_t uid,
|
||||||
|
STagIdxKey** ppTagIdxKey, int32_t* nTagIdxKey);
|
||||||
|
|
||||||
#ifndef META_REFACT
|
#ifndef META_REFACT
|
||||||
// SMetaDB
|
// SMetaDB
|
||||||
int metaOpenDB(SMeta* pMeta);
|
int metaOpenDB(SMeta* pMeta);
|
||||||
|
|
|
@ -31,7 +31,7 @@ void metaReaderClear(SMetaReader *pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
|
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
|
||||||
SMeta *pMeta = pReader->pMeta;
|
SMeta * pMeta = pReader->pMeta;
|
||||||
STbDbKey tbDbKey = {.version = version, .uid = uid};
|
STbDbKey tbDbKey = {.version = version, .uid = uid};
|
||||||
|
|
||||||
// query table.db
|
// query table.db
|
||||||
|
@ -54,7 +54,7 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
||||||
SMeta *pMeta = pReader->pMeta;
|
SMeta * pMeta = pReader->pMeta;
|
||||||
int64_t version;
|
int64_t version;
|
||||||
|
|
||||||
// query uid.idx
|
// query uid.idx
|
||||||
|
@ -68,7 +68,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
|
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
|
||||||
SMeta *pMeta = pReader->pMeta;
|
SMeta * pMeta = pReader->pMeta;
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
|
|
||||||
// query name.idx
|
// query name.idx
|
||||||
|
@ -82,7 +82,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
|
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
|
||||||
void *pData = NULL;
|
void * pData = NULL;
|
||||||
int nData = 0;
|
int nData = 0;
|
||||||
tb_uid_t uid = 0;
|
tb_uid_t uid = 0;
|
||||||
|
|
||||||
|
@ -134,7 +134,7 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
|
||||||
|
|
||||||
int metaTbCursorNext(SMTbCursor *pTbCur) {
|
int metaTbCursorNext(SMTbCursor *pTbCur) {
|
||||||
int ret;
|
int ret;
|
||||||
void *pBuf;
|
void * pBuf;
|
||||||
STbCfg tbCfg;
|
STbCfg tbCfg;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
@ -155,7 +155,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
|
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
|
||||||
void *pData = NULL;
|
void * pData = NULL;
|
||||||
int nData = 0;
|
int nData = 0;
|
||||||
int64_t version;
|
int64_t version;
|
||||||
SSchemaWrapper schema = {0};
|
SSchemaWrapper schema = {0};
|
||||||
|
@ -205,11 +205,11 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SMCtbCursor {
|
struct SMCtbCursor {
|
||||||
SMeta *pMeta;
|
SMeta * pMeta;
|
||||||
TBC *pCur;
|
TBC * pCur;
|
||||||
tb_uid_t suid;
|
tb_uid_t suid;
|
||||||
void *pKey;
|
void * pKey;
|
||||||
void *pVal;
|
void * pVal;
|
||||||
int kLen;
|
int kLen;
|
||||||
int vLen;
|
int vLen;
|
||||||
};
|
};
|
||||||
|
@ -281,10 +281,10 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
|
||||||
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
||||||
tb_uid_t quid;
|
tb_uid_t quid;
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
STSchema *pTSchema = NULL;
|
STSchema * pTSchema = NULL;
|
||||||
SSchemaWrapper *pSW = NULL;
|
SSchemaWrapper *pSW = NULL;
|
||||||
STSchemaBuilder sb = {0};
|
STSchemaBuilder sb = {0};
|
||||||
SSchema *pSchema;
|
SSchema * pSchema;
|
||||||
|
|
||||||
metaReaderInit(&mr, pMeta, 0);
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
metaGetTableEntryByUid(&mr, uid);
|
metaGetTableEntryByUid(&mr, uid);
|
||||||
|
@ -321,11 +321,11 @@ int metaGetTbNum(SMeta *pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMeta *pMeta;
|
SMeta * pMeta;
|
||||||
TBC *pCur;
|
TBC * pCur;
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
void *pKey;
|
void * pKey;
|
||||||
void *pVal;
|
void * pVal;
|
||||||
int kLen;
|
int kLen;
|
||||||
int vLen;
|
int vLen;
|
||||||
} SMSmaCursor;
|
} SMSmaCursor;
|
||||||
|
@ -397,7 +397,7 @@ tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
|
||||||
|
|
||||||
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
|
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
|
||||||
STSmaWrapper *pSW = NULL;
|
STSmaWrapper *pSW = NULL;
|
||||||
SArray *pSmaIds = NULL;
|
SArray * pSmaIds = NULL;
|
||||||
|
|
||||||
if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
|
if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -421,7 +421,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
|
||||||
metaReaderInit(&mr, pMeta, 0);
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
int64_t smaId;
|
int64_t smaId;
|
||||||
int smaIdx = 0;
|
int smaIdx = 0;
|
||||||
STSma *pTSma = NULL;
|
STSma * pTSma = NULL;
|
||||||
for (int i = 0; i < pSW->number; ++i) {
|
for (int i = 0; i < pSW->number; ++i) {
|
||||||
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
|
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
|
||||||
if (metaGetTableEntryByUid(&mr, smaId) < 0) {
|
if (metaGetTableEntryByUid(&mr, smaId) < 0) {
|
||||||
|
@ -469,7 +469,7 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
|
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
|
||||||
STSma *pTSma = NULL;
|
STSma * pTSma = NULL;
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pMeta, 0);
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
if (metaGetTableEntryByUid(&mr, indexUid) < 0) {
|
if (metaGetTableEntryByUid(&mr, indexUid) < 0) {
|
||||||
|
@ -491,7 +491,7 @@ STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
|
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
|
||||||
SArray *pUids = NULL;
|
SArray * pUids = NULL;
|
||||||
SSmaIdxKey *pSmaIdxKey = NULL;
|
SSmaIdxKey *pSmaIdxKey = NULL;
|
||||||
|
|
||||||
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
|
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
|
||||||
|
@ -529,7 +529,7 @@ SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray *metaGetSmaTbUids(SMeta *pMeta) {
|
SArray *metaGetSmaTbUids(SMeta *pMeta) {
|
||||||
SArray *pUids = NULL;
|
SArray * pUids = NULL;
|
||||||
SSmaIdxKey *pSmaIdxKey = NULL;
|
SSmaIdxKey *pSmaIdxKey = NULL;
|
||||||
tb_uid_t lastUid = 0;
|
tb_uid_t lastUid = 0;
|
||||||
|
|
||||||
|
@ -577,3 +577,78 @@ const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid) {
|
||||||
ASSERT(pEntry->type == TSDB_CHILD_TABLE);
|
ASSERT(pEntry->type == TSDB_CHILD_TABLE);
|
||||||
return tdGetKVRowValOfCol((const SKVRow)pEntry->ctbEntry.pTags, cid);
|
return tdGetKVRowValOfCol((const SKVRow)pEntry->ctbEntry.pTags, cid);
|
||||||
}
|
}
|
||||||
|
typedef struct {
|
||||||
|
SMeta * pMeta;
|
||||||
|
TBC * pCur;
|
||||||
|
tb_uid_t suid;
|
||||||
|
int16_t cid;
|
||||||
|
int16_t type;
|
||||||
|
void * pKey;
|
||||||
|
void * pVal;
|
||||||
|
int32_t kLen;
|
||||||
|
int32_t vLen;
|
||||||
|
} SIdxCursor;
|
||||||
|
|
||||||
|
int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
||||||
|
SIdxCursor *pCursor = NULL;
|
||||||
|
|
||||||
|
char *tagData = param->val;
|
||||||
|
|
||||||
|
int32_t ret = 0, valid = 0;
|
||||||
|
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
|
||||||
|
pCursor->pMeta = pMeta;
|
||||||
|
pCursor->suid = param->suid;
|
||||||
|
pCursor->cid = param->cid;
|
||||||
|
pCursor->type = param->type;
|
||||||
|
|
||||||
|
metaRLock(pMeta);
|
||||||
|
ret = tdbTbcOpen(pMeta->pTagIdx, &pCursor->pCur, NULL);
|
||||||
|
if (ret < 0) {
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
STagIdxKey *pKey = NULL;
|
||||||
|
int32_t nKey = 0;
|
||||||
|
|
||||||
|
ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, param->val, pCursor->type,
|
||||||
|
param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
|
||||||
|
if (ret != 0) {
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
int cmp = 0;
|
||||||
|
if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp) < 0) {
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
void * entryKey = NULL, *entryVal = NULL;
|
||||||
|
int32_t nEntryKey, nEntryVal;
|
||||||
|
while (1) {
|
||||||
|
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
|
||||||
|
if (valid < 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
STagIdxKey *p = entryKey;
|
||||||
|
if (p != NULL) {
|
||||||
|
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
|
||||||
|
if (cmp == 0) {
|
||||||
|
// match
|
||||||
|
tb_uid_t tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
|
||||||
|
taosArrayPush(pUids, &tuid);
|
||||||
|
} else if (cmp == 1) {
|
||||||
|
// not match but should continue to iter
|
||||||
|
} else {
|
||||||
|
// not match and no more result
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
|
||||||
|
if (valid < 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
END:
|
||||||
|
if (pCursor->pMeta) metaULock(pCursor->pMeta);
|
||||||
|
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
|
||||||
|
|
||||||
|
taosMemoryFree(pCursor);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
|
@ -721,8 +721,8 @@ static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
|
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid,
|
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid,
|
||||||
STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
|
STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
|
||||||
int32_t nTagData = 0;
|
int32_t nTagData = 0;
|
||||||
|
|
||||||
if (pTagData) {
|
if (pTagData) {
|
||||||
|
|
|
@ -4529,6 +4529,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
|
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
|
||||||
}
|
}
|
||||||
SArray* tableIdList = extractTableIdList(pTableListInfo);
|
SArray* tableIdList = extractTableIdList(pTableListInfo);
|
||||||
|
|
||||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle,
|
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle,
|
||||||
tableIdList, pTableScanNode, pTaskInfo, &twSup);
|
tableIdList, pTableScanNode, pTaskInfo, &twSup);
|
||||||
|
|
||||||
|
@ -4942,7 +4943,7 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
|
||||||
|
|
||||||
if (tableType == TSDB_SUPER_TABLE) {
|
if (tableType == TSDB_SUPER_TABLE) {
|
||||||
if (pTagCond) {
|
if (pTagCond) {
|
||||||
SIndexMetaArg metaArg = {.metaHandle = tsdbGetIdx(metaHandle), .suid = tableUid};
|
SIndexMetaArg metaArg = {.metaEx = metaHandle, .metaHandle = tsdbGetIdx(metaHandle), .suid = tableUid};
|
||||||
|
|
||||||
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
SArray* res = taosArrayInit(8, sizeof(uint64_t));
|
||||||
code = doFilterTag(pTagCond, &metaArg, res);
|
code = doFilterTag(pTagCond, &metaArg, res);
|
||||||
|
|
|
@ -12,6 +12,7 @@ target_link_libraries(
|
||||||
PUBLIC os
|
PUBLIC os
|
||||||
PUBLIC util
|
PUBLIC util
|
||||||
PUBLIC common
|
PUBLIC common
|
||||||
|
PUBLIC vnode
|
||||||
PUBLIC nodes
|
PUBLIC nodes
|
||||||
PUBLIC scalar
|
PUBLIC scalar
|
||||||
PUBLIC function
|
PUBLIC function
|
||||||
|
|
|
@ -33,8 +33,9 @@ typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
|
||||||
|
|
||||||
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
|
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
|
||||||
|
|
||||||
TExeCond tCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b, int8_t dType);
|
__compar_fn_t indexGetCompar(int8_t type);
|
||||||
TExeCond tDoCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b);
|
TExeCond tCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b, int8_t dType);
|
||||||
|
TExeCond tDoCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b);
|
||||||
|
|
||||||
_cache_range_compare indexGetCompare(RangeType ty);
|
_cache_range_compare indexGetCompare(RangeType ty);
|
||||||
|
|
||||||
|
|
|
@ -80,7 +80,7 @@ static TdThreadOnce isInit = PTHREAD_ONCE_INIT;
|
||||||
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
|
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
|
||||||
|
|
||||||
static void indexInterResultsDestroy(SArray* results);
|
static void indexInterResultsDestroy(SArray* results);
|
||||||
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* finalResult);
|
static int indexMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out);
|
||||||
|
|
||||||
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
|
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
|
||||||
|
|
||||||
|
@ -386,21 +386,21 @@ static void indexInterResultsDestroy(SArray* results) {
|
||||||
taosArrayDestroy(results);
|
taosArrayDestroy(results);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) {
|
static int indexMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
|
||||||
// refactor, merge interResults into fResults by oType
|
// refactor, merge interResults into fResults by oType
|
||||||
for (int i = 0; i < taosArrayGetSize(interResults); i--) {
|
for (int i = 0; i < taosArrayGetSize(in); i--) {
|
||||||
SArray* t = taosArrayGetP(interResults, i);
|
SArray* t = taosArrayGetP(in, i);
|
||||||
taosArraySort(t, uidCompare);
|
taosArraySort(t, uidCompare);
|
||||||
taosArrayRemoveDuplicate(t, uidCompare, NULL);
|
taosArrayRemoveDuplicate(t, uidCompare, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (oType == MUST) {
|
if (oType == MUST) {
|
||||||
iIntersection(interResults, fResults);
|
iIntersection(in, out);
|
||||||
} else if (oType == SHOULD) {
|
} else if (oType == SHOULD) {
|
||||||
iUnion(interResults, fResults);
|
iUnion(in, out);
|
||||||
} else if (oType == NOT) {
|
} else if (oType == NOT) {
|
||||||
// just one column index, enhance later
|
// just one column index, enhance later
|
||||||
taosArrayAddAll(fResults, interResults);
|
// taosArrayAddAll(fResults, interResults);
|
||||||
// not use currently
|
// not use currently
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -75,7 +75,7 @@ char* indexInt2str(int64_t val, char* dst, int radix) {
|
||||||
;
|
;
|
||||||
return dst - 1;
|
return dst - 1;
|
||||||
}
|
}
|
||||||
static __compar_fn_t indexGetCompar(int8_t type) {
|
__compar_fn_t indexGetCompar(int8_t type) {
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
return (__compar_fn_t)strcmp;
|
return (__compar_fn_t)strcmp;
|
||||||
}
|
}
|
||||||
|
@ -182,6 +182,9 @@ TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
|
||||||
case QUERY_GREATER_EQUAL: {
|
case QUERY_GREATER_EQUAL: {
|
||||||
if (ret >= 0) return MATCH;
|
if (ret >= 0) return MATCH;
|
||||||
}
|
}
|
||||||
|
case QUERY_TERM: {
|
||||||
|
if (ret == 0) return MATCH;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return CONTINUE;
|
return CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,11 +14,13 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "index.h"
|
#include "index.h"
|
||||||
|
#include "indexComm.h"
|
||||||
#include "indexInt.h"
|
#include "indexInt.h"
|
||||||
#include "nodes.h"
|
#include "nodes.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "scalar.h"
|
#include "scalar.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
#include "vnode.h"
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
#define SIF_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
#define SIF_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
||||||
|
@ -259,10 +261,52 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
|
||||||
indexError("index-filter not support buildin function");
|
indexError("index-filter not support buildin function");
|
||||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef int (*Filter)(void *a, void *b, int16_t dtype);
|
||||||
|
|
||||||
|
int sifGreaterThan(void *a, void *b, int16_t dtype) {
|
||||||
|
__compar_fn_t func = indexGetCompar(dtype);
|
||||||
|
return tDoCompare(func, QUERY_GREATER_THAN, a, b);
|
||||||
|
}
|
||||||
|
int sifGreaterEqual(void *a, void *b, int16_t dtype) {
|
||||||
|
__compar_fn_t func = indexGetCompar(dtype);
|
||||||
|
return tDoCompare(func, QUERY_GREATER_EQUAL, a, b);
|
||||||
|
}
|
||||||
|
int sifLessEqual(void *a, void *b, int16_t dtype) {
|
||||||
|
__compar_fn_t func = indexGetCompar(dtype);
|
||||||
|
return tDoCompare(func, QUERY_LESS_EQUAL, a, b);
|
||||||
|
}
|
||||||
|
int sifLessThan(void *a, void *b, int16_t dtype) {
|
||||||
|
__compar_fn_t func = indexGetCompar(dtype);
|
||||||
|
return (int)tDoCompare(func, QUERY_LESS_THAN, a, b);
|
||||||
|
}
|
||||||
|
int sifEqual(void *a, void *b, int16_t dtype) {
|
||||||
|
__compar_fn_t func = indexGetCompar(dtype);
|
||||||
|
return (int)tDoCompare(func, QUERY_TERM, a, b);
|
||||||
|
}
|
||||||
|
static Filter sifGetFilterFunc(EIndexQueryType type, bool *reverse) {
|
||||||
|
if (type == QUERY_LESS_EQUAL || type == QUERY_LESS_THAN) {
|
||||||
|
*reverse = true;
|
||||||
|
} else {
|
||||||
|
*reverse = false;
|
||||||
|
}
|
||||||
|
if (type == QUERY_LESS_EQUAL)
|
||||||
|
return sifLessEqual;
|
||||||
|
else if (type == QUERY_LESS_THAN)
|
||||||
|
return sifLessThan;
|
||||||
|
else if (type == QUERY_GREATER_EQUAL)
|
||||||
|
return sifGreaterEqual;
|
||||||
|
else if (type == QUERY_GREATER_THAN)
|
||||||
|
return sifGreaterThan;
|
||||||
|
else if (type == QUERY_TERM) {
|
||||||
|
return sifEqual;
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
|
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
|
||||||
#ifdef USE_INVERTED_INDEX
|
|
||||||
SIndexMetaArg *arg = &output->arg;
|
SIndexMetaArg *arg = &output->arg;
|
||||||
SIndexTerm * tm = indexTermCreate(arg->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName),
|
#ifdef USE_INVERTED_INDEX
|
||||||
|
SIndexTerm *tm = indexTermCreate(arg->suid, DEFAULT, left->colValType, left->colName, strlen(left->colName),
|
||||||
right->condValue, strlen(right->condValue));
|
right->condValue, strlen(right->condValue));
|
||||||
if (tm == NULL) {
|
if (tm == NULL) {
|
||||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||||
|
@ -278,8 +322,22 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
|
||||||
indexMultiTermQueryDestroy(mtm);
|
indexMultiTermQueryDestroy(mtm);
|
||||||
return ret;
|
return ret;
|
||||||
#else
|
#else
|
||||||
return 0;
|
EIndexQueryType qtype = 0;
|
||||||
|
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
|
||||||
|
bool reverse;
|
||||||
|
Filter filterFunc = sifGetFilterFunc(qtype, &reverse);
|
||||||
|
|
||||||
|
SMetaFltParam param = {.suid = arg->suid,
|
||||||
|
.cid = left->colId,
|
||||||
|
.type = left->colValType,
|
||||||
|
.val = right->condValue,
|
||||||
|
.reverse = reverse,
|
||||||
|
.filterFunc = filterFunc};
|
||||||
|
|
||||||
|
int ret = metaFilteTableIds(arg->metaEx, ¶m, output->result);
|
||||||
|
return ret;
|
||||||
#endif
|
#endif
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||||
|
|
|
@ -24,8 +24,8 @@ int tIndexJsonPut(SIndexJson *index, SIndexJsonMultiTerm *terms, uint64_t uid) {
|
||||||
SIndexJsonTerm *p = taosArrayGetP(terms, i);
|
SIndexJsonTerm *p = taosArrayGetP(terms, i);
|
||||||
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
|
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
|
||||||
}
|
}
|
||||||
return indexPut(index, terms, uid);
|
|
||||||
// handle put
|
// handle put
|
||||||
|
return indexPut(index, terms, uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
|
int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *result) {
|
||||||
|
@ -34,11 +34,11 @@ int tIndexJsonSearch(SIndexJson *index, SIndexJsonMultiTermQuery *tq, SArray *re
|
||||||
SIndexJsonTerm *p = taosArrayGetP(terms, i);
|
SIndexJsonTerm *p = taosArrayGetP(terms, i);
|
||||||
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
|
INDEX_TYPE_ADD_EXTERN_TYPE(p->colType, TSDB_DATA_TYPE_JSON);
|
||||||
}
|
}
|
||||||
return indexSearch(index, tq, result);
|
|
||||||
// handle search
|
// handle search
|
||||||
|
return indexSearch(index, tq, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tIndexJsonClose(SIndexJson *index) {
|
void tIndexJsonClose(SIndexJson *index) {
|
||||||
return indexClose(index);
|
|
||||||
// handle close
|
// handle close
|
||||||
|
return indexClose(index);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#include "tdbInt.h"
|
#include "tdbInt.h"
|
||||||
|
|
||||||
struct STTB {
|
struct STTB {
|
||||||
TDB *pEnv;
|
TDB * pEnv;
|
||||||
SBTree *pBt;
|
SBTree *pBt;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -25,11 +25,11 @@ struct STBC {
|
||||||
};
|
};
|
||||||
|
|
||||||
int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb) {
|
int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb) {
|
||||||
TTB *pTb;
|
TTB * pTb;
|
||||||
SPager *pPager;
|
SPager *pPager;
|
||||||
int ret;
|
int ret;
|
||||||
char fFullName[TDB_FILENAME_LEN];
|
char fFullName[TDB_FILENAME_LEN];
|
||||||
SPage *pPage;
|
SPage * pPage;
|
||||||
SPgno pgno;
|
SPgno pgno;
|
||||||
|
|
||||||
*ppTb = NULL;
|
*ppTb = NULL;
|
||||||
|
|
Loading…
Reference in New Issue