From 0c243e658efadbf155b27e81af71b513b6b4201b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 29 Mar 2022 17:30:44 +0800 Subject: [PATCH] math functions working with shell --- include/common/tcommon.h | 5 ++ source/libs/executor/src/executorimpl.c | 80 ++++++++++++++------- source/libs/scalar/src/sclfunc.c | 94 ++----------------------- 3 files changed, 65 insertions(+), 114 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index b7fb988cec..51eabb7d61 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -197,6 +197,11 @@ typedef struct SGroupbyExpr { bool groupbyTag; // group by tag or column } SGroupbyExpr; +enum { + FUNC_PARAM_TYPE_VALUE = 0, + FUNC_PARAM_TYPE_COLUMN, +}; + typedef struct SFunctParam { int32_t type; SColumn* pCol; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f38888440c..f754aa900f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -234,7 +234,6 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); } } - #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) @@ -1186,8 +1185,19 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, pCtx[i].size = pBlock->info.rows; pCtx[i].currentStage = MAIN_SCAN; + SExprInfo expr = pOperator->pExpr[i]; + for (int32_t j = 0; j < expr.base.numOfParams; ++j) { + SFunctParam *pFuncParam = &expr.base.pParam[j]; + if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { + int32_t slotId = pFuncParam->pCol->slotId; + pCtx[i].input.pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); + pCtx[i].input.totalRows = pBlock->info.rows; + pCtx[i].input.numOfRows = pBlock->info.rows; + pCtx[i].input.startRowIndex = 0; + ASSERT(pCtx[i].input.pData[j] != NULL); + } + } // setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns); - int32_t slotId = pOperator->pExpr[i].base.pParam[0].pCol->slotId; // uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag; // if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) || @@ -1205,12 +1215,11 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, // } // in case of the block distribution query, the inputBytes is not a constant value. - pCtx[i].input.pData[0] = taosArrayGet(pBlock->pDataBlock, slotId); - pCtx[i].input.totalRows = pBlock->info.rows; - pCtx[i].input.numOfRows = pBlock->info.rows; - pCtx[i].input.startRowIndex = 0; + //pCtx[i].input.pData[0] = taosArrayGet(pBlock->pDataBlock, slotId); + //pCtx[i].input.totalRows = pBlock->info.rows; + //pCtx[i].input.numOfRows = pBlock->info.rows; + //pCtx[i].input.startRowIndex = 0; - ASSERT(pCtx[i].input.pData[0] != NULL); // uint32_t status = aAggs[pCtx[i].functionId].status; // if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) { @@ -1267,15 +1276,17 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { ASSERT(!fmIsAggFunc(pCtx->functionId)); - SScalarParam p = {.numOfRows = pSrcBlock->info.rows}; - int32_t slotId = pExpr[k].base.pParam[0].pCol->slotId; - p.columnData = taosArrayGet(pSrcBlock->pDataBlock, slotId); + SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); + taosArrayPush(pBlockList, &pSrcBlock); SScalarParam dest = {0}; dest.columnData = taosArrayGet(pResult->pDataBlock, k); - pCtx[k].sfp.process(&p, 1, &dest); + scalarCalculate((SNode *)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); pResult->info.rows = dest.numOfRows; + + taosArrayDestroy(pBlockList); + } else { ASSERT(0); } @@ -8451,19 +8462,20 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->pExpr->_function.num = 1; pExp->pExpr->_function.functionId = -1; - pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); - pExp->base.numOfParams = 1; - - pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn)); - SColumn* pCol = pExp->base.pParam[0].pCol; - // it is a project query, or group by column if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) { pExp->pExpr->nodeType = QUERY_NODE_COLUMN; SColumnNode* pColNode = (SColumnNode*) pTargetNode->pExpr; + pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + pExp->base.numOfParams = 1; + pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn)); + pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; + SDataType* pType = &pColNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); + + SColumn* pCol = pExp->base.pParam[0].pCol; pCol->slotId = pColNode->slotId; // TODO refactor pCol->bytes = pType->bytes; pCol->type = pType->type; @@ -8482,26 +8494,46 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* // TODO: value parameter needs to be handled int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); + + pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam)); + pExp->base.numOfParams = numOfParam; + for (int32_t j = 0; j < numOfParam; ++j) { SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); - SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor + if (p1->type == QUERY_NODE_COLUMN) { + SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor - pCol->slotId = pcn->slotId; - pCol->bytes = pcn->node.resType.bytes; - pCol->type = pcn->node.resType.type; - pCol->scale = pcn->node.resType.scale; - pCol->precision = pcn->node.resType.precision; - pCol->dataBlockId = pcn->dataBlockId; + pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN; + pExp->base.pParam[j].pCol = taosMemoryCalloc(1, sizeof(SColumn)); + SColumn* pCol = pExp->base.pParam[j].pCol; + + pCol->slotId = pcn->slotId; + pCol->bytes = pcn->node.resType.bytes; + pCol->type = pcn->node.resType.type; + pCol->scale = pcn->node.resType.scale; + pCol->precision = pcn->node.resType.precision; + pCol->dataBlockId = pcn->dataBlockId; + } else if (p1->type == QUERY_NODE_VALUE) { + SValueNode* pvn = (SValueNode*)p1; + + pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; + } } } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_OPERATOR) { pExp->pExpr->nodeType = QUERY_NODE_OPERATOR; SOperatorNode* pNode = (SOperatorNode*) pTargetNode->pExpr; + pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + pExp->base.numOfParams = 1; + pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn)); + pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; + SDataType* pType = &pNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName); pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr; + SColumn* pCol = pExp->base.pParam[0].pCol; pCol->slotId = pTargetNode->slotId; // TODO refactor pCol->bytes = pType->bytes; pCol->type = pType->type; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index bab626f2c0..af5f07751b 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -107,96 +107,14 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu return TSDB_CODE_SUCCESS; } -#if 0 -int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - if (inputNum != 2 || !IS_NUMERIC_TYPE(pInput[0].type) || !IS_NUMERIC_TYPE(pInput[1].type)) { - return TSDB_CODE_FAILED; - } - - char **input = NULL, *output = NULL; - bool hasNullInput = false; - input = taosMemoryCalloc(inputNum, sizeof(char *)); - for (int32_t i = 0; i < pOutput->num; ++i) { - for (int32_t j = 0; j < inputNum; ++j) { - if (pInput[j].num == 1) { - input[j] = pInput[j].data; - } else { - input[j] = pInput[j].data + i * pInput[j].bytes; - } - if (isNull(input[j], pInput[j].type)) { - hasNullInput = true; - break; - } - } - output = pOutput->data + i * pOutput->bytes; - - if (hasNullInput) { - setNull(output, pOutput->type, pOutput->bytes); - continue; - } - - double base; - GET_TYPED_DATA(base, double, pInput[1].type, input[1]); - double v; - GET_TYPED_DATA(v, double, pInput[0].type, input[0]); - double result = log(v) / log(base); - SET_TYPED_DATA(output, pOutput->type, result); - } - - taosMemoryFree(input); - - return TSDB_CODE_SUCCESS; -} -#endif - -#if 0 -int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - if (inputNum != 2 || !IS_NUMERIC_TYPE(pInput[0].type) || !IS_NUMERIC_TYPE(pInput[1].type)) { - return TSDB_CODE_FAILED; - } - - pOutput->type = TSDB_DATA_TYPE_DOUBLE; - pOutput->bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes; - - char **input = NULL, *output = NULL; - bool hasNullInput = false; - input = taosMemoryCalloc(inputNum, sizeof(char *)); - for (int32_t i = 0; i < pOutput->num; ++i) { - for (int32_t j = 0; j < inputNum; ++j) { - if (pInput[j].num == 1) { - input[j] = pInput[j].data; - } else { - input[j] = pInput[j].data + i * pInput[j].bytes; - } - if (isNull(input[j], pInput[j].type)) { - hasNullInput = true; - break; - } - } - output = pOutput->data + i * pOutput->bytes; - - if (hasNullInput) { - setNull(output, pOutput->type, pOutput->bytes); - continue; - } - - double base; - GET_TYPED_DATA(base, double, pInput[1].type, input[1]); - double v; - GET_TYPED_DATA(v, double, pInput[0].type, input[0]); - double result = pow(v, base); - SET_TYPED_DATA(output, pOutput->type, result); - } - - taosMemoryFree(input); - return TSDB_CODE_SUCCESS; -} -#endif - typedef float (*_float_fn)(float); typedef double (*_double_fn)(double); typedef double (*_double_fn_2)(double, double); +double tlog(double v, double base) { + return log(v) / log(base); +} + int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) { int32_t type = GET_PARAM_TYPE(pInput); if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) { @@ -222,10 +140,6 @@ int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarPa return TSDB_CODE_SUCCESS; } -double tlog(double v, double base) { - return log(v) / log(base); -} - int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) { if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) { return TSDB_CODE_FAILED;