feat(query): add mavg function
This commit is contained in:
parent
71e43677f7
commit
818e3636d6
|
@ -97,6 +97,9 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx);
|
||||||
bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t csumFunction(SqlFunctionCtx* pCtx);
|
int32_t csumFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
|
bool getMavgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
int32_t mavgFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -339,6 +339,27 @@ static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateMavg(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
if (2 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||||
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
|
"The input parameter of MAVG function can only be column");
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
||||||
|
if (!IS_NUMERIC_TYPE(colType) || !IS_INTEGER_TYPE(paraType)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// todo
|
// todo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -783,6 +804,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = csumFunction,
|
.processFunc = csumFunction,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "mavg",
|
||||||
|
.type = FUNCTION_TYPE_MAVG,
|
||||||
|
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
|
.translateFunc = translateMavg,
|
||||||
|
.getEnvFunc = getMavgFuncEnv,
|
||||||
|
.initFunc = functionSetup,
|
||||||
|
.processFunc = mavgFunction,
|
||||||
|
.finalizeFunc = NULL
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "abs",
|
.name = "abs",
|
||||||
.type = FUNCTION_TYPE_ABS,
|
.type = FUNCTION_TYPE_ABS,
|
||||||
|
|
|
@ -21,7 +21,8 @@
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tpercentile.h"
|
#include "tpercentile.h"
|
||||||
|
|
||||||
#define HISTOGRAM_MAX_BINS_NUM 100
|
#define HISTOGRAM_MAX_BINS_NUM 1000
|
||||||
|
#define MAVG_MAX_POINTS_NUM 1000
|
||||||
|
|
||||||
typedef struct SSumRes {
|
typedef struct SSumRes {
|
||||||
union {
|
union {
|
||||||
|
@ -141,6 +142,14 @@ typedef enum {
|
||||||
STATE_OPER_EQ,
|
STATE_OPER_EQ,
|
||||||
} EStateOperType;
|
} EStateOperType;
|
||||||
|
|
||||||
|
typedef struct SMavgInfo {
|
||||||
|
int32_t pos;
|
||||||
|
double sum;
|
||||||
|
int32_t numOfPoints;
|
||||||
|
bool pointsMeet;
|
||||||
|
double points[];
|
||||||
|
} SMavgInfo;
|
||||||
|
|
||||||
#define SET_VAL(_info, numOfElem, res) \
|
#define SET_VAL(_info, numOfElem, res) \
|
||||||
do { \
|
do { \
|
||||||
if ((numOfElem) <= 0) { \
|
if ((numOfElem) <= 0) { \
|
||||||
|
@ -2946,3 +2955,77 @@ int32_t csumFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
return numOfElems;
|
return numOfElems;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getMavgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
|
pEnv->calcMemSize = sizeof(SMavgInfo) + MAVG_MAX_POINTS_NUM * sizeof(double);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool mavgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) {
|
||||||
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMavgInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
|
pInfo->pos = 0;
|
||||||
|
pInfo->sum = 0;
|
||||||
|
pInfo->numOfPoints = pCtx->param[1].param.i;
|
||||||
|
if (pInfo->numOfPoints < 1 || pInfo->numOfPoints > MAVG_MAX_POINTS_NUM) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
pInfo->pointsMeet = false;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mavgFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SMavgInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||||
|
|
||||||
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||||
|
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||||
|
|
||||||
|
int32_t numOfElems = 0;
|
||||||
|
int32_t type = pInputCol->info.type;
|
||||||
|
int32_t startOffset = pCtx->offset;
|
||||||
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||||
|
int32_t pos = startOffset + numOfElems;
|
||||||
|
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||||
|
//colDataAppendNULL(pOutput, i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* data = colDataGetData(pInputCol, i);
|
||||||
|
double v;
|
||||||
|
GET_TYPED_DATA(v, double, type, data);
|
||||||
|
|
||||||
|
if (!pInfo->pointsMeet && (pInfo->pos < pInfo->numOfPoints - 1)) {
|
||||||
|
pInfo->points[pInfo->pos] = v;
|
||||||
|
pInfo->sum += v;
|
||||||
|
} else {
|
||||||
|
if (!pInfo->pointsMeet && (pInfo->pos == pInfo->numOfPoints - 1)) {
|
||||||
|
pInfo->sum +=v;
|
||||||
|
pInfo->pointsMeet = true;
|
||||||
|
} else {
|
||||||
|
pInfo->sum = pInfo->sum + v - pInfo->points[pInfo->pos];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->points[pInfo->pos] = v;
|
||||||
|
double result = pInfo->sum / pInfo->numOfPoints;
|
||||||
|
colDataAppend(pOutput, pos, (char *)&result, false);
|
||||||
|
|
||||||
|
//TODO: remove this after pTsOutput is handled
|
||||||
|
if (pTsOutput != NULL) {
|
||||||
|
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElems++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return numOfElems;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue