feat(query): add state_count function
This commit is contained in:
parent
7213548847
commit
85adb8611c
|
@ -55,6 +55,8 @@ typedef enum EFunctionType {
|
||||||
FUNCTION_TYPE_TAIL,
|
FUNCTION_TYPE_TAIL,
|
||||||
FUNCTION_TYPE_TOP,
|
FUNCTION_TYPE_TOP,
|
||||||
FUNCTION_TYPE_UNIQUE,
|
FUNCTION_TYPE_UNIQUE,
|
||||||
|
FUNCTION_TYPE_STATE_COUNT,
|
||||||
|
FUNCTION_TYPE_STATE_DURATION,
|
||||||
|
|
||||||
// math function
|
// math function
|
||||||
FUNCTION_TYPE_ABS = 1000,
|
FUNCTION_TYPE_ABS = 1000,
|
||||||
|
|
|
@ -88,6 +88,10 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn
|
||||||
int32_t histogramFunction(SqlFunctionCtx* pCtx);
|
int32_t histogramFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
int32_t stateCountFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -262,6 +262,26 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateState(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
if (3 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
if (!IS_NUMERIC_TYPE(colType)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||||
|
(((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_BIGINT &&
|
||||||
|
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_DOUBLE)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT };
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// todo
|
// todo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -675,6 +695,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = histogramFunction,
|
.processFunc = histogramFunction,
|
||||||
.finalizeFunc = histogramFinalize
|
.finalizeFunc = histogramFinalize
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "state_count",
|
||||||
|
.type = FUNCTION_TYPE_STATE_COUNT,
|
||||||
|
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC,
|
||||||
|
.translateFunc = translateState,
|
||||||
|
.getEnvFunc = getStateFuncEnv,
|
||||||
|
.initFunc = functionSetup,
|
||||||
|
.processFunc = stateCountFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "abs",
|
.name = "abs",
|
||||||
.type = FUNCTION_TYPE_ABS,
|
.type = FUNCTION_TYPE_ABS,
|
||||||
|
|
|
@ -122,6 +122,19 @@ typedef enum {
|
||||||
LOG_BIN
|
LOG_BIN
|
||||||
} EHistoBinType;
|
} EHistoBinType;
|
||||||
|
|
||||||
|
typedef struct SStateInfo {
|
||||||
|
int64_t count;
|
||||||
|
} SStateInfo;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
STATE_OPER_INVALID = 0,
|
||||||
|
STATE_OPER_LT,
|
||||||
|
STATE_OPER_GT,
|
||||||
|
STATE_OPER_LE,
|
||||||
|
STATE_OPER_GE,
|
||||||
|
STATE_OPER_NE,
|
||||||
|
STATE_OPER_EQ,
|
||||||
|
} EStateOperType;
|
||||||
|
|
||||||
#define SET_VAL(_info, numOfElem, res) \
|
#define SET_VAL(_info, numOfElem, res) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -2382,3 +2395,154 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
|
pEnv->calcMemSize = sizeof(SStateInfo);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int8_t getStateOpType(char *opStr) {
|
||||||
|
int8_t opType;
|
||||||
|
if (strcasecmp(opStr, "LT") == 0) {
|
||||||
|
opType = STATE_OPER_LT;
|
||||||
|
} else if (strcasecmp(opStr, "GT") == 0) {
|
||||||
|
opType = STATE_OPER_GT;
|
||||||
|
} else if (strcasecmp(opStr, "LE") == 0) {
|
||||||
|
opType = STATE_OPER_LE;
|
||||||
|
} else if (strcasecmp(opStr, "GE") == 0) {
|
||||||
|
opType = STATE_OPER_GE;
|
||||||
|
} else if (strcasecmp(opStr, "NE") == 0) {
|
||||||
|
opType = STATE_OPER_NE;
|
||||||
|
} else if (strcasecmp(opStr, "EQ") == 0) {
|
||||||
|
opType = STATE_OPER_EQ;
|
||||||
|
} else {
|
||||||
|
opType = STATE_OPER_INVALID;
|
||||||
|
}
|
||||||
|
|
||||||
|
return opType;
|
||||||
|
}
|
||||||
|
|
||||||
|
#define GET_STATE_VAL(param) \
|
||||||
|
((param.nType == TSDB_DATA_TYPE_BIGINT) ? (param.i) : (param.d)) \
|
||||||
|
|
||||||
|
#define STATE_COMP(_op, _lval, _param) \
|
||||||
|
STATE_COMP_IMPL(_op, _lval, GET_STATE_VAL(_param)) \
|
||||||
|
|
||||||
|
#define STATE_COMP_IMPL(_op, _lval, _rval) \
|
||||||
|
do { \
|
||||||
|
switch(_op) { \
|
||||||
|
case STATE_OPER_LT: \
|
||||||
|
return ((_lval) < (_rval)); \
|
||||||
|
break; \
|
||||||
|
case STATE_OPER_GT: \
|
||||||
|
return ((_lval) > (_rval)); \
|
||||||
|
break; \
|
||||||
|
case STATE_OPER_LE: \
|
||||||
|
return ((_lval) <= (_rval)); \
|
||||||
|
break; \
|
||||||
|
case STATE_OPER_GE: \
|
||||||
|
return ((_lval) >= (_rval)); \
|
||||||
|
break; \
|
||||||
|
case STATE_OPER_NE: \
|
||||||
|
return ((_lval) != (_rval)); \
|
||||||
|
break; \
|
||||||
|
case STATE_OPER_EQ: \
|
||||||
|
return ((_lval) == (_rval)); \
|
||||||
|
break; \
|
||||||
|
default: \
|
||||||
|
break; \
|
||||||
|
} \
|
||||||
|
} while (0) \
|
||||||
|
|
||||||
|
static bool checkStateOp(int8_t op, SColumnInfoData* pCol, int32_t index, SVariant param) {
|
||||||
|
char* data = colDataGetData(pCol, index);
|
||||||
|
switch(pCol->info.type) {
|
||||||
|
case TSDB_DATA_TYPE_TINYINT: {
|
||||||
|
int8_t v = *(int8_t *)data;
|
||||||
|
STATE_COMP(op, v, param);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT: {
|
||||||
|
uint8_t v = *(uint8_t *)data;
|
||||||
|
STATE_COMP(op, v, param);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT: {
|
||||||
|
int16_t v = *(int16_t *)data;
|
||||||
|
STATE_COMP(op, v, param);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT: {
|
||||||
|
uint16_t v = *(uint16_t *)data;
|
||||||
|
STATE_COMP(op, v, param);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_INT: {
|
||||||
|
int32_t v = *(int32_t *)data;
|
||||||
|
STATE_COMP(op, v, param);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_UINT: {
|
||||||
|
uint32_t v = *(uint32_t *)data;
|
||||||
|
STATE_COMP(op, v, param);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
|
int64_t v = *(int64_t *)data;
|
||||||
|
STATE_COMP(op, v, param);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_UBIGINT: {
|
||||||
|
uint64_t v = *(uint64_t *)data;
|
||||||
|
STATE_COMP(op, v, param);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
|
float v = *(float *)data;
|
||||||
|
STATE_COMP(op, v, param);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
|
double v = *(double *)data;
|
||||||
|
STATE_COMP(op, v, param);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t stateCountFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SStateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
|
||||||
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||||
|
|
||||||
|
int32_t numOfElems = 0;
|
||||||
|
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||||
|
|
||||||
|
int8_t op = getStateOpType(varDataVal(pCtx->param[1].param.pz));
|
||||||
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||||
|
numOfElems++;
|
||||||
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
|
colDataAppendNULL(pOutput, i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ret = checkStateOp(op, pInputCol, i, pCtx->param[2].param);
|
||||||
|
int64_t output = -1;
|
||||||
|
if (ret) {
|
||||||
|
output = ++pInfo->count;
|
||||||
|
} else {
|
||||||
|
pInfo->count = 0;
|
||||||
|
}
|
||||||
|
colDataAppend(pOutput, i, (char *)&output, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
return numOfElems;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue