From 0022a766b3b43e2f17bcf202f5ff7919165aa1fb Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 26 Sep 2022 19:21:57 +0800 Subject: [PATCH] enh: support case when --- source/libs/scalar/src/filter.c | 5 ++ source/libs/scalar/src/scalar.c | 135 ++++++++++++++++++++++++++++++-- 2 files changed, 134 insertions(+), 6 deletions(-) diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 2ad06249ca..b82cc2fd26 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3786,6 +3786,11 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) { return DEAL_RES_CONTINUE; } + if (QUERY_NODE_CASE_WHEN == nodeType(*pNode) || QUERY_NODE_WHEN_THEN == nodeType(*pNode)) { + stat->scalarMode = true; + return DEAL_RES_CONTINUE; + } + if (QUERY_NODE_OPERATOR == nodeType(*pNode)) { SOperatorNode *node = (SOperatorNode *)*pNode; if (!FLT_IS_COMPARISON_OPERATOR(node->opType)) { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 9cba94d85a..9394c4f6ca 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -532,6 +532,10 @@ _return: SCL_RET(code); } +int32_t sclGetNodeRes(SNode* node, SArray* pBlockList, SScalarParam **res) { + +} + int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) { SScalarParam *params = NULL; int32_t rowNum = 0; @@ -688,6 +692,47 @@ _return: SCL_RET(code); } +int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *output) { + int32_t rowNum = 0; + int32_t code = 0; + SScalarParam *pCase = NULL; + SScalarParam *pElse = NULL; + + if (node->pCase) { + SCL_ERR_RET(sclGetNodeRes(node->pCase, ctx->pBlockList, &pCase)); + } + + SNode* tnode = NULL; + FOREACH(tnode, node->pWhenThenList) { + if (QUERY_NODE_VALUE == tnode->type) { + return DEAL_RES_CONTINUE; + } + } + + if (output->columnData == NULL) { + code = sclCreateColumnInfoData(&node->node.resType, rowNum, output); + if (code != TSDB_CODE_SUCCESS) { + SCL_ERR_JRET(code); + } + } + + _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType); + + int32_t paramNum = scalarGetOperatorParamNum(node->opType); + SScalarParam* pLeft = ¶ms[0]; + SScalarParam* pRight = paramNum > 1 ? ¶ms[1] : NULL; + + terrno = TSDB_CODE_SUCCESS; + OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC); + code = terrno; + +_return: + + sclFreeParamList(params, paramNum); + SCL_RET(code); +} + + EDealRes sclRewriteNullInOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opType) { if (opType <= OP_TYPE_CALC_MAX) { SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); @@ -951,9 +996,65 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) { return DEAL_RES_CONTINUE; } +EDealRes sclRewriteCaseWhen(SNode** pNode, SScalarCtx *ctx) { + SCaseWhenNode *node = (SCaseWhenNode *)*pNode; + + if ((!SCL_IS_CONST_NODE(node->pCase)) || (!SCL_IS_CONST_NODE(node->pElse))) { + return DEAL_RES_CONTINUE; + } + + SNode* tnode = NULL; + FOREACH(tnode, node->pWhenThenList) { + if (!SCL_IS_CONST_NODE(tnode)) { + return DEAL_RES_CONTINUE; + } + } + + SScalarParam output = {0}; + ctx->code = sclExecCaseWhen(node, ctx, &output); + if (ctx->code) { + return DEAL_RES_ERROR; + } + + SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); + if (NULL == res) { + sclError("make value node failed"); + sclFreeParam(&output); + ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } + + res->translate = true; + + res->node.resType = node->node.resType; + if (colDataIsNull_s(output.columnData, 0)) { + res->isNull = true; + res->node.resType = node->node.resType; + } else { + int32_t type = output.columnData->info.type; + if (IS_VAR_DATA_TYPE(type)) { // todo refactor + res->datum.p = output.columnData->pData; + output.columnData->pData = NULL; + } else { + nodesSetValueNodeValue(res, output.columnData->pData); + } + } + + nodesDestroyNode(*pNode); + *pNode = (SNode*)res; + + sclFreeParam(&output); + return DEAL_RES_CONTINUE; +} + + EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) { SScalarCtx *ctx = (SScalarCtx *)pContext; + if (QUERY_NODE_OPERATOR == nodeType(*pNode)) { + return sclRewriteOperator(pNode, ctx); + } + if (QUERY_NODE_FUNCTION == nodeType(*pNode)) { return sclRewriteFunction(pNode, ctx); } @@ -962,8 +1063,8 @@ EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) { return sclRewriteLogic(pNode, ctx); } - if (QUERY_NODE_OPERATOR == nodeType(*pNode)) { - return sclRewriteOperator(pNode, ctx); + if (QUERY_NODE_CASE_WHEN == nodeType(pNode)) { + return sclRewriteCaseWhen(pNode, ctx); } return DEAL_RES_CONTINUE; @@ -1070,12 +1171,34 @@ EDealRes sclWalkTarget(SNode* pNode, SScalarCtx *ctx) { return DEAL_RES_CONTINUE; } +EDealRes sclWalkCaseWhen(SNode* pNode, SScalarCtx *ctx) { + SCaseWhenNode *node = (SCaseWhenNode *)pNode; + SScalarParam output = {0}; + + ctx->code = sclExecCaseWhen(node, ctx, &output); + if (ctx->code) { + return DEAL_RES_ERROR; + } + + if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { + ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } + + return DEAL_RES_CONTINUE; +} + + EDealRes sclCalcWalker(SNode* pNode, void* pContext) { if (QUERY_NODE_VALUE == nodeType(pNode) || QUERY_NODE_NODE_LIST == nodeType(pNode) || QUERY_NODE_COLUMN == nodeType(pNode)|| QUERY_NODE_LEFT_VALUE == nodeType(pNode)) { return DEAL_RES_CONTINUE; } SScalarCtx *ctx = (SScalarCtx *)pContext; + if (QUERY_NODE_OPERATOR == nodeType(pNode)) { + return sclWalkOperator(pNode, ctx); + } + if (QUERY_NODE_FUNCTION == nodeType(pNode)) { return sclWalkFunction(pNode, ctx); } @@ -1084,14 +1207,14 @@ EDealRes sclCalcWalker(SNode* pNode, void* pContext) { return sclWalkLogic(pNode, ctx); } - if (QUERY_NODE_OPERATOR == nodeType(pNode)) { - return sclWalkOperator(pNode, ctx); - } - if (QUERY_NODE_TARGET == nodeType(pNode)) { return sclWalkTarget(pNode, ctx); } + if (QUERY_NODE_CASE_WHEN == nodeType(pNode)) { + return sclWalkCaseWhen(pNode, ctx); + } + sclError("invalid node type for scalar calculating, type:%d", nodeType(pNode)); ctx->code = TSDB_CODE_QRY_INVALID_INPUT; return DEAL_RES_ERROR;