From 9fae3397100d8f8cd2632f52ea56829a6e9b85fc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 16 Apr 2022 16:35:24 +0800 Subject: [PATCH 1/3] enh: add tag index filter impl --- source/libs/executor/src/indexoperator.c | 154 +++++++++++++++++++++-- 1 file changed, 144 insertions(+), 10 deletions(-) diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c index b5b9cdb740..84098bba2e 100644 --- a/source/libs/executor/src/indexoperator.c +++ b/source/libs/executor/src/indexoperator.c @@ -22,35 +22,170 @@ typedef struct SIFCtx { SHashObj *pRes; /* element is SScalarParam */ } SIFCtx; +#define SIF_ERR_RET(c) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + return _code; \ + } \ + } while (0) +#define SIF_RET(c) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + } \ + return _code; \ + } while (0) +#define SIF_ERR_JRET(c) \ + do { \ + code = c; \ + if (code != TSDB_CODE_SUCCESS) { \ + terrno = code; \ + goto _return; \ + } \ + } while (0) + typedef struct SIFParam { SArray * result; SHashObj *pFilter; } SIFParam; + +typedef int32_t (*sif_func_t)(SNode *left, SNode *rigth, SIFParam *output); // construct tag filter operator later -static void destroyTagFilterOperatorInfo(void *param) { - STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param; -} +static void destroyTagFilterOperatorInfo(void *param) { STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param; } static void sifFreeParam(SIFParam *param) { if (param == NULL) return; taosArrayDestroy(param->result); } -int32_t sifInitOperParams(SIFParam *params, SOperatorNode *node, SIFCtx *ctx) { - int32_t code = 0; - return code; +static int32_t sifGetOperParamNum(EOperatorType ty) { + if (OP_TYPE_IS_NULL == ty || OP_TYPE_IS_NOT_NULL == ty || OP_TYPE_IS_TRUE == ty || OP_TYPE_IS_NOT_TRUE == ty || OP_TYPE_IS_FALSE == ty || + OP_TYPE_IS_NOT_FALSE == ty || OP_TYPE_IS_UNKNOWN == ty || OP_TYPE_IS_NOT_UNKNOWN == ty || OP_TYPE_MINUS == ty) { + return 1; + } + return 2; } +static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) { + switch (nodeType(node)) { + case QUERY_NODE_VALUE: { + SValueNode *vn = (SValueNode *)node; + + break; + } + case QUERY_NODE_COLUMN: { + SColumnNode *cn = (SColumnNode *)node; + + break; + } + case QUERY_NODE_NODE_LIST: { + SNodeListNode *nl = (SNodeListNode *)node; + if (LIST_LENGTH(nl->pNodeList) <= 0) { + qError("invalid length for node:%p, length: %d", node, LIST_LENGTH(nl->pNodeList)); + SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) { + taosHashCleanup(param->pFilter); + qError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param)); + SIF_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + break; + } + case QUERY_NODE_FUNCTION: + case QUERY_NODE_OPERATOR: + case QUERY_NODE_LOGIC_CONDITION: { + SIFParam *res = (SIFParam *)taosHashGet(ctx->pRes, &node, POINTER_BYTES); + if (NULL == res) { + qError("no result for node, type:%d, node:%p", nodeType(node), node); + SIF_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + *param = *res; + break; + } + default: + break; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx *ctx) { + int32_t code = 0; + int32_t nParam = sifGetOperParamNum(node->opType); + if (NULL == node->pLeft || (nParam == 2 && NULL == node->pRight)) { + qError("invalid operation node, left: %p, rigth: %p", node->pLeft, node->pRight); + SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + SIFParam *paramList = taosMemoryCalloc(nParam, sizeof(SIFParam)); + if (NULL == paramList) { + SIF_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SIF_ERR_JRET(sifInitParam(node->pLeft, ¶mList[0], ctx)); + if (nParam > 1) { + SIF_ERR_JRET(sifInitParam(node->pRight, ¶mList[0], ctx)); + } + *params = paramList; + SIF_RET(TSDB_CODE_SUCCESS); +_return: + taosMemoryFree(paramList); + SIF_RET(code); +} +// int32_t sifInitOperParams(SIFParam *params, SOperatorNode *node, SIFCtx *ctx) { +// int32_t code = 0; +// return code; +//} static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *output) { qError("index-filter not support buildin function"); + return TSDB_CODE_QRY_INVALID_INPUT; +} + +static int32_t sifLessThanFunc(SNode *left, SNode *rigth, SIFParam *output) { + // impl later return TSDB_CODE_SUCCESS; } +static int32_t sifDefaultFunc(SNode *left, SNode *rigth, SIFParam *output) { + // add more except + return TSDB_CODE_QRY_INVALID_INPUT; +} + +static sif_func_t sifGetOperFn(int32_t funcId) { + // impl later + switch (funcId) { + case OP_TYPE_LOWER_THAN: + return sifLessThanFunc; + default: + return sifDefaultFunc; + } + return sifDefaultFunc; +} static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { + int32_t code = 0; SIFParam *params = NULL; + SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx)); - return TSDB_CODE_SUCCESS; + int32_t nParam = sifGetOperParamNum(node->opType); + if (nParam <= 1) { + SIF_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + sif_func_t operFn = sifGetOperFn(node->opType); + + return operFn(node->pLeft, node->pRight, output); +_return: + taosMemoryFree(params); + SIF_RET(code); } -static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *output) { return TSDB_CODE_SUCCESS; } +static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *output) { + if (NULL == node->pParameterList || node->pParameterList->length <= 0) { + qError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0); + return TSDB_CODE_QRY_INVALID_INPUT; + } + // impl later + return TSDB_CODE_SUCCESS; +} static EDealRes sifWalkFunction(SNode *pNode, void *context) { // impl later @@ -104,8 +239,7 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) { } EDealRes sifCalcWalker(SNode *node, void *context) { - if (QUERY_NODE_VALUE == nodeType(node) || QUERY_NODE_NODE_LIST == nodeType(node) || - QUERY_NODE_COLUMN == nodeType(node)) { + if (QUERY_NODE_VALUE == nodeType(node) || QUERY_NODE_NODE_LIST == nodeType(node) || QUERY_NODE_COLUMN == nodeType(node)) { return DEAL_RES_CONTINUE; } SIFCtx *ctx = (SIFCtx *)context; From fe367b5ef49ec0f16d37d5c760f657f9a088966b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 16 Apr 2022 17:56:06 +0800 Subject: [PATCH 2/3] enh: add tag index filter impl --- source/libs/executor/src/indexoperator.c | 68 ++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c index 84098bba2e..edffe24ac4 100644 --- a/source/libs/executor/src/indexoperator.c +++ b/source/libs/executor/src/indexoperator.c @@ -146,6 +146,52 @@ static int32_t sifLessThanFunc(SNode *left, SNode *rigth, SIFParam *output) { // impl later return TSDB_CODE_SUCCESS; } +static int32_t sifLessEqualFunc(SNode *left, SNode *rigth, SIFParam *output) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t sifGreaterThanFunc(SNode *left, SNode *rigth, SIFParam *output) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t sifGreaterEqualFunc(SNode *left, SNode *rigth, SIFParam *output) { + // impl later + return TSDB_CODE_SUCCESS; +} + +static int32_t sifEqualFunc(SNode *left, SNode *rigth, SIFParam *output) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t sifNotEqualFunc(SNode *left, SNode *rigth, SIFParam *output) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t sifInFunc(SNode *left, SNode *rigth, SIFParam *output) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t sifNotInFunc(SNode *left, SNode *right, SIFParam *output) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t sifLikeFunc(SNode *left, SNode *right, SIFParam *output) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t sifNotLikeFunc(SNode *left, SNode *right, SIFParam *output) { + // impl later + return TSDB_CODE_SUCCESS; +} + +static int32_t sifMatchFunc(SNode *left, SNode *rigth, SIFParam *output) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t sifNotMatchFunc(SNode *left, SNode *rigth, SIFParam *output) { + // impl later + return TSDB_CODE_SUCCESS; +} static int32_t sifDefaultFunc(SNode *left, SNode *rigth, SIFParam *output) { // add more except return TSDB_CODE_QRY_INVALID_INPUT; @@ -154,8 +200,30 @@ static int32_t sifDefaultFunc(SNode *left, SNode *rigth, SIFParam *output) { static sif_func_t sifGetOperFn(int32_t funcId) { // impl later switch (funcId) { + case OP_TYPE_GREATER_THAN: + return sifGreaterThanFunc; + case OP_TYPE_GREATER_EQUAL: + return sifGreaterEqualFunc; case OP_TYPE_LOWER_THAN: return sifLessThanFunc; + case OP_TYPE_LOWER_EQUAL: + return sifLessEqualFunc; + case OP_TYPE_EQUAL: + return sifEqualFunc; + case OP_TYPE_NOT_EQUAL: + return sifNotEqualFunc; + case OP_TYPE_IN: + return sifInFunc; + case OP_TYPE_NOT_IN: + return sifNotInFunc; + case OP_TYPE_LIKE: + return sifLikeFunc; + case OP_TYPE_NOT_LIKE: + return sifNotLikeFunc; + case OP_TYPE_MATCH: + return sifMatchFunc; + case OP_TYPE_NMATCH: + return sifNotMatchFunc; default: return sifDefaultFunc; } From 25837faf28cf17599afae04e7eccc422cd51aee5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 16 Apr 2022 20:35:18 +0800 Subject: [PATCH 3/3] enh: add multi filter merge --- source/libs/executor/src/indexoperator.c | 52 ++++++++++++++++++++---- 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c index edffe24ac4..34b04c04de 100644 --- a/source/libs/executor/src/indexoperator.c +++ b/source/libs/executor/src/indexoperator.c @@ -125,18 +125,37 @@ static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx SIF_ERR_JRET(sifInitParam(node->pLeft, ¶mList[0], ctx)); if (nParam > 1) { - SIF_ERR_JRET(sifInitParam(node->pRight, ¶mList[0], ctx)); + SIF_ERR_JRET(sifInitParam(node->pRight, ¶mList[1], ctx)); } *params = paramList; - SIF_RET(TSDB_CODE_SUCCESS); + return TSDB_CODE_SUCCESS; _return: taosMemoryFree(paramList); SIF_RET(code); } -// int32_t sifInitOperParams(SIFParam *params, SOperatorNode *node, SIFCtx *ctx) { -// int32_t code = 0; -// return code; -//} +static int32_t sifInitParamList(SIFParam **params, SNodeList *nodeList, SIFCtx *ctx) { + int32_t code = 0; + SIFParam *tParams = taosMemoryCalloc(nodeList->length, sizeof(SIFParam)); + if (tParams == NULL) { + qError("failed to calloc, nodeList: %p", nodeList); + SIF_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SListCell *cell = nodeList->pHead; + for (int32_t i = 0; i < nodeList->length; i++) { + if (NULL == cell || NULL == cell->pNode) { + SIF_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + SIF_ERR_JRET(sifInitParam(cell->pNode, &tParams[i], ctx)); + cell = cell->pNext; + } + *params = tParams; + return TSDB_CODE_SUCCESS; + +_return: + taosMemoryFree(tParams); + SIF_RET(code); +} static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *output) { qError("index-filter not support buildin function"); return TSDB_CODE_QRY_INVALID_INPUT; @@ -251,12 +270,27 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou qError("invalid logic parameter list, list:%p, paramNum:%d", node->pParameterList, node->pParameterList ? node->pParameterList->length : 0); return TSDB_CODE_QRY_INVALID_INPUT; } - // impl later - return TSDB_CODE_SUCCESS; + + int32_t code; + SIFParam *params = NULL; + SIF_ERR_RET(sifInitParamList(¶ms, node->pParameterList, ctx)); + + for (int32_t m = 0; m < node->pParameterList->length; m++) { + // add impl later + if (node->condType == LOGIC_COND_TYPE_AND) { + taosArrayAddAll(output->result, params[m].result); + } else if (node->condType == LOGIC_COND_TYPE_OR) { + taosArrayAddAll(output->result, params[m].result); + } else if (node->condType == LOGIC_COND_TYPE_NOT) { + taosArrayAddAll(output->result, params[m].result); + } + } +_return: + taosMemoryFree(params); + SIF_RET(code); } static EDealRes sifWalkFunction(SNode *pNode, void *context) { - // impl later SFunctionNode *node = (SFunctionNode *)pNode; SIFParam output = {0};