diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 16c59465cc..ec70dffd34 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -143,6 +143,86 @@ typedef struct { } \ } while (0) +#define SET_TYPED_DATA_MIN(_v, _type) \ + do { \ + switch (_type) { \ + case TSDB_DATA_TYPE_BOOL: \ + case TSDB_DATA_TYPE_TINYINT: \ + *(int8_t *)(_v) = INT8_MIN; \ + break; \ + case TSDB_DATA_TYPE_SMALLINT: \ + *(int16_t *)(_v) = INT16_MIN; \ + break; \ + case TSDB_DATA_TYPE_INT: \ + *(int32_t *)(_v) = INT32_MIN; \ + break; \ + case TSDB_DATA_TYPE_BIGINT: \ + case TSDB_DATA_TYPE_TIMESTAMP: \ + *(int64_t *)(_v) = INT64_MIN; \ + break; \ + case TSDB_DATA_TYPE_FLOAT: \ + *(float *)(_v) = FLT_MIN; \ + break; \ + case TSDB_DATA_TYPE_DOUBLE: \ + *(double *)(_v) = DBL_MIN; \ + break; \ + case TSDB_DATA_TYPE_UTINYINT: \ + *(uint8_t *)(_v) = 0; \ + break; \ + case TSDB_DATA_TYPE_USMALLINT: \ + *(uint16_t *)(_v) = 0; \ + break; \ + case TSDB_DATA_TYPE_UBIGINT: \ + *(uint64_t *)(_v) = 0; \ + break; \ + case TSDB_DATA_TYPE_UINT: \ + *(uint32_t *)(_v) = 0; \ + break; \ + default: \ + break; \ + } \ + } while (0) + +#define SET_TYPED_DATA_MAX(_v, _type) \ + do { \ + switch (_type) { \ + case TSDB_DATA_TYPE_BOOL: \ + case TSDB_DATA_TYPE_TINYINT: \ + *(int8_t *)(_v) = INT8_MAX; \ + break; \ + case TSDB_DATA_TYPE_SMALLINT: \ + *(int16_t *)(_v) = INT16_MAX; \ + break; \ + case TSDB_DATA_TYPE_INT: \ + *(int32_t *)(_v) = INT32_MAX; \ + break; \ + case TSDB_DATA_TYPE_BIGINT: \ + case TSDB_DATA_TYPE_TIMESTAMP: \ + *(int64_t *)(_v) = INT64_MAX; \ + break; \ + case TSDB_DATA_TYPE_FLOAT: \ + *(float *)(_v) = FLT_MAX; \ + break; \ + case TSDB_DATA_TYPE_DOUBLE: \ + *(double *)(_v) = DBL_MAX; \ + break; \ + case TSDB_DATA_TYPE_UTINYINT: \ + *(uint8_t *)(_v) = UINT8_MAX; \ + break; \ + case TSDB_DATA_TYPE_USMALLINT: \ + *(uint16_t *)(_v) = UINT16_MAX; \ + break; \ + case TSDB_DATA_TYPE_UINT: \ + *(uint32_t *)(_v) = UINT32_MAX; \ + break; \ + case TSDB_DATA_TYPE_UBIGINT: \ + *(uint64_t *)(_v) = UINT64_MAX; \ + break; \ + default: \ + break; \ + } \ + } while (0) + #define NUM_TO_STRING(_inputType, _input, _outputBytes, _output) \ do { \ switch (_inputType) { \ diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index 517c5ff0e6..97c77ef01b 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -96,6 +96,12 @@ int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +/* Aggregation functions */ +int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t maxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 5feb142757..001261767d 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1886,6 +1886,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getCountFuncEnv, .initFunc = functionSetup, .processFunc = countFunction, + .sprocessFunc = countScalarFunction, .finalizeFunc = functionFinalize, .invertFunc = countInvertFunction, .combineFunc = combineFunction, @@ -1901,6 +1902,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getSumFuncEnv, .initFunc = functionSetup, .processFunc = sumFunction, + .sprocessFunc = sumScalarFunction, .finalizeFunc = functionFinalize, .invertFunc = sumInvertFunction, .combineFunc = sumCombine, @@ -1916,6 +1918,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = minmaxFunctionSetup, .processFunc = minFunction, + .sprocessFunc = minScalarFunction, .finalizeFunc = minmaxFunctionFinalize, .combineFunc = minCombine, .pPartialFunc = "min", @@ -1930,6 +1933,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = minmaxFunctionSetup, .processFunc = maxFunction, + .sprocessFunc = maxScalarFunction, .finalizeFunc = minmaxFunctionFinalize, .combineFunc = maxCombine, .pPartialFunc = "max", diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index bdfc411fa6..bbb7e07bad 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -11,7 +11,7 @@ #include "ttime.h" int32_t scalarGetOperatorParamNum(EOperatorType type) { - if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type + if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type || OP_TYPE_IS_FALSE == type || OP_TYPE_IS_NOT_FALSE == type || OP_TYPE_IS_UNKNOWN == type || OP_TYPE_IS_NOT_UNKNOWN == type || OP_TYPE_MINUS == type) { return 1; @@ -28,7 +28,7 @@ int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) { } taosMemoryFree(timeStr); valueNode->typeData = valueNode->datum.i; - + valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; @@ -82,7 +82,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); + taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); int32_t code = 0; SNodeListNode *nodeList = (SNodeListNode *)pNode; @@ -91,10 +91,10 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { int32_t len = 0; void *buf = NULL; - + for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) { SValueNode *valueNode = (SValueNode *)cell->pNode; - + if (valueNode->node.resType.type != type) { out.columnData->info.type = type; if (IS_VAR_DATA_TYPE(type)) { @@ -134,7 +134,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { len = valueNode->node.resType.bytes; } } - + if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) { sclError("taosHashPut to set failed"); SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -180,7 +180,7 @@ int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) { if (TSDB_DATA_TYPE_NULL == pNode->node.resType.type) { return TSDB_CODE_SUCCESS; } - + *res = taosMemoryMalloc(pNode->node.resType.bytes); if (NULL == (*res)) { sclError("malloc %d failed", pNode->node.resType.bytes); @@ -222,14 +222,14 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t if (type == 0) { type = nodeList->dataType.type; } - + SCL_ERR_RET(scalarGenerateSetFromList((void **)¶m->pHashFilter, node, type)); param->hashValueType = type; if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) { taosHashCleanup(param->pHashFilter); sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param)); return TSDB_CODE_QRY_OUT_OF_MEMORY; - } + } break; } case QUERY_NODE_COLUMN: { @@ -237,7 +237,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList); SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - + SColumnNode *ref = (SColumnNode *)node; int32_t index = -1; @@ -285,7 +285,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows); SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + *rowNum = param->numOfRows; } @@ -293,7 +293,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t return TSDB_CODE_SUCCESS; } -int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) { +int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) { int32_t code = 0; if (NULL == pParamList) { if (ctx->pBlockList) { @@ -318,18 +318,18 @@ int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarC SNode *tnode = NULL; int32_t i = 0; if (SCL_IS_CONST_CALC(ctx)) { - WHERE_EACH (tnode, pParamList) { + WHERE_EACH (tnode, pParamList) { if (!SCL_IS_CONST_NODE(tnode)) { WHERE_NEXT; } else { SCL_ERR_JRET(sclInitParam(tnode, ¶mList[i], ctx, rowNum)); ERASE_NODE(pParamList); } - + ++i; } } else { - FOREACH(tnode, pParamList) { + FOREACH(tnode, pParamList) { SCL_ERR_JRET(sclInitParam(tnode, ¶mList[i], ctx, rowNum)); ++i; } @@ -339,7 +339,7 @@ int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarC } if (0 == *rowNum) { - taosMemoryFreeClear(paramList); + taosMemoryFreeClear(paramList); } *pParams = paramList; @@ -354,7 +354,7 @@ int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) { if (NULL == pNode) { return -1; } - + switch ((int)nodeType(pNode)) { case QUERY_NODE_VALUE: { SValueNode *valueNode = (SValueNode *)pNode; @@ -397,7 +397,7 @@ int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScal sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight); SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + SScalarParam *paramList = taosMemoryCalloc(paramNum, sizeof(SScalarParam)); if (NULL == paramList) { sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam))); @@ -440,7 +440,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); SCL_ERR_JRET(code); } - + code = sclCreateColumnInfoData(&node->node.resType, rowNum, output); if (code != TSDB_CODE_SUCCESS) { SCL_ERR_JRET(code); @@ -588,27 +588,27 @@ EDealRes sclRewriteNullInOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opTy if (opType <= OP_TYPE_CALC_MAX) { SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { - sclError("make value node failed"); + sclError("make value node failed"); ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } - + res->node.resType.type = TSDB_DATA_TYPE_NULL; - + nodesDestroyNode(*pNode); *pNode = (SNode*)res; } else { SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { - sclError("make value node failed"); + sclError("make value node failed"); ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } - + res->node.resType.type = TSDB_DATA_TYPE_BOOL; res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; res->datum.b = false; - + nodesDestroyNode(*pNode); *pNode = (SNode*)res; } @@ -641,12 +641,12 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) { SValueNode *valueNode = (SValueNode *)node->pLeft; - if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL) + if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL) && (!sclContainsAggFuncNode(node->pRight))) { return sclRewriteNullInOptr(pNode, ctx, node->opType); } - if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight) + if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight) && ((SExprNode*)node->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) { code = sclConvertToTsValueNode(((SExprNode*)node->pRight)->resType.precision, valueNode); if (code) { @@ -663,7 +663,7 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { return sclRewriteNullInOptr(pNode, ctx, node->opType); } - if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft) + if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft) && ((SExprNode*)node->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) { code = sclConvertToTsValueNode(((SExprNode*)node->pLeft)->resType.precision, valueNode); if (code) { @@ -728,8 +728,9 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { res->translate = true; if (colDataIsNull_s(output.columnData, 0)) { - res->node.resType.type = TSDB_DATA_TYPE_NULL; - res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; + res->isNull = true; + //res->node.resType.type = TSDB_DATA_TYPE_NULL; + //res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; } else { res->node.resType.type = output.columnData->info.type; res->node.resType.bytes = output.columnData->info.bytes; @@ -832,7 +833,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) { res->datum.p = output.columnData->pData; output.columnData->pData = NULL; } else { - nodesSetValueNodeValue(res, output.columnData->pData); + nodesSetValueNodeValue(res, output.columnData->pData); } } @@ -899,7 +900,7 @@ EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) { EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) { SOperatorNode *node = (SOperatorNode *)pNode; SScalarParam output = {0}; - + ctx->code = sclExecOperator(node, ctx, &output); if (ctx->code) { return DEAL_RES_ERROR; @@ -1023,7 +1024,7 @@ int32_t sclCalcConstants(SNode *pNode, bool dual, SNode **pRes) { sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx); SCL_ERR_JRET(ctx.code); *pRes = pNode; @@ -1125,7 +1126,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx); SCL_ERR_JRET(ctx.code); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index df5df127f0..cfdde85556 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -702,6 +702,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu return TSDB_CODE_SUCCESS; } +/** Conversion functions **/ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int16_t inputType = GET_PARAM_TYPE(&pInput[0]); int16_t inputLen = GET_PARAM_BYTES(&pInput[0]); @@ -1164,6 +1165,7 @@ int32_t toJsonFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu return TSDB_CODE_SUCCESS; } +/** Time functions **/ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t type = GET_PARAM_TYPE(&pInput[0]); @@ -1736,3 +1738,168 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO pOutput->numOfRows += pInput->numOfRows; return TSDB_CODE_SUCCESS; } + + +/** Aggregation functions **/ +int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + int64_t *out = (int64_t *)pOutputData->pData; + *out = 0; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + colDataAppendNULL(pOutputData, i); + break; + } + (*out)++; + } + + pOutput->numOfRows = pInput->numOfRows; + return TSDB_CODE_SUCCESS; +} + +int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + int32_t type = GET_PARAM_TYPE(pInput); + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + colDataAppendNULL(pOutputData, i); + break; + } + + if (IS_SIGNED_NUMERIC_TYPE(type)) { + int64_t *in = (int64_t *)pInputData->pData; + int64_t *out = (int64_t *)pOutputData->pData; + *out += in[i]; + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + uint64_t *in = (uint64_t *)pInputData->pData; + uint64_t *out = (uint64_t *)pOutputData->pData; + *out += in[i]; + } else if (IS_FLOAT_TYPE(type)) { + double *in = (double *)pInputData->pData; + double *out = (double *)pOutputData->pData; + *out += in[i]; + } + } + + pOutput->numOfRows = pInput->numOfRows; + return TSDB_CODE_SUCCESS; +} + +static int32_t doMinMaxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, bool isMinFunc) { + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + int32_t type = GET_PARAM_TYPE(pInput); + + if (isMinFunc) { + SET_TYPED_DATA_MAX(pOutputData->pData, type); + } else { + SET_TYPED_DATA_MIN(pOutputData->pData, type); + } + + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + colDataAppendNULL(pOutputData, i); + break; + } + + switch(type) { + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: { + int8_t *in = (int8_t *)pInputData->pData; + int8_t *out = (int8_t *)pOutputData->pData; + if((in[i] > *out) ^ isMinFunc) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *in = (int16_t *)pInputData->pData; + int16_t *out = (int16_t *)pOutputData->pData; + if((in[i] > *out) ^ isMinFunc) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t *in = (int32_t *)pInputData->pData; + int32_t *out = (int32_t *)pOutputData->pData; + if((in[i] > *out) ^ isMinFunc) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t *in = (int64_t *)pInputData->pData; + int64_t *out = (int64_t *)pOutputData->pData; + if((in[i] > *out) ^ isMinFunc) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_UTINYINT: { + uint8_t *in = (uint8_t *)pInputData->pData; + uint8_t *out = (uint8_t *)pOutputData->pData; + if((in[i] > *out) ^ isMinFunc) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_USMALLINT: { + uint16_t *in = (uint16_t *)pInputData->pData; + uint16_t *out = (uint16_t *)pOutputData->pData; + if((in[i] > *out) ^ isMinFunc) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_UINT: { + uint32_t *in = (uint32_t *)pInputData->pData; + uint32_t *out = (uint32_t *)pOutputData->pData; + if((in[i] > *out) ^ isMinFunc) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_UBIGINT: { + uint64_t *in = (uint64_t *)pInputData->pData; + uint64_t *out = (uint64_t *)pOutputData->pData; + if((in[i] > *out) ^ isMinFunc) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float *in = (float *)pInputData->pData; + float *out = (float *)pOutputData->pData; + if((in[i] > *out) ^ isMinFunc) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double *in = (double *)pInputData->pData; + double *out = (double *)pOutputData->pData; + if((in[i] > *out) ^ isMinFunc) { + *out = in[i]; + } + break; + } + } + } + + pOutput->numOfRows = pInput->numOfRows; + return TSDB_CODE_SUCCESS; +} + +int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doMinMaxScalarFunction(pInput, inputNum, pOutput, true); +} + +int32_t maxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doMinMaxScalarFunction(pInput, inputNum, pOutput, false); +}