diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index d77b914fae..03e11b0502 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -131,6 +131,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_HISTOGRAM_MERGE, FUNCTION_TYPE_HYPERLOGLOG_PARTIAL, FUNCTION_TYPE_HYPERLOGLOG_MERGE, + FUNCTION_TYPE_ELAPSED_PARTIAL, + FUNCTION_TYPE_ELAPSED_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 8313b0c635..70ae325dc8 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -116,7 +116,10 @@ int32_t getSpreadInfoSize(); bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t elapsedFunction(SqlFunctionCtx* pCtx); +int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx); int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t getElapsedInfoSize(); bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 8f553f541b..97eacf79c2 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -444,6 +444,64 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len return TSDB_CODE_SUCCESS; } +static int32_t translateElapsedImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + + if (isPartial) { + if (1 != numOfParams && 2 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (TSDB_DATA_TYPE_TIMESTAMP != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + // param1 + if (2 == numOfParams) { + SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1); + if (QUERY_NODE_VALUE != nodeType(pParamNode1)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + SValueNode* pValue = (SValueNode*)pParamNode1; + + pValue->notReserved = true; + + paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; + if (!IS_INTEGER_TYPE(paraType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + if (pValue->datum.i == 0) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "ELAPSED function time unit parameter should be greater than db precision"); + } + } + + pFunc->node.resType = (SDataType){.bytes = getElapsedInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + } else { + if (1 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (TSDB_DATA_TYPE_BINARY != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t translateElapsedPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateElapsedImpl(pFunc, pErrBuf, len, true); +} + +static int32_t translateElapsedMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateElapsedImpl(pFunc, pErrBuf, len, false); +} + static int32_t translateLeastSQR(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); if (3 != numOfParams) { @@ -1468,6 +1526,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getElapsedFuncEnv, .initFunc = elapsedFunctionSetup, .processFunc = elapsedFunction, + .finalizeFunc = elapsedFinalize, + .pPartialFunc = "_elapsed_partial", + .pMergeFunc = "_elapsed_merge" + }, + { + .name = "_elapsed_partial", + .type = FUNCTION_TYPE_ELAPSED, + .classification = FUNC_MGT_AGG_FUNC, + .dataRequiredFunc = statisDataRequired, + .translateFunc = translateElapsedPartial, + .getEnvFunc = getElapsedFuncEnv, + .initFunc = elapsedFunctionSetup, + .processFunc = elapsedFunction, + .finalizeFunc = elapsedPartialFinalize + }, + { + .name = "_elapsed_merge", + .type = FUNCTION_TYPE_ELAPSED, + .classification = FUNC_MGT_AGG_FUNC, + .dataRequiredFunc = statisDataRequired, + .translateFunc = translateElapsedMerge, + .getEnvFunc = getElapsedFuncEnv, + .initFunc = elapsedFunctionSetup, + .processFunc = elapsedFunctionMerge, .finalizeFunc = elapsedFinalize }, { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1ec41a59f2..35f72ac655 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3101,6 +3101,10 @@ int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +int32_t getElapsedInfoSize() { + return (int32_t)sizeof(SElapsedInfo); +} + bool getElapsedFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SElapsedInfo); return true; @@ -3202,6 +3206,30 @@ _elapsed_over: return TSDB_CODE_SUCCESS; } +int32_t elapsedFunctionMerge(SqlFunctionCtx *pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SElapsedInfo* pInputInfo = (SElapsedInfo *)varDataVal(data); + + pInfo->timeUnit = pInputInfo->timeUnit; + if (pInfo->min > pInputInfo->min) { + pInfo->min = pInputInfo->min; + } + + if (pInfo->max < pInputInfo->max) { + pInfo->max = pInputInfo->max; + } + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + return TSDB_CODE_SUCCESS; +} + int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); double result = (double)pInfo->max - (double)pInfo->min; @@ -3210,6 +3238,24 @@ int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getElapsedInfoSize(); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return pResInfo->numOfRes; +} + int32_t getHistogramInfoSize() { return (int32_t)sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin); }