parent
a601081e6d
commit
b96d372e81
|
@ -40,6 +40,11 @@ bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
|||
int32_t minFunction(SqlFunctionCtx* pCtx);
|
||||
int32_t maxFunction(SqlFunctionCtx *pCtx);
|
||||
|
||||
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t avgFunction(SqlFunctionCtx* pCtx);
|
||||
void avgFinalize(SqlFunctionCtx* pCtx);
|
||||
|
||||
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t stddevFunction(SqlFunctionCtx* pCtx);
|
||||
|
|
|
@ -104,7 +104,7 @@ static int32_t translateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
|||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -472,6 +472,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.processFunc = stddevFunction,
|
||||
.finalizeFunc = stddevFinalize
|
||||
},
|
||||
{
|
||||
.name = "avg",
|
||||
.type = FUNCTION_TYPE_AVG,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.translateFunc = translateInNumOutDou,
|
||||
.getEnvFunc = getAvgFuncEnv,
|
||||
.initFunc = avgFunctionSetup,
|
||||
.processFunc = avgFunction,
|
||||
.finalizeFunc = avgFinalize
|
||||
},
|
||||
{
|
||||
.name = "percentile",
|
||||
.type = FUNCTION_TYPE_PERCENTILE,
|
||||
|
|
|
@ -19,6 +19,48 @@
|
|||
#include "taggfunction.h"
|
||||
#include "tdatablock.h"
|
||||
|
||||
typedef struct SSumRes {
|
||||
union {
|
||||
int64_t isum;
|
||||
uint64_t usum;
|
||||
double dsum;
|
||||
};
|
||||
} SSumRes;
|
||||
|
||||
typedef struct SAvgRes {
|
||||
double result;
|
||||
SSumRes sum;
|
||||
int64_t count;
|
||||
} SAvgRes;
|
||||
|
||||
typedef struct STopBotRes {
|
||||
int32_t num;
|
||||
} STopBotRes;
|
||||
|
||||
typedef struct SStddevRes {
|
||||
double result;
|
||||
int64_t count;
|
||||
union {double quadraticDSum; int64_t quadraticISum;};
|
||||
union {double dsum; int64_t isum;};
|
||||
} SStddevRes;
|
||||
|
||||
typedef struct SPercentileInfo {
|
||||
double result;
|
||||
tMemBucket *pMemBucket;
|
||||
int32_t stage;
|
||||
double minval;
|
||||
double maxval;
|
||||
int64_t numOfElems;
|
||||
} SPercentileInfo;
|
||||
|
||||
typedef struct SDiffInfo {
|
||||
bool hasPrev;
|
||||
bool includeNull;
|
||||
bool ignoreNegative;
|
||||
bool firstOutput;
|
||||
union { int64_t i64; double d64;} prev;
|
||||
} SDiffInfo;
|
||||
|
||||
#define SET_VAL(_info, numOfElem, res) \
|
||||
do { \
|
||||
if ((numOfElem) <= 0) { \
|
||||
|
@ -27,13 +69,50 @@
|
|||
(_info)->numOfRes = (res); \
|
||||
} while (0)
|
||||
|
||||
typedef struct SSumRes {
|
||||
union {
|
||||
int64_t isum;
|
||||
uint64_t usum;
|
||||
double dsum;
|
||||
};
|
||||
} SSumRes;
|
||||
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
|
||||
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
|
||||
|
||||
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0);
|
||||
|
||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||
__ctx->tag.i = (ts); \
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||
} \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
||||
do { \
|
||||
if (((left) < (right)) ^ (sign)) { \
|
||||
(left) = (right); \
|
||||
DO_UPDATE_SUBSID_RES(ctx, _ts); \
|
||||
(num) += 1; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
||||
do { \
|
||||
_t *d = (_t *)((_col)->pData); \
|
||||
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
|
||||
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
|
||||
continue; \
|
||||
} \
|
||||
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
|
||||
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
||||
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (pResultInfo->initialized) {
|
||||
|
@ -128,7 +207,7 @@ int32_t sumFunction(SqlFunctionCtx *pCtx) {
|
|||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
|
||||
if (pInput->colDataAggIsSet) {
|
||||
numOfElem = pInput->numOfRows - pAgg->numOfNull;
|
||||
ASSERT(numOfElem >= 0);
|
||||
|
@ -183,6 +262,145 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(double);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
memset(pRes, 0, sizeof(SAvgRes));
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t numOfElem = 0;
|
||||
|
||||
// Only the pre-computing information loaded and actual data does not loaded
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
|
||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
|
||||
// computing based on the true data block
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t* plist = (int8_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t* plist = (int16_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t* plist = (int32_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t* plist = (int64_t*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.isum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float* plist = (float*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.dsum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double* plist = (double*)pCol->pData;
|
||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElem += 1;
|
||||
pAvgRes->count += 1;
|
||||
pAvgRes->sum.dsum += plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
// data in the check operation are all null, not output
|
||||
SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void avgFinalize(SqlFunctionCtx* pCtx) {
|
||||
functionFinalize(pCtx);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
if (IS_INTEGER_TYPE(type)) {
|
||||
pAvgRes->result = pAvgRes->sum.isum / ((double) pAvgRes->count);
|
||||
} else {
|
||||
pAvgRes->result = pAvgRes->sum.dsum / ((double) pAvgRes->count);
|
||||
}
|
||||
}
|
||||
|
||||
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){
|
||||
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
||||
}
|
||||
|
@ -285,49 +503,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
|
||||
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
|
||||
|
||||
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0);
|
||||
|
||||
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
|
||||
do { \
|
||||
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
|
||||
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
|
||||
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
|
||||
__ctx->tag.i = (ts); \
|
||||
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
|
||||
} \
|
||||
__ctx->fpSet.process(__ctx); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
|
||||
do { \
|
||||
if (((left) < (right)) ^ (sign)) { \
|
||||
(left) = (right); \
|
||||
DO_UPDATE_SUBSID_RES(ctx, _ts); \
|
||||
(num) += 1; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
|
||||
do { \
|
||||
_t *d = (_t *)((_col)->pData); \
|
||||
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
|
||||
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
|
||||
continue; \
|
||||
} \
|
||||
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
|
||||
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
||||
int32_t numOfElems = 0;
|
||||
|
@ -472,10 +647,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct STopBotRes {
|
||||
int32_t num;
|
||||
} STopBotRes;
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0);
|
||||
int32_t bytes = pColNode->node.resType.bytes;
|
||||
|
@ -483,13 +654,6 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
typedef struct SStddevRes {
|
||||
double result;
|
||||
int64_t count;
|
||||
union {double quadraticDSum; int64_t quadraticISum;};
|
||||
union {double dsum; int64_t isum;};
|
||||
} SStddevRes;
|
||||
|
||||
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SStddevRes);
|
||||
return true;
|
||||
|
@ -592,8 +756,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
numOfElem += 1;
|
||||
pStddevRes->count += 1;
|
||||
pStddevRes->isum += plist[i];
|
||||
pStddevRes->quadraticISum += plist[i] * plist[i];
|
||||
pStddevRes->dsum += plist[i];
|
||||
pStddevRes->quadraticDSum += plist[i] * plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -607,8 +771,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
numOfElem += 1;
|
||||
pStddevRes->count += 1;
|
||||
pStddevRes->isum += plist[i];
|
||||
pStddevRes->quadraticISum += plist[i] * plist[i];
|
||||
pStddevRes->dsum += plist[i];
|
||||
pStddevRes->quadraticDSum += plist[i] * plist[i];
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -625,20 +789,19 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
|
|||
void stddevFinalize(SqlFunctionCtx* pCtx) {
|
||||
functionFinalize(pCtx);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t type = pInput->pData[0]->info.type;
|
||||
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
double avg = pStddevRes->isum / ((double) pStddevRes->count);
|
||||
pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg);
|
||||
double avg;
|
||||
if (IS_INTEGER_TYPE(type)) {
|
||||
avg = pStddevRes->isum / ((double) pStddevRes->count);
|
||||
pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg);
|
||||
} else {
|
||||
avg = pStddevRes->dsum / ((double) pStddevRes->count);
|
||||
pStddevRes->result = sqrt(pStddevRes->quadraticDSum/((double)pStddevRes->count) - avg*avg);
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct SPercentileInfo {
|
||||
double result;
|
||||
tMemBucket *pMemBucket;
|
||||
int32_t stage;
|
||||
double minval;
|
||||
double maxval;
|
||||
int64_t numOfElems;
|
||||
} SPercentileInfo;
|
||||
|
||||
bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SPercentileInfo);
|
||||
return true;
|
||||
|
@ -933,14 +1096,6 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef struct SDiffInfo {
|
||||
bool hasPrev;
|
||||
bool includeNull;
|
||||
bool ignoreNegative;
|
||||
bool firstOutput;
|
||||
union { int64_t i64; double d64;} prev;
|
||||
} SDiffInfo;
|
||||
|
||||
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(SDiffInfo);
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue