Merge pull request #12089 from taosdata/enh/supportIdxFilter
enh(index): support index filter
This commit is contained in:
commit
13b42a00fa
|
@ -8,7 +8,7 @@ add_library(executor STATIC ${EXECUTOR_SRC})
|
|||
# )
|
||||
|
||||
target_link_libraries(executor
|
||||
PRIVATE os util common function parser planner qcom vnode scalar nodes
|
||||
PRIVATE os util common function parser planner qcom vnode scalar nodes index
|
||||
)
|
||||
|
||||
target_include_directories(
|
||||
|
@ -19,4 +19,4 @@ target_include_directories(
|
|||
|
||||
#if(${BUILD_TEST})
|
||||
ADD_SUBDIRECTORY(test)
|
||||
#endif(${BUILD_TEST})
|
||||
#endif(${BUILD_TEST})
|
||||
|
|
|
@ -15,7 +15,9 @@
|
|||
|
||||
#include "indexoperator.h"
|
||||
#include "executorimpl.h"
|
||||
#include "index.h"
|
||||
#include "nodes.h"
|
||||
#include "tdatablock.h"
|
||||
|
||||
typedef struct SIFCtx {
|
||||
int32_t code;
|
||||
|
@ -48,11 +50,19 @@ typedef struct SIFCtx {
|
|||
} while (0)
|
||||
|
||||
typedef struct SIFParam {
|
||||
SArray * result;
|
||||
SHashObj *pFilter;
|
||||
|
||||
SArray *result;
|
||||
char * condValue;
|
||||
|
||||
col_id_t colId;
|
||||
|
||||
int64_t suid; // add later
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char colName[TSDB_COL_NAME_LEN];
|
||||
} SIFParam;
|
||||
|
||||
typedef int32_t (*sif_func_t)(SNode *left, SNode *rigth, SIFParam *output);
|
||||
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
|
||||
// construct tag filter operator later
|
||||
static void destroyTagFilterOperatorInfo(void *param) {
|
||||
STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param;
|
||||
|
@ -60,7 +70,10 @@ static void destroyTagFilterOperatorInfo(void *param) {
|
|||
|
||||
static void sifFreeParam(SIFParam *param) {
|
||||
if (param == NULL) return;
|
||||
|
||||
taosArrayDestroy(param->result);
|
||||
taosMemoryFree(param->condValue);
|
||||
taosHashCleanup(param->pFilter);
|
||||
}
|
||||
|
||||
static int32_t sifGetOperParamNum(EOperatorType ty) {
|
||||
|
@ -71,15 +84,70 @@ static int32_t sifGetOperParamNum(EOperatorType ty) {
|
|||
}
|
||||
return 2;
|
||||
}
|
||||
static int32_t sifValidateColumn(SColumnNode *cn) {
|
||||
// add more check
|
||||
if (cn == NULL) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
if (cn->colType != COLUMN_TYPE_TAG) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t sifGetValueFromNode(SNode *node, char **value) {
|
||||
// covert data From snode;
|
||||
SValueNode *vn = (SValueNode *)node;
|
||||
|
||||
char * pData = nodesGetValueFromNode(vn);
|
||||
SDataType *pType = &vn->node.resType;
|
||||
int32_t type = pType->type;
|
||||
int32_t valLen = 0;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
int32_t dataLen = varDataTLen(pData);
|
||||
if (type == TSDB_DATA_TYPE_JSON) {
|
||||
if (*pData == TSDB_DATA_TYPE_NULL) {
|
||||
dataLen = 0;
|
||||
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
|
||||
dataLen = varDataTLen(pData + CHAR_BYTES);
|
||||
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
|
||||
dataLen = LONG_BYTES;
|
||||
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
|
||||
dataLen = CHAR_BYTES;
|
||||
}
|
||||
dataLen += CHAR_BYTES;
|
||||
}
|
||||
valLen = dataLen;
|
||||
} else {
|
||||
valLen = pType->bytes;
|
||||
}
|
||||
char *tv = taosMemoryCalloc(1, valLen + 1);
|
||||
if (tv == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
memcpy(tv, pData, valLen);
|
||||
*value = tv;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
||||
switch (nodeType(node)) {
|
||||
case QUERY_NODE_VALUE: {
|
||||
SValueNode *vn = (SValueNode *)node;
|
||||
|
||||
SIF_ERR_RET(sifGetValueFromNode(node, ¶m->condValue));
|
||||
param->colId = -1;
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_COLUMN: {
|
||||
SColumnNode *cn = (SColumnNode *)node;
|
||||
/*only support tag column*/
|
||||
SIF_ERR_RET(sifValidateColumn(cn));
|
||||
param->colId = cn->colId;
|
||||
memcpy(param->dbName, cn->dbName, sizeof(cn->dbName));
|
||||
memcpy(param->colName, cn->colName, sizeof(cn->colName));
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -89,7 +157,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
|||
qError("invalid length for node:%p, length: %d", node, LIST_LENGTH(nl->pNodeList));
|
||||
SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SIF_ERR_RET(scalarGenerateSetFromList((void **)¶m->pFilter, node, nl->dataType.type));
|
||||
if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
|
||||
taosHashCleanup(param->pFilter);
|
||||
qError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
|
||||
|
@ -163,58 +231,63 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
|
|||
qError("index-filter not support buildin function");
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
static int32_t sifLessThanFunc(SNode *left, SNode *rigth, SIFParam *output) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t sifLessEqualFunc(SNode *left, SNode *rigth, SIFParam *output) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t sifGreaterThanFunc(SNode *left, SNode *rigth, SIFParam *output) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t sifGreaterEqualFunc(SNode *left, SNode *rigth, SIFParam *output) {
|
||||
// impl later
|
||||
static int32_t sifIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
|
||||
SIndexMultiTermQuery *mq = indexMultiTermQueryCreate(MUST);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t sifEqualFunc(SNode *left, SNode *rigth, SIFParam *output) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_LOWER_THAN;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
static int32_t sifNotEqualFunc(SNode *left, SNode *rigth, SIFParam *output) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t sifInFunc(SNode *left, SNode *rigth, SIFParam *output) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t sifNotInFunc(SNode *left, SNode *right, SIFParam *output) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t sifLikeFunc(SNode *left, SNode *right, SIFParam *output) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t sifNotLikeFunc(SNode *left, SNode *right, SIFParam *output) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
static int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_LOWER_EQUAL;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
|
||||
static int32_t sifMatchFunc(SNode *left, SNode *rigth, SIFParam *output) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
static int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_GREATER_THAN;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
static int32_t sifNotMatchFunc(SNode *left, SNode *rigth, SIFParam *output) {
|
||||
// impl later
|
||||
return TSDB_CODE_SUCCESS;
|
||||
static int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_GREATER_EQUAL;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
static int32_t sifDefaultFunc(SNode *left, SNode *rigth, SIFParam *output) {
|
||||
|
||||
static int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_EQUAL;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
static int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_NOT_EQUAL;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
static int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_IN;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
static int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_NOT_IN;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
static int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_LIKE;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
static int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_NOT_LIKE;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
|
||||
static int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_MATCH;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
int id = OP_TYPE_NMATCH;
|
||||
return sifIndex(left, right, id, output);
|
||||
}
|
||||
static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
|
||||
// add more except
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
@ -252,17 +325,18 @@ static sif_func_t sifGetOperFn(int32_t funcId) {
|
|||
return sifDefaultFunc;
|
||||
}
|
||||
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||
int32_t code = 0;
|
||||
SIFParam *params = NULL;
|
||||
SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx));
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t nParam = sifGetOperParamNum(node->opType);
|
||||
if (nParam <= 1) {
|
||||
SIF_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SIFParam *params = NULL;
|
||||
SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx));
|
||||
|
||||
sif_func_t operFn = sifGetOperFn(node->opType);
|
||||
|
||||
return operFn(node->pLeft, node->pRight, output);
|
||||
return operFn(¶ms[0], nParam > 1 ? ¶ms[1] : NULL, output);
|
||||
_return:
|
||||
taosMemoryFree(params);
|
||||
SIF_RET(code);
|
||||
|
@ -335,7 +409,6 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) {
|
|||
if (ctx->code) {
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
|
||||
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
|
||||
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
return DEAL_RES_ERROR;
|
||||
|
|
|
@ -258,13 +258,13 @@ void indexOptsDestroy(SIndexOpts* opts) {
|
|||
*
|
||||
*/
|
||||
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
|
||||
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
|
||||
if (p == NULL) {
|
||||
SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
|
||||
if (mtq == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
p->opera = opera;
|
||||
p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
|
||||
return p;
|
||||
mtq->opera = opera;
|
||||
mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
|
||||
return mtq;
|
||||
}
|
||||
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
|
||||
for (int i = 0; i < taosArrayGetSize(pQuery->query); i++) {
|
||||
|
@ -282,23 +282,24 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
|
|||
|
||||
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
|
||||
int32_t nColName, const char* colVal, int32_t nColVal) {
|
||||
SIndexTerm* t = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
|
||||
if (t == NULL) {
|
||||
SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
|
||||
if (tm == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
t->suid = suid;
|
||||
t->operType = oper;
|
||||
t->colType = colType;
|
||||
tm->suid = suid;
|
||||
tm->operType = oper;
|
||||
tm->colType = colType;
|
||||
|
||||
t->colName = (char*)taosMemoryCalloc(1, nColName + 1);
|
||||
memcpy(t->colName, colName, nColName);
|
||||
t->nColName = nColName;
|
||||
tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
|
||||
memcpy(tm->colName, colName, nColName);
|
||||
tm->nColName = nColName;
|
||||
|
||||
t->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
|
||||
memcpy(t->colVal, colVal, nColVal);
|
||||
t->nColVal = nColVal;
|
||||
return t;
|
||||
tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
|
||||
memcpy(tm->colVal, colVal, nColVal);
|
||||
tm->nColVal = nColVal;
|
||||
|
||||
return tm;
|
||||
}
|
||||
void indexTermDestroy(SIndexTerm* p) {
|
||||
taosMemoryFree(p->colName);
|
||||
|
|
Loading…
Reference in New Issue