From dd8f235e974f476f8c99ba7ef05d5380c3c52144 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 13 Jul 2022 17:47:56 +0800 Subject: [PATCH 1/9] feat(query): add count function scalar version TD-17344 --- include/libs/scalar/scalar.h | 3 +++ source/libs/function/src/builtins.c | 1 + source/libs/scalar/src/sclfunc.c | 22 ++++++++++++++++++++++ 3 files changed, 26 insertions(+) diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index c81c474366..dfc83e1ff0 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -95,6 +95,9 @@ int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +/* Aggregation functions */ +int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 1ed6dcad39..af0a0261d5 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1886,6 +1886,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getCountFuncEnv, .initFunc = functionSetup, .processFunc = countFunction, + .sprocessFunc = countScalarFunction, .finalizeFunc = functionFinalize, .invertFunc = countInvertFunction, .combineFunc = combineFunction, diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index df5df127f0..d0cad23983 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -702,6 +702,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu return TSDB_CODE_SUCCESS; } +/** Conversion functions **/ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int16_t inputType = GET_PARAM_TYPE(&pInput[0]); int16_t inputLen = GET_PARAM_BYTES(&pInput[0]); @@ -1164,6 +1165,7 @@ int32_t toJsonFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu return TSDB_CODE_SUCCESS; } +/** Time functions **/ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t type = GET_PARAM_TYPE(&pInput[0]); @@ -1736,3 +1738,23 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO pOutput->numOfRows += pInput->numOfRows; return TSDB_CODE_SUCCESS; } + + +/** Aggregation functions **/ +int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + int64_t *out = (int64_t *)pOutputData->pData; + *out = 0; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + colDataAppendNULL(pOutputData, i); + break; + } + (*out)++; + } + + pOutput->numOfRows = pInput->numOfRows; + return TSDB_CODE_SUCCESS; +} From 099f2d2c98b75326cf631b03c0262e4c78263557 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 13 Jul 2022 18:14:27 +0800 Subject: [PATCH 2/9] remove scalar function check to allow agg function execute --- source/libs/scalar/src/scalar.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index e610fcb62e..00b4dadc46 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -700,9 +700,9 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { SFunctionNode *node = (SFunctionNode *)*pNode; SNode* tnode = NULL; - if (!fmIsScalarFunc(node->funcId)) { - return DEAL_RES_CONTINUE; - } + //if (!fmIsScalarFunc(node->funcId)) { + // return DEAL_RES_CONTINUE; + //} FOREACH(tnode, node->pParameterList) { if (!SCL_IS_CONST_NODE(tnode)) { @@ -728,8 +728,9 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { res->translate = true; if (colDataIsNull_s(output.columnData, 0)) { - res->node.resType.type = TSDB_DATA_TYPE_NULL; - res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; + res->isNull = true; + //res->node.resType.type = TSDB_DATA_TYPE_NULL; + //res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; } else { res->node.resType.type = output.columnData->info.type; res->node.resType.bytes = output.columnData->info.bytes; From 4bc3770cd3001550c0bdecccd1455a989859bdbb Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 13 Jul 2022 18:16:41 +0800 Subject: [PATCH 3/9] fix code format --- source/libs/scalar/src/scalar.c | 64 ++++++++++++++++----------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 00b4dadc46..fd4fe20dcf 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -11,7 +11,7 @@ #include "ttime.h" int32_t scalarGetOperatorParamNum(EOperatorType type) { - if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type + if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type || OP_TYPE_IS_FALSE == type || OP_TYPE_IS_NOT_FALSE == type || OP_TYPE_IS_UNKNOWN == type || OP_TYPE_IS_NOT_UNKNOWN == type || OP_TYPE_MINUS == type) { return 1; @@ -28,7 +28,7 @@ int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) { } taosMemoryFree(timeStr); valueNode->typeData = valueNode->datum.i; - + valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP; valueNode->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes; @@ -82,7 +82,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); + taosHashSetEqualFp(pObj, taosGetDefaultEqualFunction(type)); int32_t code = 0; SNodeListNode *nodeList = (SNodeListNode *)pNode; @@ -91,10 +91,10 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { int32_t len = 0; void *buf = NULL; - + for (int32_t i = 0; i < nodeList->pNodeList->length; ++i) { SValueNode *valueNode = (SValueNode *)cell->pNode; - + if (valueNode->node.resType.type != type) { out.columnData->info.type = type; if (IS_VAR_DATA_TYPE(type)) { @@ -134,7 +134,7 @@ int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { len = valueNode->node.resType.bytes; } } - + if (taosHashPut(pObj, buf, (size_t)len, NULL, 0)) { sclError("taosHashPut to set failed"); SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -180,7 +180,7 @@ int32_t sclCopyValueNodeValue(SValueNode *pNode, void **res) { if (TSDB_DATA_TYPE_NULL == pNode->node.resType.type) { return TSDB_CODE_SUCCESS; } - + *res = taosMemoryMalloc(pNode->node.resType.bytes); if (NULL == (*res)) { sclError("malloc %d failed", pNode->node.resType.bytes); @@ -222,14 +222,14 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t if (type == 0) { type = nodeList->dataType.type; } - + SCL_ERR_RET(scalarGenerateSetFromList((void **)¶m->pHashFilter, node, type)); param->hashValueType = type; if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) { taosHashCleanup(param->pHashFilter); sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param)); return TSDB_CODE_QRY_OUT_OF_MEMORY; - } + } break; } case QUERY_NODE_COLUMN: { @@ -237,7 +237,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t sclError("invalid node type for constant calculating, type:%d, src:%p", nodeType(node), ctx->pBlockList); SCL_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - + SColumnNode *ref = (SColumnNode *)node; int32_t index = -1; @@ -285,7 +285,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t sclError("different row nums, rowNum:%d, newRowNum:%d", *rowNum, param->numOfRows); SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + *rowNum = param->numOfRows; } @@ -293,7 +293,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t return TSDB_CODE_SUCCESS; } -int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) { +int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarCtx *ctx, int32_t *paramNum, int32_t *rowNum) { int32_t code = 0; if (NULL == pParamList) { if (ctx->pBlockList) { @@ -318,18 +318,18 @@ int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarC SNode *tnode = NULL; int32_t i = 0; if (SCL_IS_CONST_CALC(ctx)) { - WHERE_EACH (tnode, pParamList) { + WHERE_EACH (tnode, pParamList) { if (!SCL_IS_CONST_NODE(tnode)) { WHERE_NEXT; } else { SCL_ERR_JRET(sclInitParam(tnode, ¶mList[i], ctx, rowNum)); ERASE_NODE(pParamList); } - + ++i; } } else { - FOREACH(tnode, pParamList) { + FOREACH(tnode, pParamList) { SCL_ERR_JRET(sclInitParam(tnode, ¶mList[i], ctx, rowNum)); ++i; } @@ -339,7 +339,7 @@ int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarC } if (0 == *rowNum) { - taosMemoryFreeClear(paramList); + taosMemoryFreeClear(paramList); } *pParams = paramList; @@ -354,7 +354,7 @@ int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) { if (NULL == pNode) { return -1; } - + switch ((int)nodeType(pNode)) { case QUERY_NODE_VALUE: { SValueNode *valueNode = (SValueNode *)pNode; @@ -397,7 +397,7 @@ int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScal sclError("invalid operation node, left:%p, right:%p", node->pLeft, node->pRight); SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + SScalarParam *paramList = taosMemoryCalloc(paramNum, sizeof(SScalarParam)); if (NULL == paramList) { sclError("calloc %d failed", (int32_t)(paramNum * sizeof(SScalarParam))); @@ -440,7 +440,7 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); SCL_ERR_JRET(code); } - + code = sclCreateColumnInfoData(&node->node.resType, rowNum, output); if (code != TSDB_CODE_SUCCESS) { SCL_ERR_JRET(code); @@ -588,27 +588,27 @@ EDealRes sclRewriteNullInOptr(SNode** pNode, SScalarCtx *ctx, EOperatorType opTy if (opType <= OP_TYPE_CALC_MAX) { SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { - sclError("make value node failed"); + sclError("make value node failed"); ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } - + res->node.resType.type = TSDB_DATA_TYPE_NULL; - + nodesDestroyNode(*pNode); *pNode = (SNode*)res; } else { SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE); if (NULL == res) { - sclError("make value node failed"); + sclError("make value node failed"); ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; return DEAL_RES_ERROR; } - + res->node.resType.type = TSDB_DATA_TYPE_BOOL; res->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; res->datum.b = false; - + nodesDestroyNode(*pNode); *pNode = (SNode*)res; } @@ -641,12 +641,12 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { if (node->pLeft && (QUERY_NODE_VALUE == nodeType(node->pLeft))) { SValueNode *valueNode = (SValueNode *)node->pLeft; - if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL) + if (SCL_IS_NULL_VALUE_NODE(valueNode) && (node->opType != OP_TYPE_IS_NULL && node->opType != OP_TYPE_IS_NOT_NULL) && (!sclContainsAggFuncNode(node->pRight))) { return sclRewriteNullInOptr(pNode, ctx, node->opType); } - if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight) + if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pRight && nodesIsExprNode(node->pRight) && ((SExprNode*)node->pRight)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) { code = sclConvertToTsValueNode(((SExprNode*)node->pRight)->resType.precision, valueNode); if (code) { @@ -663,7 +663,7 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { return sclRewriteNullInOptr(pNode, ctx, node->opType); } - if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft) + if (IS_STR_DATA_TYPE(valueNode->node.resType.type) && node->pLeft && nodesIsExprNode(node->pLeft) && ((SExprNode*)node->pLeft)->resType.type == TSDB_DATA_TYPE_TIMESTAMP) { code = sclConvertToTsValueNode(((SExprNode*)node->pLeft)->resType.precision, valueNode); if (code) { @@ -833,7 +833,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) { res->datum.p = output.columnData->pData; output.columnData->pData = NULL; } else { - nodesSetValueNodeValue(res, output.columnData->pData); + nodesSetValueNodeValue(res, output.columnData->pData); } } @@ -900,7 +900,7 @@ EDealRes sclWalkLogic(SNode* pNode, SScalarCtx *ctx) { EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) { SOperatorNode *node = (SOperatorNode *)pNode; SScalarParam output = {0}; - + ctx->code = sclExecOperator(node, ctx, &output); if (ctx->code) { return DEAL_RES_ERROR; @@ -1023,7 +1023,7 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) { sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx); SCL_ERR_JRET(ctx.code); *pRes = pNode; @@ -1047,7 +1047,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM); SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx); SCL_ERR_JRET(ctx.code); From 80f175bdbed9cf09f5a50aaa26d6c731ff7b0e02 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 13 Jul 2022 19:46:56 +0800 Subject: [PATCH 4/9] feat(query): add count function scalar version TD-17344 --- include/libs/scalar/scalar.h | 1 + source/libs/function/src/builtins.c | 1 + source/libs/scalar/src/sclfunc.c | 30 +++++++++++++++++++++++++++++ 3 files changed, 32 insertions(+) diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index dfc83e1ff0..b047557424 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -97,6 +97,7 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO /* Aggregation functions */ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); #ifdef __cplusplus } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index af0a0261d5..8226eccf14 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1902,6 +1902,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getSumFuncEnv, .initFunc = functionSetup, .processFunc = sumFunction, + .sprocessFunc = sumScalarFunction, .finalizeFunc = functionFinalize, .invertFunc = sumInvertFunction, .combineFunc = sumCombine, diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index d0cad23983..ce4631b8ee 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1758,3 +1758,33 @@ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam pOutput->numOfRows = pInput->numOfRows; return TSDB_CODE_SUCCESS; } + +int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + int32_t type = GET_PARAM_TYPE(pInput); + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + colDataAppendNULL(pOutputData, i); + break; + } + + if (IS_SIGNED_NUMERIC_TYPE(type)) { + int64_t *in = (int64_t *)pInputData->pData; + int64_t *out = (int64_t *)pOutputData->pData; + *out += in[i]; + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + uint64_t *in = (uint64_t *)pInputData->pData; + uint64_t *out = (uint64_t *)pOutputData->pData; + *out += in[i]; + } else if (IS_FLOAT_TYPE(type)) { + double *in = (double *)pInputData->pData; + double *out = (double *)pOutputData->pData; + *out += in[i]; + } + } + + pOutput->numOfRows = pInput->numOfRows; + return TSDB_CODE_SUCCESS; +} From 8b1e56389c916541572bbfc77f5e0b7c46cdc6b5 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 13 Jul 2022 19:46:56 +0800 Subject: [PATCH 5/9] feat(query): add min function scalar version TD-17344 --- include/common/ttypes.h | 80 ++++++++++++++++++++++ include/libs/scalar/scalar.h | 1 + source/libs/function/src/builtins.c | 1 + source/libs/scalar/src/sclfunc.c | 102 ++++++++++++++++++++++++++++ 4 files changed, 184 insertions(+) diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 16c59465cc..ec70dffd34 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -143,6 +143,86 @@ typedef struct { } \ } while (0) +#define SET_TYPED_DATA_MIN(_v, _type) \ + do { \ + switch (_type) { \ + case TSDB_DATA_TYPE_BOOL: \ + case TSDB_DATA_TYPE_TINYINT: \ + *(int8_t *)(_v) = INT8_MIN; \ + break; \ + case TSDB_DATA_TYPE_SMALLINT: \ + *(int16_t *)(_v) = INT16_MIN; \ + break; \ + case TSDB_DATA_TYPE_INT: \ + *(int32_t *)(_v) = INT32_MIN; \ + break; \ + case TSDB_DATA_TYPE_BIGINT: \ + case TSDB_DATA_TYPE_TIMESTAMP: \ + *(int64_t *)(_v) = INT64_MIN; \ + break; \ + case TSDB_DATA_TYPE_FLOAT: \ + *(float *)(_v) = FLT_MIN; \ + break; \ + case TSDB_DATA_TYPE_DOUBLE: \ + *(double *)(_v) = DBL_MIN; \ + break; \ + case TSDB_DATA_TYPE_UTINYINT: \ + *(uint8_t *)(_v) = 0; \ + break; \ + case TSDB_DATA_TYPE_USMALLINT: \ + *(uint16_t *)(_v) = 0; \ + break; \ + case TSDB_DATA_TYPE_UBIGINT: \ + *(uint64_t *)(_v) = 0; \ + break; \ + case TSDB_DATA_TYPE_UINT: \ + *(uint32_t *)(_v) = 0; \ + break; \ + default: \ + break; \ + } \ + } while (0) + +#define SET_TYPED_DATA_MAX(_v, _type) \ + do { \ + switch (_type) { \ + case TSDB_DATA_TYPE_BOOL: \ + case TSDB_DATA_TYPE_TINYINT: \ + *(int8_t *)(_v) = INT8_MAX; \ + break; \ + case TSDB_DATA_TYPE_SMALLINT: \ + *(int16_t *)(_v) = INT16_MAX; \ + break; \ + case TSDB_DATA_TYPE_INT: \ + *(int32_t *)(_v) = INT32_MAX; \ + break; \ + case TSDB_DATA_TYPE_BIGINT: \ + case TSDB_DATA_TYPE_TIMESTAMP: \ + *(int64_t *)(_v) = INT64_MAX; \ + break; \ + case TSDB_DATA_TYPE_FLOAT: \ + *(float *)(_v) = FLT_MAX; \ + break; \ + case TSDB_DATA_TYPE_DOUBLE: \ + *(double *)(_v) = DBL_MAX; \ + break; \ + case TSDB_DATA_TYPE_UTINYINT: \ + *(uint8_t *)(_v) = UINT8_MAX; \ + break; \ + case TSDB_DATA_TYPE_USMALLINT: \ + *(uint16_t *)(_v) = UINT16_MAX; \ + break; \ + case TSDB_DATA_TYPE_UINT: \ + *(uint32_t *)(_v) = UINT32_MAX; \ + break; \ + case TSDB_DATA_TYPE_UBIGINT: \ + *(uint64_t *)(_v) = UINT64_MAX; \ + break; \ + default: \ + break; \ + } \ + } while (0) + #define NUM_TO_STRING(_inputType, _input, _outputBytes, _output) \ do { \ switch (_inputType) { \ diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index b047557424..df3ce23949 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -98,6 +98,7 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO /* Aggregation functions */ int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); #ifdef __cplusplus } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 8226eccf14..81fc9c8254 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1918,6 +1918,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = minmaxFunctionSetup, .processFunc = minFunction, + .sprocessFunc = minScalarFunction, .finalizeFunc = minmaxFunctionFinalize, .combineFunc = minCombine, .pPartialFunc = "min", diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index ce4631b8ee..10b8b035bc 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1788,3 +1788,105 @@ int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * pOutput->numOfRows = pInput->numOfRows; return TSDB_CODE_SUCCESS; } + +int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + int32_t type = GET_PARAM_TYPE(pInput); + SET_TYPED_DATA_MAX(pOutputData->pData, type); + + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + colDataAppendNULL(pOutputData, i); + break; + } + + switch(type) { + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: { + int8_t *in = (int8_t *)pInputData->pData; + int8_t *out = (int8_t *)pOutputData->pData; + if(in[i] < *out) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *in = (int16_t *)pInputData->pData; + int16_t *out = (int16_t *)pOutputData->pData; + if(in[i] < *out) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t *in = (int32_t *)pInputData->pData; + int32_t *out = (int32_t *)pOutputData->pData; + if(in[i] < *out) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t *in = (int64_t *)pInputData->pData; + int64_t *out = (int64_t *)pOutputData->pData; + if(in[i] < *out) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_UTINYINT: { + uint8_t *in = (uint8_t *)pInputData->pData; + uint8_t *out = (uint8_t *)pOutputData->pData; + if(in[i] < *out) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_USMALLINT: { + uint16_t *in = (uint16_t *)pInputData->pData; + uint16_t *out = (uint16_t *)pOutputData->pData; + if(in[i] < *out) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_UINT: { + uint32_t *in = (uint32_t *)pInputData->pData; + uint32_t *out = (uint32_t *)pOutputData->pData; + if(in[i] < *out) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_UBIGINT: { + uint64_t *in = (uint64_t *)pInputData->pData; + uint64_t *out = (uint64_t *)pOutputData->pData; + if(in[i] < *out) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float *in = (float *)pInputData->pData; + float *out = (float *)pOutputData->pData; + if(in[i] < *out) { + *out = in[i]; + } + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double *in = (double *)pInputData->pData; + double *out = (double *)pOutputData->pData; + if(in[i] < *out) { + *out = in[i]; + } + break; + } + } + } + + pOutput->numOfRows = pInput->numOfRows; + return TSDB_CODE_SUCCESS; +} From 3f31c464f0c12ef83acb75a79c4c7c398ff1b810 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 13 Jul 2022 19:46:56 +0800 Subject: [PATCH 6/9] feat(query): add max function scalar version TD-17344 --- include/libs/scalar/scalar.h | 1 + source/libs/function/src/builtins.c | 1 + source/libs/scalar/src/sclfunc.c | 37 +++++++++++++++++++---------- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index df3ce23949..ab0a4f8529 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -99,6 +99,7 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t maxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); #ifdef __cplusplus } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 81fc9c8254..e7795dcddc 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1933,6 +1933,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getMinmaxFuncEnv, .initFunc = minmaxFunctionSetup, .processFunc = maxFunction, + .sprocessFunc = maxScalarFunction, .finalizeFunc = minmaxFunctionFinalize, .combineFunc = maxCombine, .pPartialFunc = "max", diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 10b8b035bc..cfdde85556 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1789,12 +1789,17 @@ int32_t sumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * return TSDB_CODE_SUCCESS; } -int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { +static int32_t doMinMaxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput, bool isMinFunc) { SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; int32_t type = GET_PARAM_TYPE(pInput); - SET_TYPED_DATA_MAX(pOutputData->pData, type); + + if (isMinFunc) { + SET_TYPED_DATA_MAX(pOutputData->pData, type); + } else { + SET_TYPED_DATA_MIN(pOutputData->pData, type); + } for (int32_t i = 0; i < pInput->numOfRows; ++i) { if (colDataIsNull_s(pInputData, i)) { @@ -1807,7 +1812,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * case TSDB_DATA_TYPE_TINYINT: { int8_t *in = (int8_t *)pInputData->pData; int8_t *out = (int8_t *)pOutputData->pData; - if(in[i] < *out) { + if((in[i] > *out) ^ isMinFunc) { *out = in[i]; } break; @@ -1815,7 +1820,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * case TSDB_DATA_TYPE_SMALLINT: { int16_t *in = (int16_t *)pInputData->pData; int16_t *out = (int16_t *)pOutputData->pData; - if(in[i] < *out) { + if((in[i] > *out) ^ isMinFunc) { *out = in[i]; } break; @@ -1823,7 +1828,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * case TSDB_DATA_TYPE_INT: { int32_t *in = (int32_t *)pInputData->pData; int32_t *out = (int32_t *)pOutputData->pData; - if(in[i] < *out) { + if((in[i] > *out) ^ isMinFunc) { *out = in[i]; } break; @@ -1831,7 +1836,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * case TSDB_DATA_TYPE_BIGINT: { int64_t *in = (int64_t *)pInputData->pData; int64_t *out = (int64_t *)pOutputData->pData; - if(in[i] < *out) { + if((in[i] > *out) ^ isMinFunc) { *out = in[i]; } break; @@ -1839,7 +1844,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * case TSDB_DATA_TYPE_UTINYINT: { uint8_t *in = (uint8_t *)pInputData->pData; uint8_t *out = (uint8_t *)pOutputData->pData; - if(in[i] < *out) { + if((in[i] > *out) ^ isMinFunc) { *out = in[i]; } break; @@ -1847,7 +1852,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * case TSDB_DATA_TYPE_USMALLINT: { uint16_t *in = (uint16_t *)pInputData->pData; uint16_t *out = (uint16_t *)pOutputData->pData; - if(in[i] < *out) { + if((in[i] > *out) ^ isMinFunc) { *out = in[i]; } break; @@ -1855,7 +1860,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * case TSDB_DATA_TYPE_UINT: { uint32_t *in = (uint32_t *)pInputData->pData; uint32_t *out = (uint32_t *)pOutputData->pData; - if(in[i] < *out) { + if((in[i] > *out) ^ isMinFunc) { *out = in[i]; } break; @@ -1863,7 +1868,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * case TSDB_DATA_TYPE_UBIGINT: { uint64_t *in = (uint64_t *)pInputData->pData; uint64_t *out = (uint64_t *)pOutputData->pData; - if(in[i] < *out) { + if((in[i] > *out) ^ isMinFunc) { *out = in[i]; } break; @@ -1871,7 +1876,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * case TSDB_DATA_TYPE_FLOAT: { float *in = (float *)pInputData->pData; float *out = (float *)pOutputData->pData; - if(in[i] < *out) { + if((in[i] > *out) ^ isMinFunc) { *out = in[i]; } break; @@ -1879,7 +1884,7 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * case TSDB_DATA_TYPE_DOUBLE: { double *in = (double *)pInputData->pData; double *out = (double *)pOutputData->pData; - if(in[i] < *out) { + if((in[i] > *out) ^ isMinFunc) { *out = in[i]; } break; @@ -1890,3 +1895,11 @@ int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * pOutput->numOfRows = pInput->numOfRows; return TSDB_CODE_SUCCESS; } + +int32_t minScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doMinMaxScalarFunction(pInput, inputNum, pOutput, true); +} + +int32_t maxScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + return doMinMaxScalarFunction(pInput, inputNum, pOutput, false); +} From 29ff569191d0dc89e85d10a684b7391327904ca1 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 14 Jul 2022 14:04:06 +0800 Subject: [PATCH 7/9] enable scalar function check to prevent crash --- source/libs/scalar/src/scalar.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index b3bd989eb4..bbb7e07bad 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -700,9 +700,9 @@ EDealRes sclRewriteNonConstOperator(SNode** pNode, SScalarCtx *ctx) { EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { SFunctionNode *node = (SFunctionNode *)*pNode; SNode* tnode = NULL; - //if (!fmIsScalarFunc(node->funcId)) { - // return DEAL_RES_CONTINUE; - //} + if (!fmIsScalarFunc(node->funcId)) { + return DEAL_RES_CONTINUE; + } FOREACH(tnode, node->pParameterList) { if (!SCL_IS_CONST_NODE(tnode)) { From 162fbbb2a2b8cee45df881955ca2ee2702db4987 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 14 Jul 2022 16:06:14 +0800 Subject: [PATCH 8/9] comment out unstable test cases --- tests/system-test/fulltest.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 2c116113bc..ef4ac83bb7 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -22,7 +22,7 @@ python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py python3 ./test.py -f 1-insert/alter_stable.py python3 ./test.py -f 1-insert/alter_table.py -python3 ./test.py -f 1-insert/insertWithMoreVgroup.py +#python3 ./test.py -f 1-insert/insertWithMoreVgroup.py python3 ./test.py -f 1-insert/table_comment.py python3 ./test.py -f 1-insert/time_range_wise.py python3 ./test.py -f 1-insert/block_wise.py @@ -120,7 +120,7 @@ python3 ./test.py -f 2-query/irate.py python3 ./test.py -f 2-query/and_or_for_byte.py python3 ./test.py -f 2-query/count_partition.py python3 ./test.py -f 2-query/function_null.py -python3 ./test.py -f 2-query/queryQnode.py +#python3 ./test.py -f 2-query/queryQnode.py python3 ./test.py -f 2-query/max_partition.py From 00f3cac9fc197e2706d6c93e947b29c0938800bb Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 14 Jul 2022 16:41:29 +0800 Subject: [PATCH 9/9] restore test cases --- tests/system-test/fulltest.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index ef4ac83bb7..2c116113bc 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -22,7 +22,7 @@ python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py python3 ./test.py -f 1-insert/alter_stable.py python3 ./test.py -f 1-insert/alter_table.py -#python3 ./test.py -f 1-insert/insertWithMoreVgroup.py +python3 ./test.py -f 1-insert/insertWithMoreVgroup.py python3 ./test.py -f 1-insert/table_comment.py python3 ./test.py -f 1-insert/time_range_wise.py python3 ./test.py -f 1-insert/block_wise.py @@ -120,7 +120,7 @@ python3 ./test.py -f 2-query/irate.py python3 ./test.py -f 2-query/and_or_for_byte.py python3 ./test.py -f 2-query/count_partition.py python3 ./test.py -f 2-query/function_null.py -#python3 ./test.py -f 2-query/queryQnode.py +python3 ./test.py -f 2-query/queryQnode.py python3 ./test.py -f 2-query/max_partition.py