enh(index): update index filter

This commit is contained in:
yihaoDeng 2022-05-11 18:00:42 +08:00
parent 4a7910973e
commit 17bbd59a82
1 changed files with 90 additions and 20 deletions

View File

@ -22,6 +22,8 @@
typedef struct SIFCtx {
int32_t code;
SHashObj *pRes; /* element is SScalarParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan
// SIdxFltStatus st;
} SIFCtx;
#define SIF_ERR_RET(c) \
@ -55,6 +57,7 @@ typedef struct SIFParam {
SArray *result;
char * condValue;
SIdxFltStatus status;
uint8_t colValType;
col_id_t colId;
int64_t suid; // add later
@ -82,6 +85,9 @@ static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
}
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
static sif_func_t sifNullFunc = NULL;
// typedef struct SIFWalkParm
// construct tag filter operator later
static void destroyTagFilterOperatorInfo(void *param) {
STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param;
@ -114,6 +120,24 @@ static int32_t sifValidateColumn(SColumnNode *cn) {
return TSDB_CODE_SUCCESS;
}
static SIdxFltStatus sifMergeCond(ELogicConditionType type, SIdxFltStatus ls, SIdxFltStatus rs) {
// enh rule later
if (type == LOGIC_COND_TYPE_AND) {
if (ls == SFLT_NOT_INDEX || rs == SFLT_NOT_INDEX) {
if (ls == SFLT_NOT_INDEX)
return rs;
else
return ls;
}
return SFLT_COARSE_INDEX;
} else if (type == LOGIC_COND_TYPE_OR) {
return SFLT_COARSE_INDEX;
} else if (type == LOGIC_COND_TYPE_NOT) {
return SFLT_NOT_INDEX;
}
return SFLT_NOT_INDEX;
}
static int32_t sifGetValueFromNode(SNode *node, char **value) {
// covert data From snode;
SValueNode *vn = (SValueNode *)node;
@ -257,11 +281,11 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
if (tm == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
EIndexQueryType qtype = 0;
SIF_ERR_RET(sifGetFuncFromSql(operType, &qtype));
SIndexMultiTermQuery *mtm = indexMultiTermQueryCreate(MUST);
indexMultiTermQueryAdd(mtm, tm, qtype);
int ret = indexSearch(NULL, mtm, output->result);
indexMultiTermQueryDestroy(mtm);
@ -319,6 +343,7 @@ static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output
int id = OP_TYPE_NMATCH;
return sifDoIndex(left, right, id, output);
}
static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// add more except
return TSDB_CODE_QRY_INVALID_INPUT;
@ -352,9 +377,9 @@ static sif_func_t sifGetOperFn(int32_t funcId) {
case OP_TYPE_NMATCH:
return sifNotMatchFunc;
default:
return sifDefaultFunc;
return sifNullFunc;
}
return sifDefaultFunc;
return sifNullFunc;
}
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
int32_t code = 0;
@ -367,6 +392,11 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
SIF_ERR_RET(sifInitOperParams(&params, node, ctx));
sif_func_t operFn = sifGetOperFn(node->opType);
if (ctx->noExec && operFn == NULL) {
output->status = SFLT_NOT_INDEX;
} else {
output->status = SFLT_ACCURATE_INDEX;
}
return operFn(&params[0], nParam > 1 ? &params[1] : NULL, output);
_return:
@ -385,6 +415,7 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
SIFParam *params = NULL;
SIF_ERR_RET(sifInitParamList(&params, node->pParameterList, ctx));
if (ctx->noExec == false) {
for (int32_t m = 0; m < node->pParameterList->length; m++) {
// add impl later
if (node->condType == LOGIC_COND_TYPE_AND) {
@ -395,6 +426,11 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
taosArrayAddAll(output->result, params[m].result);
}
}
} else {
for (int32_t m = 0; m < node->pParameterList->length; m++) {
output->status = sifMergeCond(node->condType, output->status, params[m].status);
}
}
_return:
taosMemoryFree(params);
SIF_RET(code);
@ -486,7 +522,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
SIFCtx ctx = {.code = 0};
SIFCtx ctx = {.code = 0, .noExec = false};
ctx.pRes = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) {
qError("index-filter failed to taosHashInit");
@ -510,6 +546,36 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
SIF_RET(code);
}
static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
int32_t code = TSDB_CODE_SUCCESS;
if (pNode == NULL) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SIFCtx ctx = {.code = 0, .noExec = true};
ctx.pRes = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) {
qError("index-filter failed to taosHashInit");
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
SIF_ERR_RET(ctx.code);
SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
if (res == NULL) {
qError("no valid res in hash, node:(%p), type(%d)", (void *)&pNode, nodeType(pNode));
SIF_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*status = res->status;
sifFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
SIF_RET(code);
}
int32_t doFilterTag(const SNode *pFilterNode, SArray *result) {
if (pFilterNode == NULL) {
return TSDB_CODE_SUCCESS;
@ -528,10 +594,14 @@ int32_t doFilterTag(const SNode *pFilterNode, SArray *result) {
}
SIdxFltStatus idxGetFltStatus(SNode *pFilterNode) {
SIdxFltStatus st = SFLT_NOT_INDEX;
if (pFilterNode == NULL) {
return SFLT_NOT_INDEX;
}
SFilterInfo *filter = NULL;
// todo move to the initialization function
SIF_ERR_RET(filterInitFromNode((SNode *)pFilterNode, &filter, 0));
// impl later
return SFLT_ACCURATE_INDEX;
SIF_ERR_RET(sifGetFltHint((SNode *)pFilterNode, &st));
return st;
}