diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index dfdb69ee3c..9bc6aa8351 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -114,6 +114,7 @@ int32_t mavgScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); #ifdef __cplusplus } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 10e0808c4d..c114a45fbd 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2419,6 +2419,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getStateFuncEnv, .initFunc = functionSetup, .processFunc = stateCountFunction, + .sprocessFunc = stateCountScalarFunction, .finalizeFunc = NULL }, { diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 3f26cd46f8..5097453832 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -2427,3 +2427,153 @@ int32_t hllScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { return sumScalarFunction(pInput, inputNum, pOutput); } + +typedef enum { + STATE_OPER_INVALID = 0, + STATE_OPER_LT, + STATE_OPER_GT, + STATE_OPER_LE, + STATE_OPER_GE, + STATE_OPER_NE, + STATE_OPER_EQ, +} EStateOperType; + +#define STATE_COMP(_op, _lval, _rval, _rtype) STATE_COMP_IMPL(_op, _lval, GET_STATE_VAL(_rval, _rtype)) + +#define GET_STATE_VAL(_val, _type) ((_type == TSDB_DATA_TYPE_BIGINT) ? (*(int64_t *)_val) : (*(double *)_val)) + +#define STATE_COMP_IMPL(_op, _lval, _rval) \ + do { \ + switch (_op) { \ + case STATE_OPER_LT: \ + return ((_lval) < (_rval)); \ + break; \ + case STATE_OPER_GT: \ + return ((_lval) > (_rval)); \ + break; \ + case STATE_OPER_LE: \ + return ((_lval) <= (_rval)); \ + break; \ + case STATE_OPER_GE: \ + return ((_lval) >= (_rval)); \ + break; \ + case STATE_OPER_NE: \ + return ((_lval) != (_rval)); \ + break; \ + case STATE_OPER_EQ: \ + return ((_lval) == (_rval)); \ + break; \ + default: \ + break; \ + } \ + } while (0) + +static int8_t getStateOpType(char* opStr) { + int8_t opType; + if (strcasecmp(opStr, "LT") == 0) { + opType = STATE_OPER_LT; + } else if (strcasecmp(opStr, "GT") == 0) { + opType = STATE_OPER_GT; + } else if (strcasecmp(opStr, "LE") == 0) { + opType = STATE_OPER_LE; + } else if (strcasecmp(opStr, "GE") == 0) { + opType = STATE_OPER_GE; + } else if (strcasecmp(opStr, "NE") == 0) { + opType = STATE_OPER_NE; + } else if (strcasecmp(opStr, "EQ") == 0) { + opType = STATE_OPER_EQ; + } else { + opType = STATE_OPER_INVALID; + } + + return opType; +} + +static bool checkStateOp(int8_t op, SColumnInfoData* pCol, int32_t index, SScalarParam *pCondParam) { + char* data = colDataGetData(pCol, index); + char* param = pCondParam->columnData->pData; + int32_t paramType = GET_PARAM_TYPE(pCondParam); + switch (pCol->info.type) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t v = *(int8_t*)data; + STATE_COMP(op, v, param, paramType); + break; + } + case TSDB_DATA_TYPE_UTINYINT: { + uint8_t v = *(uint8_t*)data; + STATE_COMP(op, v, param, paramType); + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t v = *(int16_t*)data; + STATE_COMP(op, v, param, paramType); + break; + } + case TSDB_DATA_TYPE_USMALLINT: { + uint16_t v = *(uint16_t*)data; + STATE_COMP(op, v, param, paramType); + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t v = *(int32_t*)data; + STATE_COMP(op, v, param, paramType); + break; + } + case TSDB_DATA_TYPE_UINT: { + uint32_t v = *(uint32_t*)data; + STATE_COMP(op, v, param, paramType); + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t v = *(int64_t*)data; + STATE_COMP(op, v, param, paramType); + break; + } + case TSDB_DATA_TYPE_UBIGINT: { + uint64_t v = *(uint64_t*)data; + STATE_COMP(op, v, param, paramType); + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float v = *(float*)data; + STATE_COMP(op, v, param, paramType); + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double v = *(double*)data; + STATE_COMP(op, v, param, paramType); + break; + } + default: { + ASSERT(0); + } + } + return false; +} + +int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { + SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pOutputData = pOutput->columnData; + + int8_t op = getStateOpType(varDataVal(pInput[1].columnData->pData)); + int64_t count = 0; + + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + if (colDataIsNull_s(pInputData, i)) { + colDataAppendNULL(pOutputData, i); + continue; + } + + bool ret = checkStateOp(op, pInputData, i, &pInput[2]); + int64_t out = -1; + if (ret) { + out = ++count; + } else { + count = 0; + } + colDataAppend(pOutputData, i, (char*)&out, false); + } + + pOutput->numOfRows = pInput->numOfRows; + return TSDB_CODE_SUCCESS; +}