Merge pull request #12343 from taosdata/feature/3.0_glzhao
feat(query): add state_duration function
This commit is contained in:
commit
93b0cd8047
|
@ -93,6 +93,7 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t stateCountFunction(SqlFunctionCtx* pCtx);
|
int32_t stateCountFunction(SqlFunctionCtx* pCtx);
|
||||||
|
int32_t stateDurationFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
|
||||||
|
|
|
@ -263,7 +263,7 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateState(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
if (3 != LIST_LENGTH(pFunc->pParameterList)) {
|
if (3 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
}
|
}
|
||||||
|
@ -283,6 +283,31 @@ static int32_t translateState(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
int32_t paraNum = LIST_LENGTH(pFunc->pParameterList);
|
||||||
|
if (3 != paraNum && 4 != paraNum) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
if (!IS_NUMERIC_TYPE(colType)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type != TSDB_DATA_TYPE_BINARY ||
|
||||||
|
(((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_BIGINT &&
|
||||||
|
((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_DOUBLE)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (paraNum == 4 && ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 3))->resType.type != TSDB_DATA_TYPE_BIGINT) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT };
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// todo
|
// todo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -701,12 +726,22 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.name = "state_count",
|
.name = "state_count",
|
||||||
.type = FUNCTION_TYPE_STATE_COUNT,
|
.type = FUNCTION_TYPE_STATE_COUNT,
|
||||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC,
|
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC,
|
||||||
.translateFunc = translateState,
|
.translateFunc = translateStateCount,
|
||||||
.getEnvFunc = getStateFuncEnv,
|
.getEnvFunc = getStateFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = stateCountFunction,
|
.processFunc = stateCountFunction,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "state_duration",
|
||||||
|
.type = FUNCTION_TYPE_STATE_DURATION,
|
||||||
|
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
|
.translateFunc = translateStateDuration,
|
||||||
|
.getEnvFunc = getStateFuncEnv,
|
||||||
|
.initFunc = functionSetup,
|
||||||
|
.processFunc = stateDurationFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "abs",
|
.name = "abs",
|
||||||
.type = FUNCTION_TYPE_ABS,
|
.type = FUNCTION_TYPE_ABS,
|
||||||
|
|
|
@ -125,7 +125,10 @@ typedef enum {
|
||||||
} EHistoBinType;
|
} EHistoBinType;
|
||||||
|
|
||||||
typedef struct SStateInfo {
|
typedef struct SStateInfo {
|
||||||
int64_t count;
|
union {
|
||||||
|
int64_t count;
|
||||||
|
int64_t durationStart;
|
||||||
|
};
|
||||||
} SStateInfo;
|
} SStateInfo;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -2844,3 +2847,52 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
return numOfElems;
|
return numOfElems;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t stateDurationFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SStateInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||||
|
|
||||||
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||||
|
|
||||||
|
int32_t numOfElems = 0;
|
||||||
|
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||||
|
|
||||||
|
//TODO: process timeUnit for different db precisions
|
||||||
|
int32_t timeUnit = 1000;
|
||||||
|
if (pCtx->numOfParams == 5) { //TODO: param number incorrect
|
||||||
|
timeUnit = pCtx->param[3].param.i;
|
||||||
|
}
|
||||||
|
|
||||||
|
int8_t op = getStateOpType(varDataVal(pCtx->param[1].param.pz));
|
||||||
|
if (STATE_OPER_INVALID == op) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||||
|
numOfElems++;
|
||||||
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
|
colDataAppendNULL(pOutput, i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ret = checkStateOp(op, pInputCol, i, pCtx->param[2].param);
|
||||||
|
int64_t output = -1;
|
||||||
|
if (ret) {
|
||||||
|
if (pInfo->durationStart == 0) {
|
||||||
|
output = 0;
|
||||||
|
pInfo->durationStart = tsList[i];
|
||||||
|
} else {
|
||||||
|
output = (tsList[i] - pInfo->durationStart) / timeUnit;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pInfo->durationStart = 0;
|
||||||
|
}
|
||||||
|
colDataAppend(pOutput, i, (char *)&output, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
return numOfElems;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue