feat(query): add apercentile function
This commit is contained in:
parent
c62aabcadb
commit
011bd32330
|
@ -67,6 +67,11 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI
|
||||||
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
int32_t apercentileFunction(SqlFunctionCtx *pCtx);
|
||||||
|
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
||||||
int32_t diffFunction(SqlFunctionCtx *pCtx);
|
int32_t diffFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
|
@ -815,10 +815,10 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.type = FUNCTION_TYPE_APERCENTILE,
|
.type = FUNCTION_TYPE_APERCENTILE,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.translateFunc = translateApercentile,
|
.translateFunc = translateApercentile,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getApercentileFuncEnv,
|
||||||
.initFunc = minmaxFunctionSetup,
|
.initFunc = apercentileFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = apercentileFunction,
|
||||||
.finalizeFunc = functionFinalize
|
.finalizeFunc = apercentileFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "top",
|
.name = "top",
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
#include "taggfunction.h"
|
#include "taggfunction.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
#include "tdigest.h"
|
||||||
|
#include "thistogram.h"
|
||||||
#include "tpercentile.h"
|
#include "tpercentile.h"
|
||||||
|
|
||||||
#define HISTOGRAM_MAX_BINS_NUM 1000
|
#define HISTOGRAM_MAX_BINS_NUM 1000
|
||||||
|
@ -95,6 +97,19 @@ typedef struct SPercentileInfo {
|
||||||
int64_t numOfElems;
|
int64_t numOfElems;
|
||||||
} SPercentileInfo;
|
} SPercentileInfo;
|
||||||
|
|
||||||
|
typedef struct SAPercentileInfo {
|
||||||
|
double result;
|
||||||
|
int8_t algo;
|
||||||
|
SHistogramInfo *pHisto;
|
||||||
|
TDigest *pTDigest;
|
||||||
|
} SAPercentileInfo;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
APERCT_ALGO_UNKNOWN = 0,
|
||||||
|
APERCT_ALGO_DEFAULT,
|
||||||
|
APERCT_ALGO_TDIGEST,
|
||||||
|
} EAPerctAlgoType;
|
||||||
|
|
||||||
typedef struct SDiffInfo {
|
typedef struct SDiffInfo {
|
||||||
bool hasPrev;
|
bool hasPrev;
|
||||||
bool includeNull;
|
bool includeNull;
|
||||||
|
@ -1790,6 +1805,131 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||||
|
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
|
||||||
|
pEnv->calcMemSize = MAX(bytesHist, bytesDigest);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int8_t getApercentileAlgo(char *algoStr) {
|
||||||
|
int8_t algoType;
|
||||||
|
if (strcasecmp(algoStr, "default") == 0) {
|
||||||
|
algoType = APERCT_ALGO_DEFAULT;
|
||||||
|
} else if (strcasecmp(algoStr, "t-digest") == 0) {
|
||||||
|
algoType = APERCT_ALGO_TDIGEST;
|
||||||
|
} else {
|
||||||
|
algoType = APERCT_ALGO_UNKNOWN;
|
||||||
|
}
|
||||||
|
|
||||||
|
return algoType;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void buildHistogramInfo(SAPercentileInfo* pInfo) {
|
||||||
|
pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo));
|
||||||
|
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
|
if (pCtx->numOfParams == 2) {
|
||||||
|
pInfo->algo = APERCT_ALGO_DEFAULT;
|
||||||
|
} else if (pCtx->numOfParams == 3) {
|
||||||
|
pInfo->algo = getApercentileAlgo(pCtx->param[2].param.pz);
|
||||||
|
if (pInfo->algo == APERCT_ALGO_UNKNOWN) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
|
||||||
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
pInfo->pTDigest = tdigestNewFrom(tmp, COMPRESSION);
|
||||||
|
} else {
|
||||||
|
buildHistogramInfo(pInfo);
|
||||||
|
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
int32_t notNullElems = 0;
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
//SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
||||||
|
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
int32_t type = pCol->info.type;
|
||||||
|
|
||||||
|
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
|
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
notNullElems += 1;
|
||||||
|
char* data = colDataGetData(pCol, i);
|
||||||
|
|
||||||
|
double v = 0; // value
|
||||||
|
int64_t w = 1; // weigth
|
||||||
|
GET_TYPED_DATA(v, double, type, data);
|
||||||
|
tdigestAdd(pInfo->pTDigest, v, w);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
|
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
notNullElems += 1;
|
||||||
|
char* data = colDataGetData(pCol, i);
|
||||||
|
|
||||||
|
double v = 0;
|
||||||
|
GET_TYPED_DATA(v, double, type, data);
|
||||||
|
tHistogramAdd(&pInfo->pHisto, v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SET_VAL(pResInfo, notNullElems, 1);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SVariant* pVal = &pCtx->param[1].param;
|
||||||
|
double percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
|
||||||
|
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
if (pInfo->pTDigest->size > 0) {
|
||||||
|
pInfo->result = tdigestQuantile(pInfo->pTDigest, percent/100);
|
||||||
|
} else { // no need to free
|
||||||
|
//setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pInfo->pHisto->numOfElems > 0) {
|
||||||
|
double ratio[] = {percent};
|
||||||
|
double *res = tHistogramUniform(pInfo->pHisto, ratio, 1);
|
||||||
|
pInfo->result = *res;
|
||||||
|
//memcpy(pCtx->pOutput, res, sizeof(double));
|
||||||
|
taosMemoryFree(res);
|
||||||
|
} else { // no need to free
|
||||||
|
//setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return functionFinalize(pCtx, pBlock);
|
||||||
|
}
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
|
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
|
||||||
|
@ -1802,8 +1942,6 @@ bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
|
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
|
||||||
if (pTsColInfo == NULL) {
|
if (pTsColInfo == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue