From b601542094c2ede426a5394386fba8ec9e76b840 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 28 Mar 2022 15:45:11 +0800 Subject: [PATCH 1/6] refactor pow/log --- source/libs/function/src/builtins.c | 20 ++++++++++-- source/libs/scalar/src/sclfunc.c | 50 ++++++++++++++++++++++++++--- 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 9b3a7ff515..78664ddd86 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -173,7 +173,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = NULL }, { - .name = "power", + .name = "pow", .type = FUNCTION_TYPE_POW, .classification = FUNC_MGT_SCALAR_FUNC, .checkFunc = stubCheckAndGetResultType, @@ -332,13 +332,29 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { // todo break; - case FUNCTION_TYPE_ABS: { + case FUNCTION_TYPE_ABS: + case FUNCTION_TYPE_CEIL: + case FUNCTION_TYPE_FLOOR: + case FUNCTION_TYPE_ROUND: { SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); int32_t paraType = pParam->node.resType.type; pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType }; break; } + case FUNCTION_TYPE_SIN: + case FUNCTION_TYPE_COS: + case FUNCTION_TYPE_TAN: + case FUNCTION_TYPE_ASIN: + case FUNCTION_TYPE_ACOS: + case FUNCTION_TYPE_ATAN: + case FUNCTION_TYPE_SQRT: + case FUNCTION_TYPE_LOG: + case FUNCTION_TYPE_POW: { + pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE }; + break; + } + default: ASSERT(0); // to found the fault ASAP. } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index feada84685..bab626f2c0 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -107,8 +107,8 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu return TSDB_CODE_SUCCESS; } -int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { #if 0 +int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { if (inputNum != 2 || !IS_NUMERIC_TYPE(pInput[0].type) || !IS_NUMERIC_TYPE(pInput[1].type)) { return TSDB_CODE_FAILED; } @@ -144,13 +144,13 @@ int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu } taosMemoryFree(input); -#endif return TSDB_CODE_SUCCESS; } +#endif -int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { #if 0 +int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { if (inputNum != 2 || !IS_NUMERIC_TYPE(pInput[0].type) || !IS_NUMERIC_TYPE(pInput[1].type)) { return TSDB_CODE_FAILED; } @@ -189,12 +189,13 @@ int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu } taosMemoryFree(input); -#endif return TSDB_CODE_SUCCESS; } +#endif typedef float (*_float_fn)(float); typedef double (*_double_fn)(double); +typedef double (*_double_fn_2)(double, double); int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) { int32_t type = GET_PARAM_TYPE(pInput); @@ -221,6 +222,39 @@ int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarPa return TSDB_CODE_SUCCESS; } +double tlog(double v, double base) { + return log(v) / log(base); +} + +int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) { + if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) { + return TSDB_CODE_FAILED; + } + + SColumnInfoData *pInputData[2]; + SColumnInfoData *pOutputData = pOutput->columnData; + _getDoubleValue_fn_t getValueFn[2]; + + for (int32_t i = 0; i < inputNum; ++i) { + pInputData[i] = pInput[i].columnData; + getValueFn[i]= getVectorDoubleValueFn(GET_PARAM_TYPE(&pInput[i])); + } + + double *out = (double *)pOutputData->pData; + + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_f(pInputData[0]->nullbitmap, i) || + colDataIsNull_f(pInputData[1]->nullbitmap, 0)) { + colDataSetNull_f(pOutputData->nullbitmap, i); + continue; + } + out[i] = valFn(getValueFn[0](pInputData[0]->pData, i), getValueFn[1](pInputData[1]->pData, 0)); + } + + pOutput->numOfRows = pInput->numOfRows; + return TSDB_CODE_SUCCESS; +} + int32_t doScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _float_fn f1, _double_fn d1) { int32_t type = GET_PARAM_TYPE(pInput); if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) { @@ -292,6 +326,14 @@ int32_t acosFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp return doScalarFunctionUnique(pInput, inputNum, pOutput, acos); } +int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doScalarFunctionUnique2(pInput, inputNum, pOutput, pow); +} + +int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doScalarFunctionUnique2(pInput, inputNum, pOutput, tlog); +} + int32_t sqrtFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { return doScalarFunctionUnique(pInput, inputNum, pOutput, sqrt); } From 0c243e658efadbf155b27e81af71b513b6b4201b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 29 Mar 2022 17:30:44 +0800 Subject: [PATCH 2/6] math functions working with shell --- include/common/tcommon.h | 5 ++ source/libs/executor/src/executorimpl.c | 80 ++++++++++++++------- source/libs/scalar/src/sclfunc.c | 94 ++----------------------- 3 files changed, 65 insertions(+), 114 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index b7fb988cec..51eabb7d61 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -197,6 +197,11 @@ typedef struct SGroupbyExpr { bool groupbyTag; // group by tag or column } SGroupbyExpr; +enum { + FUNC_PARAM_TYPE_VALUE = 0, + FUNC_PARAM_TYPE_COLUMN, +}; + typedef struct SFunctParam { int32_t type; SColumn* pCol; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f38888440c..f754aa900f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -234,7 +234,6 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); } } - #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) @@ -1186,8 +1185,19 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, pCtx[i].size = pBlock->info.rows; pCtx[i].currentStage = MAIN_SCAN; + SExprInfo expr = pOperator->pExpr[i]; + for (int32_t j = 0; j < expr.base.numOfParams; ++j) { + SFunctParam *pFuncParam = &expr.base.pParam[j]; + if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { + int32_t slotId = pFuncParam->pCol->slotId; + pCtx[i].input.pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); + pCtx[i].input.totalRows = pBlock->info.rows; + pCtx[i].input.numOfRows = pBlock->info.rows; + pCtx[i].input.startRowIndex = 0; + ASSERT(pCtx[i].input.pData[j] != NULL); + } + } // setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns); - int32_t slotId = pOperator->pExpr[i].base.pParam[0].pCol->slotId; // uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag; // if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) || @@ -1205,12 +1215,11 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, // } // in case of the block distribution query, the inputBytes is not a constant value. - pCtx[i].input.pData[0] = taosArrayGet(pBlock->pDataBlock, slotId); - pCtx[i].input.totalRows = pBlock->info.rows; - pCtx[i].input.numOfRows = pBlock->info.rows; - pCtx[i].input.startRowIndex = 0; + //pCtx[i].input.pData[0] = taosArrayGet(pBlock->pDataBlock, slotId); + //pCtx[i].input.totalRows = pBlock->info.rows; + //pCtx[i].input.numOfRows = pBlock->info.rows; + //pCtx[i].input.startRowIndex = 0; - ASSERT(pCtx[i].input.pData[0] != NULL); // uint32_t status = aAggs[pCtx[i].functionId].status; // if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) { @@ -1267,15 +1276,17 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData } else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) { ASSERT(!fmIsAggFunc(pCtx->functionId)); - SScalarParam p = {.numOfRows = pSrcBlock->info.rows}; - int32_t slotId = pExpr[k].base.pParam[0].pCol->slotId; - p.columnData = taosArrayGet(pSrcBlock->pDataBlock, slotId); + SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); + taosArrayPush(pBlockList, &pSrcBlock); SScalarParam dest = {0}; dest.columnData = taosArrayGet(pResult->pDataBlock, k); - pCtx[k].sfp.process(&p, 1, &dest); + scalarCalculate((SNode *)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); pResult->info.rows = dest.numOfRows; + + taosArrayDestroy(pBlockList); + } else { ASSERT(0); } @@ -8451,19 +8462,20 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* pExp->pExpr->_function.num = 1; pExp->pExpr->_function.functionId = -1; - pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); - pExp->base.numOfParams = 1; - - pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn)); - SColumn* pCol = pExp->base.pParam[0].pCol; - // it is a project query, or group by column if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) { pExp->pExpr->nodeType = QUERY_NODE_COLUMN; SColumnNode* pColNode = (SColumnNode*) pTargetNode->pExpr; + pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + pExp->base.numOfParams = 1; + pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn)); + pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; + SDataType* pType = &pColNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); + + SColumn* pCol = pExp->base.pParam[0].pCol; pCol->slotId = pColNode->slotId; // TODO refactor pCol->bytes = pType->bytes; pCol->type = pType->type; @@ -8482,26 +8494,46 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* // TODO: value parameter needs to be handled int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); + + pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam)); + pExp->base.numOfParams = numOfParam; + for (int32_t j = 0; j < numOfParam; ++j) { SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); - SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor + if (p1->type == QUERY_NODE_COLUMN) { + SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor - pCol->slotId = pcn->slotId; - pCol->bytes = pcn->node.resType.bytes; - pCol->type = pcn->node.resType.type; - pCol->scale = pcn->node.resType.scale; - pCol->precision = pcn->node.resType.precision; - pCol->dataBlockId = pcn->dataBlockId; + pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN; + pExp->base.pParam[j].pCol = taosMemoryCalloc(1, sizeof(SColumn)); + SColumn* pCol = pExp->base.pParam[j].pCol; + + pCol->slotId = pcn->slotId; + pCol->bytes = pcn->node.resType.bytes; + pCol->type = pcn->node.resType.type; + pCol->scale = pcn->node.resType.scale; + pCol->precision = pcn->node.resType.precision; + pCol->dataBlockId = pcn->dataBlockId; + } else if (p1->type == QUERY_NODE_VALUE) { + SValueNode* pvn = (SValueNode*)p1; + + pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; + } } } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_OPERATOR) { pExp->pExpr->nodeType = QUERY_NODE_OPERATOR; SOperatorNode* pNode = (SOperatorNode*) pTargetNode->pExpr; + pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + pExp->base.numOfParams = 1; + pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn)); + pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; + SDataType* pType = &pNode->node.resType; pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName); pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr; + SColumn* pCol = pExp->base.pParam[0].pCol; pCol->slotId = pTargetNode->slotId; // TODO refactor pCol->bytes = pType->bytes; pCol->type = pType->type; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index bab626f2c0..af5f07751b 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -107,96 +107,14 @@ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu return TSDB_CODE_SUCCESS; } -#if 0 -int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - if (inputNum != 2 || !IS_NUMERIC_TYPE(pInput[0].type) || !IS_NUMERIC_TYPE(pInput[1].type)) { - return TSDB_CODE_FAILED; - } - - char **input = NULL, *output = NULL; - bool hasNullInput = false; - input = taosMemoryCalloc(inputNum, sizeof(char *)); - for (int32_t i = 0; i < pOutput->num; ++i) { - for (int32_t j = 0; j < inputNum; ++j) { - if (pInput[j].num == 1) { - input[j] = pInput[j].data; - } else { - input[j] = pInput[j].data + i * pInput[j].bytes; - } - if (isNull(input[j], pInput[j].type)) { - hasNullInput = true; - break; - } - } - output = pOutput->data + i * pOutput->bytes; - - if (hasNullInput) { - setNull(output, pOutput->type, pOutput->bytes); - continue; - } - - double base; - GET_TYPED_DATA(base, double, pInput[1].type, input[1]); - double v; - GET_TYPED_DATA(v, double, pInput[0].type, input[0]); - double result = log(v) / log(base); - SET_TYPED_DATA(output, pOutput->type, result); - } - - taosMemoryFree(input); - - return TSDB_CODE_SUCCESS; -} -#endif - -#if 0 -int32_t powFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - if (inputNum != 2 || !IS_NUMERIC_TYPE(pInput[0].type) || !IS_NUMERIC_TYPE(pInput[1].type)) { - return TSDB_CODE_FAILED; - } - - pOutput->type = TSDB_DATA_TYPE_DOUBLE; - pOutput->bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes; - - char **input = NULL, *output = NULL; - bool hasNullInput = false; - input = taosMemoryCalloc(inputNum, sizeof(char *)); - for (int32_t i = 0; i < pOutput->num; ++i) { - for (int32_t j = 0; j < inputNum; ++j) { - if (pInput[j].num == 1) { - input[j] = pInput[j].data; - } else { - input[j] = pInput[j].data + i * pInput[j].bytes; - } - if (isNull(input[j], pInput[j].type)) { - hasNullInput = true; - break; - } - } - output = pOutput->data + i * pOutput->bytes; - - if (hasNullInput) { - setNull(output, pOutput->type, pOutput->bytes); - continue; - } - - double base; - GET_TYPED_DATA(base, double, pInput[1].type, input[1]); - double v; - GET_TYPED_DATA(v, double, pInput[0].type, input[0]); - double result = pow(v, base); - SET_TYPED_DATA(output, pOutput->type, result); - } - - taosMemoryFree(input); - return TSDB_CODE_SUCCESS; -} -#endif - typedef float (*_float_fn)(float); typedef double (*_double_fn)(double); typedef double (*_double_fn_2)(double, double); +double tlog(double v, double base) { + return log(v) / log(base); +} + int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn valFn) { int32_t type = GET_PARAM_TYPE(pInput); if (inputNum != 1 || !IS_NUMERIC_TYPE(type)) { @@ -222,10 +140,6 @@ int32_t doScalarFunctionUnique(SScalarParam *pInput, int32_t inputNum, SScalarPa return TSDB_CODE_SUCCESS; } -double tlog(double v, double base) { - return log(v) / log(base); -} - int32_t doScalarFunctionUnique2(SScalarParam *pInput, int32_t inputNum, SScalarParam* pOutput, _double_fn_2 valFn) { if (inputNum != 2 || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[0])) || !IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[1]))) { return TSDB_CODE_FAILED; From 7cb632cbf08a4c875b6a2a18c9488d5becfc0dc0 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 29 Mar 2022 17:31:09 +0800 Subject: [PATCH 3/6] fix math function unit test --- .../libs/scalar/test/scalar/scalarTests.cpp | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/source/libs/scalar/test/scalar/scalarTests.cpp b/source/libs/scalar/test/scalar/scalarTests.cpp index b3211babf1..153222516c 100644 --- a/source/libs/scalar/test/scalar/scalarTests.cpp +++ b/source/libs/scalar/test/scalar/scalarTests.cpp @@ -2833,11 +2833,11 @@ TEST(ScalarFunctionTest, logFunction_column) { int32_t rowNum = 3; int32_t type; int32_t otype = TSDB_DATA_TYPE_DOUBLE; - double result[] = {2.0, 4.0, 3.0}; + double result[] = {2.0, 3.0, 4.0}; pInput = (SScalarParam *)taosMemoryCalloc(2, sizeof(SScalarParam)); //TINYINT - int8_t val_tinyint[2][3] = {{25, 81, 64}, {5, 3, 4}}; + int8_t val_tinyint[2][3] = {{9, 27, 81}, {3, 3, 3}}; type = TSDB_DATA_TYPE_TINYINT; for (int32_t i = 0; i < 2; ++i) { scltMakeDataBlock(&input[i], type, 0, rowNum, false); @@ -2863,7 +2863,7 @@ TEST(ScalarFunctionTest, logFunction_column) { scltDestroyDataBlock(pOutput); //FLOAT - float val_float[2][3] = {{25.0, 81.0, 64.0}, {5.0, 3.0, 4.0}}; + float val_float[2][3] = {{9.0, 27.0, 81.0}, {3.0, 3.0, 3.0}}; type = TSDB_DATA_TYPE_FLOAT; for (int32_t i = 0; i < 2; ++i) { scltMakeDataBlock(&input[i], type, 0, rowNum, false); @@ -2888,8 +2888,8 @@ TEST(ScalarFunctionTest, logFunction_column) { scltDestroyDataBlock(pOutput); //TINYINT AND FLOAT - int8_t param0[] = {25, 81, 64}; - float param1[] = {5.0, 3.0, 4.0}; + int8_t param0[] = {9, 27, 81}; + float param1[] = {3.0, 3.0, 3.0}; scltMakeDataBlock(&input[0], TSDB_DATA_TYPE_TINYINT, 0, rowNum, false); pInput[0] = *input[0]; for (int32_t i = 0; i < rowNum; ++i) { @@ -3003,17 +3003,17 @@ TEST(ScalarFunctionTest, powFunction_column) { int32_t rowNum = 3; int32_t type; int32_t otype = TSDB_DATA_TYPE_DOUBLE; - double result[] = {32.0, 27.0, 16.0}; + double result[] = {8.0, 27.0, 64.0}; pInput = (SScalarParam *)taosMemoryCalloc(2, sizeof(SScalarParam)); //TINYINT - int8_t val_tinyint[2][3] = {{2, 3, 4}, {5, 3, 2}}; + int8_t val_tinyint[2][3] = {{2, 3, 4}, {3, 3, 3}}; type = TSDB_DATA_TYPE_TINYINT; for (int32_t i = 0; i < 2; ++i) { scltMakeDataBlock(&input[i], type, 0, rowNum, false); pInput[i] = *input[i]; for (int32_t j = 0; j < rowNum; ++j) { - colDataAppend(pInput[i].columnData, i, (const char*) &val_tinyint[i][j], false); + colDataAppend(pInput[i].columnData, j, (const char*) &val_tinyint[i][j], false); } PRINTF("tiny_int before POW:%d,%d,%d\n", *((int8_t *)pInput[i].data + 0), @@ -3034,7 +3034,7 @@ TEST(ScalarFunctionTest, powFunction_column) { scltDestroyDataBlock(pOutput); //FLOAT - float val_float[2][3] = {{2.0, 3.0, 4.0}, {5.0, 3.0, 2.0}}; + float val_float[2][3] = {{2.0, 3.0, 4.0}, {3.0, 3.0, 3.0}}; type = TSDB_DATA_TYPE_FLOAT; for (int32_t i = 0; i < 2; ++i) { scltMakeDataBlock(&input[i], type, 0, rowNum, false); @@ -3060,7 +3060,7 @@ TEST(ScalarFunctionTest, powFunction_column) { //TINYINT AND FLOAT int8_t param0[] = {2, 3, 4}; - float param1[] = {5.0, 3.0, 2.0}; + float param1[] = {3.0, 3.0, 2.0}; scltMakeDataBlock(&input[0], TSDB_DATA_TYPE_TINYINT, 0, rowNum, false); pInput[0] = *input[0]; for (int32_t i = 0; i < rowNum; ++i) { From ace84f8189a17cee4b452ed0c4ce7d2aed2fae07 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Tue, 29 Mar 2022 17:33:17 +0800 Subject: [PATCH 4/6] [add tmq cases] --- tests/script/jenkins/basic.txt | 2 +- tests/script/tsim/testCaseSuite.sim | 2 +- tests/script/tsim/tmq/basic1.sim | 12 +++---- tests/test/c/tmqSim.c | 52 ++++++++++++++++++----------- 4 files changed, 41 insertions(+), 27 deletions(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 77ee1d9de4..2cad518128 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -37,6 +37,6 @@ # ---- tmq ./test.sh -f tsim/tmq/basic.sim -#./test.sh -f tsim/tmq/basic1.sim +./test.sh -f tsim/tmq/basic1.sim #======================b1-end=============== diff --git a/tests/script/tsim/testCaseSuite.sim b/tests/script/tsim/testCaseSuite.sim index 2ed5e5fcf3..bf184f8794 100644 --- a/tests/script/tsim/testCaseSuite.sim +++ b/tests/script/tsim/testCaseSuite.sim @@ -26,4 +26,4 @@ run tsim/show/basic.sim run tsim/table/basic1.sim run tsim/tmq/basic.sim -#run tsim/tmq/basic1.sim +run tsim/tmq/basic1.sim diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index b828dd5b78..27d08230c4 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -135,42 +135,42 @@ print inserted totalMsgCnt: $totalMsgCnt print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" print cmd result----> $system_content -if $system_content != @{consume success: 20}@ then +if $system_content != @{consume success: 20, 0}@ then return -1 endi #print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" #system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" #print cmd result----> $system_content -#if $system_content != @{consume success: 20}@ then +#if $system_content != @{consume success: 20, 0}@ then # return -1 #endi print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" print cmd result----> $system_content -if $system_content != @{consume success: 10}@ then +if $system_content != @{consume success: 10, 0}@ then return -1 endi print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" print cmd result----> $system_content -if $system_content != @{consume success: 10}@ then +if $system_content != @{consume success: 10, 0}@ then return -1 endi print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" print cmd result----> $system_content -if $system_content != @{consume success: 20}@ then +if $system_content != @{consume success: 20, 0}@ then return -1 endi print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" print cmd result----> $system_content -if $system_content != @{consume success: 20}@ then +if $system_content != @{consume success: 20, 0}@ then return -1 endi diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 75542bfa71..4d3108500e 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -219,33 +219,33 @@ tmq_list_t* build_topic_list() { return topic_list; } -void perf_loop(tmq_t* tmq, tmq_list_t* topics) { +void loop_consume(tmq_t* tmq) { tmq_resp_err_t err; - if ((err = tmq_subscribe(tmq, topics))) { - printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); - } - int32_t totalMsgs = 0; + int32_t totalRows = 0; int32_t skipLogNum = 0; - //int64_t startTime = taosGetTimestampUs(); while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1); - if (tmqmessage) { - totalMsgs++; - skipLogNum += tmqGetSkipLogNum(tmqmessage); - if (0 != g_stConfInfo.showMsgFlag) { - msg_process(tmqmessage); + tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 1); + if (tmqMsg) { + totalMsgs++; + + #if 0 + TAOS_ROW row; + while (NULL != (row = tmq_get_row(tmqMsg))) { + totalRows++; } - tmq_message_destroy(tmqmessage); + #endif + + skipLogNum += tmqGetSkipLogNum(tmqMsg); + if (0 != g_stConfInfo.showMsgFlag) { + msg_process(tmqMsg); + } + tmq_message_destroy(tmqMsg); } else { break; } } - //int64_t endTime = taosGetTimestampUs(); - //double consumeTime = (double)(endTime - startTime) / 1000000; - err = tmq_consumer_close(tmq); if (err) { @@ -253,7 +253,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) { exit(-1); } - printf("{consume success: %d}", totalMsgs); + printf("{consume success: %d, %d}", totalMsgs, totalRows); } int main(int32_t argc, char *argv[]) { @@ -266,7 +266,21 @@ int main(int32_t argc, char *argv[]) { return -1; } - perf_loop(tmq, topic_list); + tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); + if (err) { + printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + + loop_consume(tmq); + + #if 0 + err = tmq_unsubscribe(tmq); + if (err) { + printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + #endif return 0; } From 49558da2774fa37a129dc9b8e99ef0c6a0d908a7 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Tue, 29 Mar 2022 17:41:23 +0800 Subject: [PATCH 5/6] [add notes] --- tests/script/tsim/tmq/basic1.sim | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index 27d08230c4..fe6a7a0660 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -3,9 +3,10 @@ # vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb # vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb # vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN # The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; # -# notes: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). # system sh/stop_dnodes.sh From 7606fa91faf5a240dbd35f14416f22eb46910a37 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Tue, 29 Mar 2022 18:11:30 +0800 Subject: [PATCH 6/6] [modify case] --- tests/script/tsim/parser/groupby-basic.sim | 88 ++++++++-------------- 1 file changed, 31 insertions(+), 57 deletions(-) diff --git a/tests/script/tsim/parser/groupby-basic.sim b/tests/script/tsim/parser/groupby-basic.sim index cbd05031df..c0cbfa8aeb 100644 --- a/tests/script/tsim/parser/groupby-basic.sim +++ b/tests/script/tsim/parser/groupby-basic.sim @@ -107,80 +107,54 @@ print rows: $rows print $data00 $data01 $data02 $data03 print $data10 $data11 $data12 $data13 print $data20 $data21 $data22 $data23 -if $row != 20 then - return -1 +print $data80 $data81 $data82 $data83 +print $data90 $data91 $data92 $data93 +if $rows != 10 then + return -1 endi - -if $data00 != 100 then - return -1 -endi - +#if $data00 != 10 then +# return -1 +#endi if $data01 != 0 then return -1 endi - -if $data10 != 100 then - return -1 -endi - +#if $data10 != 10 then +# return -1 +#endi if $data11 != 1 then return -1 endi - -sql select first(ts),c1 from group_tb0 where c1<20 group by c1; -if $row != 20 then - return -1 -endi - -if $data00 != @70-01-01 08:01:40.000@ then - return -1 -endi - -if $data01 != 0 then - return -1 -endi - -if $data90 != @70-01-01 08:01:40.009@ then - return -1 -endi - +#if $data90 != 10 then +# return -1 +#endi if $data91 != 9 then return -1 endi -sql select first(ts), ts, c1 from group_tb0 where c1 < 20 group by c1; -print $row -if $row != 20 then +sql select first(ts),c1 from group_tb0 group by c1; +print rows: $rows +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data80 $data81 $data82 $data83 +print $data90 $data91 $data92 $data93 +if $row != 10 then return -1 endi -if $data00 != $data01 then +if $data00 != @2022-01-01 00:00:00.000@ then + return -1 +endi +if $data01 != 0 then + return -1 +endi +if $data90 != @2022-01-01 00:00:00.009@ then + return -1 +endi +if $data91 != 9 then return -1 endi -if $data10 != $data11 then - return -1 -endi - -if $data20 != $data21 then - return -1 -endi - -if $data90 != $data91 then - return -1 -endi - -if $data02 != 0 then - return -1 -endi - -if $data12 != 1 then - return -1 -endi - -if $data92 != 9 then - return -1 -endi sql select sum(c1), c1, avg(c1), min(c1), max(c2) from group_tb0 where c1 < 20 group by c1; if $row != 20 then