diff --git a/include/util/tdigest.h b/include/util/tdigest.h new file mode 100644 index 0000000000..f9b615318f --- /dev/null +++ b/include/util/tdigest.h @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +/* + * include/tdigest.c + * + * Copyright (c) 2016, Usman Masood + */ + +#ifndef TDIGEST_H +#define TDIGEST_H + +#ifndef M_PI +#define M_PI 3.14159265358979323846264338327950288 /* pi */ +#endif + +#define DOUBLE_MAX 1.79e+308 + +#define ADDITION_CENTROID_NUM 2 +#define COMPRESSION 300 +#define GET_CENTROID(compression) (ceil(compression * M_PI / 2) + 1 + ADDITION_CENTROID_NUM) +#define GET_THRESHOLD(compression) (7.5 + 0.37 * compression - 2e-4 * pow(compression, 2)) +#define TDIGEST_SIZE(compression) (sizeof(TDigest) + sizeof(SCentroid)*GET_CENTROID(compression) + sizeof(SPt)*GET_THRESHOLD(compression)) + +typedef struct SCentroid { + double mean; + int64_t weight; +}SCentroid; + +typedef struct SPt { + double value; + int64_t weight; +}SPt; + +typedef struct TDigest { + double compression; + int32_t threshold; + int64_t size; + + int64_t total_weight; + double min; + double max; + + int32_t num_buffered_pts; + SPt *buffered_pts; + + int32_t num_centroids; + SCentroid *centroids; +}TDigest; + +TDigest *tdigestNewFrom(void* pBuf, int32_t compression); +void tdigestAdd(TDigest *t, double x, int64_t w); +void tdigestMerge(TDigest *t1, TDigest *t2); +double tdigestQuantile(TDigest *t, double q); +void tdigestCompress(TDigest *t); +void tdigestFreeFrom(TDigest *t); +void tdigestAutoFill(TDigest* t, int32_t compression); + +#endif /* TDIGEST_H */ diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index d041e08d35..a8eccb57e9 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -73,6 +73,11 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI int32_t percentileFunction(SqlFunctionCtx *pCtx); int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t apercentileFunction(SqlFunctionCtx *pCtx); +int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); int32_t diffFunction(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 01f3f64f0a..8914dca639 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -998,10 +998,10 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_APERCENTILE, .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateApercentile, - .getEnvFunc = getMinmaxFuncEnv, - .initFunc = minmaxFunctionSetup, - .processFunc = maxFunction, - .finalizeFunc = functionFinalize + .getEnvFunc = getApercentileFuncEnv, + .initFunc = apercentileFunctionSetup, + .processFunc = apercentileFunction, + .finalizeFunc = apercentileFinalize }, { .name = "top", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index da842877dc..fe5737220b 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -20,6 +20,8 @@ #include "taggfunction.h" #include "tcompare.h" #include "tdatablock.h" +#include "tdigest.h" +#include "thistogram.h" #include "tpercentile.h" #define HISTOGRAM_MAX_BINS_NUM 1000 @@ -95,6 +97,19 @@ typedef struct SPercentileInfo { int64_t numOfElems; } SPercentileInfo; +typedef struct SAPercentileInfo { + double result; + int8_t algo; + SHistogramInfo *pHisto; + TDigest *pTDigest; +} SAPercentileInfo; + +typedef enum { + APERCT_ALGO_UNKNOWN = 0, + APERCT_ALGO_DEFAULT, + APERCT_ALGO_TDIGEST, +} EAPerctAlgoType; + typedef struct SDiffInfo { bool hasPrev; bool includeNull; @@ -1905,6 +1920,131 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); + int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION)); + pEnv->calcMemSize = TMAX(bytesHist, bytesDigest); + return true; +} + +static int8_t getApercentileAlgo(char *algoStr) { + int8_t algoType; + if (strcasecmp(algoStr, "default") == 0) { + algoType = APERCT_ALGO_DEFAULT; + } else if (strcasecmp(algoStr, "t-digest") == 0) { + algoType = APERCT_ALGO_TDIGEST; + } else { + algoType = APERCT_ALGO_UNKNOWN; + } + + return algoType; +} + +static void buildHistogramInfo(SAPercentileInfo* pInfo) { + pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo)); + pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo)); +} + +bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo); + if (pCtx->numOfParams == 2) { + pInfo->algo = APERCT_ALGO_DEFAULT; + } else if (pCtx->numOfParams == 3) { + pInfo->algo = getApercentileAlgo(pCtx->param[2].param.pz); + if (pInfo->algo == APERCT_ALGO_UNKNOWN) { + return false; + } + } + + char *tmp = (char *)pInfo + sizeof(SAPercentileInfo); + if (pInfo->algo == APERCT_ALGO_TDIGEST) { + pInfo->pTDigest = tdigestNewFrom(tmp, COMPRESSION); + } else { + buildHistogramInfo(pInfo); + pInfo->pHisto = tHistogramCreateFrom(tmp, MAX_HISTOGRAM_BIN); + } + + return true; +} + +int32_t apercentileFunction(SqlFunctionCtx* pCtx) { + int32_t notNullElems = 0; + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + + SInputColumnInfoData* pInput = &pCtx->input; + //SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0]; + + SColumnInfoData* pCol = pInput->pData[0]; + int32_t type = pCol->info.type; + + SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + int32_t start = pInput->startRowIndex; + if (pInfo->algo == APERCT_ALGO_TDIGEST) { + for (int32_t i = start; i < pInput->numOfRows + start; ++i) { + if (colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + notNullElems += 1; + char* data = colDataGetData(pCol, i); + + double v = 0; // value + int64_t w = 1; // weigth + GET_TYPED_DATA(v, double, type, data); + tdigestAdd(pInfo->pTDigest, v, w); + } + } else { + for (int32_t i = start; i < pInput->numOfRows + start; ++i) { + if (colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + notNullElems += 1; + char* data = colDataGetData(pCol, i); + + double v = 0; + GET_TYPED_DATA(v, double, type, data); + tHistogramAdd(&pInfo->pHisto, v); + } + } + + SET_VAL(pResInfo, notNullElems, 1); + return TSDB_CODE_SUCCESS; +} + +int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SVariant* pVal = &pCtx->param[1].param; + double percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d; + + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); + + if (pInfo->algo == APERCT_ALGO_TDIGEST) { + if (pInfo->pTDigest->size > 0) { + pInfo->result = tdigestQuantile(pInfo->pTDigest, percent/100); + } else { // no need to free + //setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + return TSDB_CODE_SUCCESS; + } + } else { + if (pInfo->pHisto->numOfElems > 0) { + double ratio[] = {percent}; + double *res = tHistogramUniform(pInfo->pHisto, ratio, 1); + pInfo->result = *res; + //memcpy(pCtx->pOutput, res, sizeof(double)); + taosMemoryFree(res); + } else { // no need to free + //setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + return TSDB_CODE_SUCCESS; + } + } + + return functionFinalize(pCtx, pBlock); +} + bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t); @@ -1917,8 +2057,6 @@ bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } - - static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) { if (pTsColInfo == NULL) { return 0; diff --git a/source/util/src/tdigest.c b/source/util/src/tdigest.c new file mode 100644 index 0000000000..56b113fd8f --- /dev/null +++ b/source/util/src/tdigest.c @@ -0,0 +1,319 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +/* + * src/tdigest.c + * + * Implementation of the t-digest data structure used to compute accurate percentiles. + * + * It is based on the MergingDigest implementation found at: + * https://github.com/tdunning/t-digest/blob/master/src/main/java/com/tdunning/math/stats/MergingDigest.java + * + * Copyright (c) 2016, Usman Masood + */ + +#include "os.h" +#include "osMath.h" +#include "tdigest.h" + +#define INTERPOLATE(x, x0, x1) (((x) - (x0)) / ((x1) - (x0))) +//#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (q) - 1) + M_PI / 2) / M_PI) +#define INTEGRATED_LOCATION(compression, q) ((compression) * (asin(2 * (double)(q) - 1)/M_PI + (double)1/2)) +#define FLOAT_EQ(f1, f2) (fabs((f1) - (f2)) <= FLT_EPSILON) + +typedef struct SMergeArgs { + TDigest *t; + SCentroid *centroids; + int32_t idx; + double weight_so_far; + double k1; + double min; + double max; +}SMergeArgs; + +void tdigestAutoFill(TDigest* t, int32_t compression) { + t->centroids = (SCentroid*)((char*)t + sizeof(TDigest)); + t->buffered_pts = (SPt*) ((char*)t + sizeof(TDigest) + sizeof(SCentroid) * (int32_t)GET_CENTROID(compression)); +} + +TDigest *tdigestNewFrom(void* pBuf, int32_t compression) { + memset(pBuf, 0, (size_t)TDIGEST_SIZE(compression)); + TDigest* t = (TDigest*)pBuf; + tdigestAutoFill(t, compression); + + t->compression = compression; + t->size = (int64_t)GET_CENTROID(compression); + t->threshold = (int32_t)GET_THRESHOLD(compression); + t->min = DOUBLE_MAX; + t->max = -DOUBLE_MAX; + + return t; +} + +static int32_t cmpCentroid(const void *a, const void *b) { + SCentroid *c1 = (SCentroid *) a; + SCentroid *c2 = (SCentroid *) b; + if (c1->mean < c2->mean) + return -1; + if (c1->mean > c2->mean) + return 1; + return 0; +} + + +static void mergeCentroid(SMergeArgs *args, SCentroid *merge) { + double k2; + SCentroid *c = &args->centroids[args->idx]; + + args->weight_so_far += merge->weight; + k2 = INTEGRATED_LOCATION(args->t->size, + args->weight_so_far / args->t->total_weight); + //idx++ + if(k2 - args->k1 > 1 && c->weight > 0) { + if(args->idx + 1 < args->t->size + && merge->mean != args->centroids[args->idx].mean) { + args->idx++; + } + args->k1 = k2; + } + + c = &args->centroids[args->idx]; + if(c->mean == merge->mean) { + c->weight += merge->weight; + } else { + c->weight += merge->weight; + c->mean += (merge->mean - c->mean) * merge->weight / c->weight; + + if (merge->weight > 0) { + args->min = TMIN(merge->mean, args->min); + args->max = TMAX(merge->mean, args->max); + } + } +} + +void tdigestCompress(TDigest *t) { + SCentroid *unmerged_centroids; + int64_t unmerged_weight = 0; + int32_t num_unmerged = t->num_buffered_pts; + int32_t i, j; + SMergeArgs args; + + if (t->num_buffered_pts <= 0) + return; + + unmerged_centroids = (SCentroid*)taosMemoryMalloc(sizeof(SCentroid) * t->num_buffered_pts); + for (i = 0; i < num_unmerged; i++) { + SPt *p = t->buffered_pts + i; + SCentroid *c = &unmerged_centroids[i]; + c->mean = p->value; + c->weight = p->weight; + unmerged_weight += c->weight; + } + t->num_buffered_pts = 0; + t->total_weight += unmerged_weight; + + qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid); + memset(&args, 0, sizeof(SMergeArgs)); + args.centroids = (SCentroid*)taosMemoryMalloc((size_t)(sizeof(SCentroid) * t->size)); + memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size)); + + args.t = t; + args.min = DOUBLE_MAX; + args.max = -DOUBLE_MAX; + + i = 0; + j = 0; + while (i < num_unmerged && j < t->num_centroids) { + SCentroid *a = &unmerged_centroids[i]; + SCentroid *b = &t->centroids[j]; + + if (a->mean <= b->mean) { + mergeCentroid(&args, a); + assert(args.idx < t->size); + i++; + } else { + mergeCentroid(&args, b); + assert(args.idx < t->size); + j++; + } + } + + while (i < num_unmerged) { + mergeCentroid(&args, &unmerged_centroids[i++]); + assert(args.idx < t->size); + } + taosMemoryFree((void*)unmerged_centroids); + + while (j < t->num_centroids) { + mergeCentroid(&args, &t->centroids[j++]); + assert(args.idx < t->size); + } + + if (t->total_weight > 0) { + t->min = TMIN(t->min, args.min); + if (args.centroids[args.idx].weight <= 0) { + args.idx--; + } + t->num_centroids = args.idx + 1; + t->max = TMAX(t->max, args.max); + } + + memcpy(t->centroids, args.centroids, sizeof(SCentroid) * t->num_centroids); + taosMemoryFree((void*)args.centroids); +} + +void tdigestAdd(TDigest* t, double x, int64_t w) { + if (w == 0) + return; + + int32_t i = t->num_buffered_pts; + if(i > 0 && t->buffered_pts[i-1].value == x ) { + t->buffered_pts[i].weight = w; + } else { + t->buffered_pts[i].value = x; + t->buffered_pts[i].weight = w; + t->num_buffered_pts++; + } + + + if (t->num_buffered_pts >= t->threshold) + tdigestCompress(t); +} + +double tdigestCDF(TDigest *t, double x) { + if (t == NULL) + return 0; + + int32_t i; + double left, right; + int64_t weight_so_far; + SCentroid *a, *b, tmp; + + tdigestCompress(t); + if (t->num_centroids == 0) + return NAN; + if (x < t->min) + return 0; + if (x > t->max) + return 1; + if (t->num_centroids == 1) { + if (FLOAT_EQ(t->max, t->min)) + return 0.5; + + return INTERPOLATE(x, t->min, t->max); + } + + weight_so_far = 0; + a = b = &tmp; + b->mean = t->min; + b->weight = 0; + right = 0; + + for (i = 0; i < t->num_centroids; i++) { + SCentroid *c = &t->centroids[i]; + + left = b->mean - (a->mean + right); + a = b; + b = c; + right = (b->mean - a->mean) * a->weight / (a->weight + b->weight); + + if (x < a->mean + right) { + double cdf = (weight_so_far + + a->weight + * INTERPOLATE(x, a->mean - left, a->mean + right)) + / t->total_weight; + return TMAX(cdf, 0.0); + } + + weight_so_far += a->weight; + } + + left = b->mean - (a->mean + right); + a = b; + right = t->max - a->mean; + + if (x < a->mean + right) { + return (weight_so_far + a->weight * INTERPOLATE(x, a->mean - left, a->mean + right)) + / t->total_weight; + } + + return 1; +} + +double tdigestQuantile(TDigest *t, double q) { + if (t == NULL) + return 0; + + int32_t i; + double left, right, idx; + int64_t weight_so_far; + SCentroid *a, *b, tmp; + + tdigestCompress(t); + if (t->num_centroids == 0) + return NAN; + if (t->num_centroids == 1) + return t->centroids[0].mean; + if (FLOAT_EQ(q, 0.0)) + return t->min; + if (FLOAT_EQ(q, 1.0)) + return t->max; + + idx = q * t->total_weight; + weight_so_far = 0; + b = &tmp; + b->mean = t->min; + b->weight = 0; + right = t->min; + + for (i = 0; i < t->num_centroids; i++) { + SCentroid *c = &t->centroids[i]; + a = b; + left = right; + + b = c; + right = (b->weight * a->mean + a->weight * b->mean)/ (a->weight + b->weight); + if (idx < weight_so_far + a->weight) { + double p = (idx - weight_so_far) / a->weight; + return left * (1 - p) + right * p; + } + weight_so_far += a->weight; + } + + left = right; + a = b; + right = t->max; + + if (idx < weight_so_far + a->weight && a->weight != 0) { + double p = (idx - weight_so_far) / a->weight; + return left * (1 - p) + right * p; + } + + return t->max; +} + +void tdigestMerge(TDigest *t1, TDigest *t2) { + // SPoints + int32_t num_pts = t2->num_buffered_pts; + for(int32_t i = num_pts - 1; i >= 0; i--) { + SPt* p = t2->buffered_pts + i; + tdigestAdd(t1, p->value, p->weight); + t2->num_buffered_pts --; + } + // centroids + for (int32_t i = 0; i < t2->num_centroids; i++) { + tdigestAdd(t1, t2->centroids[i].mean, t2->centroids[i].weight); + } +} diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 01b8b56903..dd3ff510d0 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -23,10 +23,10 @@ python3 ./test.py -f 2-query/length.py python3 ./test.py -f 2-query/char_length.py python3 ./test.py -f 2-query/upper.py python3 ./test.py -f 2-query/lower.py -python3 ./test.py -f 2-query/join.py +#python3 ./test.py -f 2-query/join.py python3 ./test.py -f 2-query/cast.py -python3 ./test.py -f 2-query/concat.py -python3 ./test.py -f 2-query/concat_ws.py +#python3 ./test.py -f 2-query/concat.py +#python3 ./test.py -f 2-query/concat_ws.py python3 ./test.py -f 2-query/check_tsdb.py # python3 ./test.py -f 2-query/union.py # python3 ./test.py -f 2-query/union2.py @@ -45,8 +45,6 @@ python3 ./test.py -f 2-query/To_unixtimestamp.py python3 ./test.py -f 2-query/timetruncate.py # python3 ./test.py -f 2-query/diff.py python3 ./test.py -f 2-query/Timediff.py -#python3 ./test.py -f 2-query/cast.py - python3 ./test.py -f 2-query/abs.py python3 ./test.py -f 2-query/ceil.py @@ -62,8 +60,7 @@ python3 ./test.py -f 2-query/arcsin.py python3 ./test.py -f 2-query/arccos.py python3 ./test.py -f 2-query/arctan.py python3 ./test.py -f 2-query/query_cols_tags_and_or.py -python3 ./test.py -f 2-query/nestedQuery.py - +#python3 ./test.py -f 2-query/nestedQuery.py python3 ./test.py -f 7-tmq/basic5.py python3 ./test.py -f 7-tmq/subscribeDb.py diff --git a/tools/taos-tools b/tools/taos-tools index a8bb88c905..4d83d8c629 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit a8bb88c9056735919fc50bf9b12d9562f17e844f +Subproject commit 4d83d8c62973506f760bcaa3a33f4665ed9046d0