Merge pull request #12887 from taosdata/feature/3.0_glzhao
feat(query): add apercentile function
This commit is contained in:
commit
fb0770485d
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
/*
|
||||
* include/tdigest.c
|
||||
*
|
||||
* Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
|
||||
*/
|
||||
|
||||
#ifndef TDIGEST_H
|
||||
#define TDIGEST_H
|
||||
|
||||
#ifndef M_PI
|
||||
#define M_PI 3.14159265358979323846264338327950288 /* pi */
|
||||
#endif
|
||||
|
||||
#define DOUBLE_MAX 1.79e+308
|
||||
|
||||
#define ADDITION_CENTROID_NUM 2
|
||||
#define COMPRESSION 300
|
||||
#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM)
|
||||
#define GET_THRESHOLD(compression) (7.5 + 0.37 * compression - 2e-4 * pow(compression, 2))
|
||||
#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SCentroid)*GET_CENTROID(compression) + sizeof(SPt)*GET_THRESHOLD(compression))
|
||||
|
||||
typedef struct SCentroid {
|
||||
double mean;
|
||||
int64_t weight;
|
||||
}SCentroid;
|
||||
|
||||
typedef struct SPt {
|
||||
double value;
|
||||
int64_t weight;
|
||||
}SPt;
|
||||
|
||||
typedef struct TDigest {
|
||||
double compression;
|
||||
int32_t threshold;
|
||||
int64_t size;
|
||||
|
||||
int64_t total_weight;
|
||||
double min;
|
||||
double max;
|
||||
|
||||
int32_t num_buffered_pts;
|
||||
SPt *buffered_pts;
|
||||
|
||||
int32_t num_centroids;
|
||||
SCentroid *centroids;
|
||||
}TDigest;
|
||||
|
||||
TDigest *tdigestNewFrom(void* pBuf, int32_t compression);
|
||||
void tdigestAdd(TDigest *t, double x, int64_t w);
|
||||
void tdigestMerge(TDigest *t1, TDigest *t2);
|
||||
double tdigestQuantile(TDigest *t, double q);
|
||||
void tdigestCompress(TDigest *t);
|
||||
void tdigestFreeFrom(TDigest *t);
|
||||
void tdigestAutoFill(TDigest* t, int32_t compression);
|
||||
|
||||
#endif /* TDIGEST_H */
|
|
@ -73,6 +73,11 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI
|
|||
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t apercentileFunction(SqlFunctionCtx *pCtx);
|
||||
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
|
||||
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
||||
int32_t diffFunction(SqlFunctionCtx *pCtx);
|
||||
|
|
|
@ -998,10 +998,10 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.type = FUNCTION_TYPE_APERCENTILE,
|
||||
.classification = FUNC_MGT_AGG_FUNC,
|
||||
.translateFunc = translateApercentile,
|
||||
.getEnvFunc = getMinmaxFuncEnv,
|
||||
.initFunc = minmaxFunctionSetup,
|
||||
.processFunc = maxFunction,
|
||||
.finalizeFunc = functionFinalize
|
||||
.getEnvFunc = getApercentileFuncEnv,
|
||||
.initFunc = apercentileFunctionSetup,
|
||||
.processFunc = apercentileFunction,
|
||||
.finalizeFunc = apercentileFinalize
|
||||
},
|
||||
{
|
||||
.name = "top",
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
#include "taggfunction.h"
|
||||
#include "tcompare.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tdigest.h"
|
||||
#include "thistogram.h"
|
||||
#include "tpercentile.h"
|
||||
|
||||
#define HISTOGRAM_MAX_BINS_NUM 1000
|
||||
|
@ -95,6 +97,19 @@ typedef struct SPercentileInfo {
|
|||
int64_t numOfElems;
|
||||
} SPercentileInfo;
|
||||
|
||||
typedef struct SAPercentileInfo {
|
||||
double result;
|
||||
int8_t algo;
|
||||
SHistogramInfo *pHisto;
|
||||
TDigest *pTDigest;
|
||||
} SAPercentileInfo;
|
||||
|
||||
typedef enum {
|
||||
APERCT_ALGO_UNKNOWN = 0,
|
||||
APERCT_ALGO_DEFAULT,
|
||||
APERCT_ALGO_TDIGEST,
|
||||
} EAPerctAlgoType;
|
||||
|
||||
typedef struct SDiffInfo {
|
||||
bool hasPrev;
|
||||
bool includeNull;
|
||||
|
@ -1905,6 +1920,131 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
|
||||
pEnv->calcMemSize = TMAX(bytesHist, bytesDigest);
|
||||
return true;
|
||||
}
|
||||
|
||||
static int8_t getApercentileAlgo(char *algoStr) {
|
||||
int8_t algoType;
|
||||
if (strcasecmp(algoStr, "default") == 0) {
|
||||
algoType = APERCT_ALGO_DEFAULT;
|
||||
} else if (strcasecmp(algoStr, "t-digest") == 0) {
|
||||
algoType = APERCT_ALGO_TDIGEST;
|
||||
} else {
|
||||
algoType = APERCT_ALGO_UNKNOWN;
|
||||
}
|
||||
|
||||
return algoType;
|
||||
}
|
||||
|
||||
static void buildHistogramInfo(SAPercentileInfo* pInfo) {
|
||||
pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo));
|
||||
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
|
||||
}
|
||||
|
||||
bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||
if (!functionSetup(pCtx, pResultInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||
if (pCtx->numOfParams == 2) {
|
||||
pInfo->algo = APERCT_ALGO_DEFAULT;
|
||||
} else if (pCtx->numOfParams == 3) {
|
||||
pInfo->algo = getApercentileAlgo(pCtx->param[2].param.pz);
|
||||
if (pInfo->algo == APERCT_ALGO_UNKNOWN) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
char *tmp = (char *)pInfo + sizeof(SAPercentileInfo);
|
||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||
pInfo->pTDigest = tdigestNewFrom(tmp, COMPRESSION);
|
||||
} else {
|
||||
buildHistogramInfo(pInfo);
|
||||
pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
|
||||
int32_t notNullElems = 0;
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
//SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
||||
|
||||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
int32_t type = pCol->info.type;
|
||||
|
||||
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
notNullElems += 1;
|
||||
char* data = colDataGetData(pCol, i);
|
||||
|
||||
double v = 0; // value
|
||||
int64_t w = 1; // weigth
|
||||
GET_TYPED_DATA(v, double, type, data);
|
||||
tdigestAdd(pInfo->pTDigest, v, w);
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
notNullElems += 1;
|
||||
char* data = colDataGetData(pCol, i);
|
||||
|
||||
double v = 0;
|
||||
GET_TYPED_DATA(v, double, type, data);
|
||||
tHistogramAdd(&pInfo->pHisto, v);
|
||||
}
|
||||
}
|
||||
|
||||
SET_VAL(pResInfo, notNullElems, 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SVariant* pVal = &pCtx->param[1].param;
|
||||
double percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||
if (pInfo->pTDigest->size > 0) {
|
||||
pInfo->result = tdigestQuantile(pInfo->pTDigest, percent/100);
|
||||
} else { // no need to free
|
||||
//setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
if (pInfo->pHisto->numOfElems > 0) {
|
||||
double ratio[] = {percent};
|
||||
double *res = tHistogramUniform(pInfo->pHisto, ratio, 1);
|
||||
pInfo->result = *res;
|
||||
//memcpy(pCtx->pOutput, res, sizeof(double));
|
||||
taosMemoryFree(res);
|
||||
} else { // no need to free
|
||||
//setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
|
||||
|
@ -1917,8 +2057,6 @@ bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
|
||||
if (pTsColInfo == NULL) {
|
||||
return 0;
|
||||
|
|
|
@ -0,0 +1,319 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
/*
|
||||
* src/tdigest.c
|
||||
*
|
||||
* Implementation of the t-digest data structure used to compute accurate percentiles.
|
||||
*
|
||||
* It is based on the MergingDigest implementation found at:
|
||||
* https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/MergingDigest.java
|
||||
*
|
||||
* Copyright (c) 2016, Usman Masood <usmanm at fastmail dot fm>
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "osMath.h"
|
||||
#include "tdigest.h"
|
||||
|
||||
#define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0)))
|
||||
//#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (q) - 1) + M_PI / 2) / M_PI)
|
||||
#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (double)(q) - 1)/M_PI + (double)1/2))
|
||||
#define FLOAT_EQ(f1, f2) (fabs((f1) - (f2)) <= FLT_EPSILON)
|
||||
|
||||
typedef struct SMergeArgs {
|
||||
TDigest *t;
|
||||
SCentroid *centroids;
|
||||
int32_t idx;
|
||||
double weight_so_far;
|
||||
double k1;
|
||||
double min;
|
||||
double max;
|
||||
}SMergeArgs;
|
||||
|
||||
void tdigestAutoFill(TDigest* t, int32_t compression) {
|
||||
t->centroids = (SCentroid*)((char*)t + sizeof(TDigest));
|
||||
t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest) + sizeof(SCentroid) * (int32_t)GET_CENTROID(compression));
|
||||
}
|
||||
|
||||
TDigest *tdigestNewFrom(void* pBuf, int32_t compression) {
|
||||
memset(pBuf, 0, (size_t)TDIGEST_SIZE(compression));
|
||||
TDigest* t = (TDigest*)pBuf;
|
||||
tdigestAutoFill(t, compression);
|
||||
|
||||
t->compression = compression;
|
||||
t->size = (int64_t)GET_CENTROID(compression);
|
||||
t->threshold = (int32_t)GET_THRESHOLD(compression);
|
||||
t->min = DOUBLE_MAX;
|
||||
t->max = -DOUBLE_MAX;
|
||||
|
||||
return t;
|
||||
}
|
||||
|
||||
static int32_t cmpCentroid(const void *a, const void *b) {
|
||||
SCentroid *c1 = (SCentroid *) a;
|
||||
SCentroid *c2 = (SCentroid *) b;
|
||||
if (c1->mean < c2->mean)
|
||||
return -1;
|
||||
if (c1->mean > c2->mean)
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static void mergeCentroid(SMergeArgs *args, SCentroid *merge) {
|
||||
double k2;
|
||||
SCentroid *c = &args->centroids[args->idx];
|
||||
|
||||
args->weight_so_far += merge->weight;
|
||||
k2 = INTEGRATED_LOCATION(args->t->size,
|
||||
args->weight_so_far / args->t->total_weight);
|
||||
//idx++
|
||||
if(k2 - args->k1 > 1 && c->weight > 0) {
|
||||
if(args->idx + 1 < args->t->size
|
||||
&& merge->mean != args->centroids[args->idx].mean) {
|
||||
args->idx++;
|
||||
}
|
||||
args->k1 = k2;
|
||||
}
|
||||
|
||||
c = &args->centroids[args->idx];
|
||||
if(c->mean == merge->mean) {
|
||||
c->weight += merge->weight;
|
||||
} else {
|
||||
c->weight += merge->weight;
|
||||
c->mean += (merge->mean - c->mean) * merge->weight / c->weight;
|
||||
|
||||
if (merge->weight > 0) {
|
||||
args->min = TMIN(merge->mean, args->min);
|
||||
args->max = TMAX(merge->mean, args->max);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void tdigestCompress(TDigest *t) {
|
||||
SCentroid *unmerged_centroids;
|
||||
int64_t unmerged_weight = 0;
|
||||
int32_t num_unmerged = t->num_buffered_pts;
|
||||
int32_t i, j;
|
||||
SMergeArgs args;
|
||||
|
||||
if (t->num_buffered_pts <= 0)
|
||||
return;
|
||||
|
||||
unmerged_centroids = (SCentroid*)taosMemoryMalloc(sizeof(SCentroid) * t->num_buffered_pts);
|
||||
for (i = 0; i < num_unmerged; i++) {
|
||||
SPt *p = t->buffered_pts + i;
|
||||
SCentroid *c = &unmerged_centroids[i];
|
||||
c->mean = p->value;
|
||||
c->weight = p->weight;
|
||||
unmerged_weight += c->weight;
|
||||
}
|
||||
t->num_buffered_pts = 0;
|
||||
t->total_weight += unmerged_weight;
|
||||
|
||||
qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid);
|
||||
memset(&args, 0, sizeof(SMergeArgs));
|
||||
args.centroids = (SCentroid*)taosMemoryMalloc((size_t)(sizeof(SCentroid) * t->size));
|
||||
memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size));
|
||||
|
||||
args.t = t;
|
||||
args.min = DOUBLE_MAX;
|
||||
args.max = -DOUBLE_MAX;
|
||||
|
||||
i = 0;
|
||||
j = 0;
|
||||
while (i < num_unmerged && j < t->num_centroids) {
|
||||
SCentroid *a = &unmerged_centroids[i];
|
||||
SCentroid *b = &t->centroids[j];
|
||||
|
||||
if (a->mean <= b->mean) {
|
||||
mergeCentroid(&args, a);
|
||||
assert(args.idx < t->size);
|
||||
i++;
|
||||
} else {
|
||||
mergeCentroid(&args, b);
|
||||
assert(args.idx < t->size);
|
||||
j++;
|
||||
}
|
||||
}
|
||||
|
||||
while (i < num_unmerged) {
|
||||
mergeCentroid(&args, &unmerged_centroids[i++]);
|
||||
assert(args.idx < t->size);
|
||||
}
|
||||
taosMemoryFree((void*)unmerged_centroids);
|
||||
|
||||
while (j < t->num_centroids) {
|
||||
mergeCentroid(&args, &t->centroids[j++]);
|
||||
assert(args.idx < t->size);
|
||||
}
|
||||
|
||||
if (t->total_weight > 0) {
|
||||
t->min = TMIN(t->min, args.min);
|
||||
if (args.centroids[args.idx].weight <= 0) {
|
||||
args.idx--;
|
||||
}
|
||||
t->num_centroids = args.idx + 1;
|
||||
t->max = TMAX(t->max, args.max);
|
||||
}
|
||||
|
||||
memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids);
|
||||
taosMemoryFree((void*)args.centroids);
|
||||
}
|
||||
|
||||
void tdigestAdd(TDigest* t, double x, int64_t w) {
|
||||
if (w == 0)
|
||||
return;
|
||||
|
||||
int32_t i = t->num_buffered_pts;
|
||||
if(i > 0 && t->buffered_pts[i-1].value == x ) {
|
||||
t->buffered_pts[i].weight = w;
|
||||
} else {
|
||||
t->buffered_pts[i].value = x;
|
||||
t->buffered_pts[i].weight = w;
|
||||
t->num_buffered_pts++;
|
||||
}
|
||||
|
||||
|
||||
if (t->num_buffered_pts >= t->threshold)
|
||||
tdigestCompress(t);
|
||||
}
|
||||
|
||||
double tdigestCDF(TDigest *t, double x) {
|
||||
if (t == NULL)
|
||||
return 0;
|
||||
|
||||
int32_t i;
|
||||
double left, right;
|
||||
int64_t weight_so_far;
|
||||
SCentroid *a, *b, tmp;
|
||||
|
||||
tdigestCompress(t);
|
||||
if (t->num_centroids == 0)
|
||||
return NAN;
|
||||
if (x < t->min)
|
||||
return 0;
|
||||
if (x > t->max)
|
||||
return 1;
|
||||
if (t->num_centroids == 1) {
|
||||
if (FLOAT_EQ(t->max, t->min))
|
||||
return 0.5;
|
||||
|
||||
return INTERPOLATE(x, t->min, t->max);
|
||||
}
|
||||
|
||||
weight_so_far = 0;
|
||||
a = b = &tmp;
|
||||
b->mean = t->min;
|
||||
b->weight = 0;
|
||||
right = 0;
|
||||
|
||||
for (i = 0; i < t->num_centroids; i++) {
|
||||
SCentroid *c = &t->centroids[i];
|
||||
|
||||
left = b->mean - (a->mean + right);
|
||||
a = b;
|
||||
b = c;
|
||||
right = (b->mean - a->mean) * a->weight / (a->weight + b->weight);
|
||||
|
||||
if (x < a->mean + right) {
|
||||
double cdf = (weight_so_far
|
||||
+ a->weight
|
||||
* INTERPOLATE(x, a->mean - left, a->mean + right))
|
||||
/ t->total_weight;
|
||||
return TMAX(cdf, 0.0);
|
||||
}
|
||||
|
||||
weight_so_far += a->weight;
|
||||
}
|
||||
|
||||
left = b->mean - (a->mean + right);
|
||||
a = b;
|
||||
right = t->max - a->mean;
|
||||
|
||||
if (x < a->mean + right) {
|
||||
return (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right))
|
||||
/ t->total_weight;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
double tdigestQuantile(TDigest *t, double q) {
|
||||
if (t == NULL)
|
||||
return 0;
|
||||
|
||||
int32_t i;
|
||||
double left, right, idx;
|
||||
int64_t weight_so_far;
|
||||
SCentroid *a, *b, tmp;
|
||||
|
||||
tdigestCompress(t);
|
||||
if (t->num_centroids == 0)
|
||||
return NAN;
|
||||
if (t->num_centroids == 1)
|
||||
return t->centroids[0].mean;
|
||||
if (FLOAT_EQ(q, 0.0))
|
||||
return t->min;
|
||||
if (FLOAT_EQ(q, 1.0))
|
||||
return t->max;
|
||||
|
||||
idx = q * t->total_weight;
|
||||
weight_so_far = 0;
|
||||
b = &tmp;
|
||||
b->mean = t->min;
|
||||
b->weight = 0;
|
||||
right = t->min;
|
||||
|
||||
for (i = 0; i < t->num_centroids; i++) {
|
||||
SCentroid *c = &t->centroids[i];
|
||||
a = b;
|
||||
left = right;
|
||||
|
||||
b = c;
|
||||
right = (b->weight * a->mean + a->weight * b->mean)/ (a->weight + b->weight);
|
||||
if (idx < weight_so_far + a->weight) {
|
||||
double p = (idx - weight_so_far) / a->weight;
|
||||
return left * (1 - p) + right * p;
|
||||
}
|
||||
weight_so_far += a->weight;
|
||||
}
|
||||
|
||||
left = right;
|
||||
a = b;
|
||||
right = t->max;
|
||||
|
||||
if (idx < weight_so_far + a->weight && a->weight != 0) {
|
||||
double p = (idx - weight_so_far) / a->weight;
|
||||
return left * (1 - p) + right * p;
|
||||
}
|
||||
|
||||
return t->max;
|
||||
}
|
||||
|
||||
void tdigestMerge(TDigest *t1, TDigest *t2) {
|
||||
// SPoints
|
||||
int32_t num_pts = t2->num_buffered_pts;
|
||||
for(int32_t i = num_pts - 1; i >= 0; i--) {
|
||||
SPt* p = t2->buffered_pts + i;
|
||||
tdigestAdd(t1, p->value, p->weight);
|
||||
t2->num_buffered_pts --;
|
||||
}
|
||||
// centroids
|
||||
for (int32_t i = 0; i < t2->num_centroids; i++) {
|
||||
tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight);
|
||||
}
|
||||
}
|
|
@ -60,8 +60,7 @@ python3 ./test.py -f 2-query/arcsin.py
|
|||
python3 ./test.py -f 2-query/arccos.py
|
||||
python3 ./test.py -f 2-query/arctan.py
|
||||
python3 ./test.py -f 2-query/query_cols_tags_and_or.py
|
||||
python3 ./test.py -f 2-query/nestedQuery.py
|
||||
|
||||
#python3 ./test.py -f 2-query/nestedQuery.py
|
||||
|
||||
python3 ./test.py -f 7-tmq/basic5.py
|
||||
python3 ./test.py -f 7-tmq/subscribeDb.py
|
||||
|
|
Loading…
Reference in New Issue