diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 607ab00e7d..e3b7127efe 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -92,6 +92,7 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stateCountFunction(SqlFunctionCtx* pCtx); +int32_t stateDurationFunction(SqlFunctionCtx* pCtx); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a94777c7f2..07ad0f7d1f 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -263,7 +263,7 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } -static int32_t translateState(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { +static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (3 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } @@ -283,6 +283,31 @@ static int32_t translateState(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return TSDB_CODE_SUCCESS; } +static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + int32_t paraNum = LIST_LENGTH(pFunc->pParameterList); + if (3 != paraNum && 4 != paraNum) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (!IS_NUMERIC_TYPE(colType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type != TSDB_DATA_TYPE_BINARY || + (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_BIGINT && + ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_DOUBLE)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + if (paraNum == 4 && ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 3))->resType.type != TSDB_DATA_TYPE_BIGINT) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT }; + return TSDB_CODE_SUCCESS; +} + static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // todo return TSDB_CODE_SUCCESS; @@ -701,12 +726,22 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "state_count", .type = FUNCTION_TYPE_STATE_COUNT, .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC, - .translateFunc = translateState, + .translateFunc = translateStateCount, .getEnvFunc = getStateFuncEnv, .initFunc = functionSetup, .processFunc = stateCountFunction, .finalizeFunc = NULL }, + { + .name = "state_duration", + .type = FUNCTION_TYPE_STATE_DURATION, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateStateDuration, + .getEnvFunc = getStateFuncEnv, + .initFunc = functionSetup, + .processFunc = stateDurationFunction, + .finalizeFunc = NULL + }, { .name = "abs", .type = FUNCTION_TYPE_ABS, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index c255c55d4e..efc2992075 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -125,7 +125,10 @@ typedef enum { } EHistoBinType; typedef struct SStateInfo { - int64_t count; + union { + int64_t count; + int64_t durationStart; + }; } SStateInfo; typedef enum { @@ -2841,3 +2844,52 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) { return numOfElems; } + +int32_t stateDurationFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SStateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; + + int32_t numOfElems = 0; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + //TODO: process timeUnit for different db precisions + int32_t timeUnit = 1000; + if (pCtx->numOfParams == 5) { //TODO: param number incorrect + timeUnit = pCtx->param[3].param.i; + } + + int8_t op = getStateOpType(varDataVal(pCtx->param[1].param.pz)); + if (STATE_OPER_INVALID == op) { + return 0; + } + + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + numOfElems++; + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + colDataAppendNULL(pOutput, i); + continue; + } + + bool ret = checkStateOp(op, pInputCol, i, pCtx->param[2].param); + int64_t output = -1; + if (ret) { + if (pInfo->durationStart == 0) { + output = 0; + pInfo->durationStart = tsList[i]; + } else { + output = (tsList[i] - pInfo->durationStart) / timeUnit; + } + } else { + pInfo->durationStart = 0; + } + colDataAppend(pOutput, i, (char *)&output, false); + } + + return numOfElems; +}