From 9dcf9248d7ce534ccb95aacf020b618e00ae68ff Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 16 May 2022 22:43:06 +0800 Subject: [PATCH 1/5] feat(query): add HYPERLOGLOG function --- source/libs/function/src/builtinsimpl.c | 147 +++++++++++++++++++++++- 1 file changed, 146 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 479a11ca4a..556015a2ac 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -26,7 +26,13 @@ #define MAVG_MAX_POINTS_NUM 1000 #define SAMPLE_MAX_POINTS_NUM 1000 #define TAIL_MAX_POINTS_NUM 100 -#define TAIL_MAX_OFFSET 100 +#define TAIL_MAX_OFFSET 10 + +#define HLL_BUCKET_BITS 14 // The bits of the bucket +#define HLL_DATA_BITS (64-HLL_BUCKET_BITS) +#define HLL_BUCKETS (1<numOfRes; } +bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SHLLInfo); + return true; +} + +static uint8_t hllCountNum(void* data, int32_t bytes, int32_t *buk) { + uint64_t hash = MurmurHash3_64(data, bytes); + int32_t index = hash & HLL_BUCKET_MASK; + hash >>= HLL_BUCKET_BITS; + hash |= ((uint64_t)1 << HLL_DATA_BITS); + uint64_t bit = 1; + uint8_t count = 1; + while((hash & bit) == 0) { + count++; + bit <<= 1; + } + *buk = index; + return count; +} + +static void hllBucketHisto(uint8_t *buckets, int32_t* bucketHisto) { + uint64_t *word = (uint64_t*) buckets; + uint8_t *bytes; + + for (int32_t j = 0; j < HLL_BUCKETS>>3; j++) { + if (*word == 0) { + bucketHisto[0] += 8; + } else { + bytes = (uint8_t*) word; + bucketHisto[bytes[0]]++; + bucketHisto[bytes[1]]++; + bucketHisto[bytes[2]]++; + bucketHisto[bytes[3]]++; + bucketHisto[bytes[4]]++; + bucketHisto[bytes[5]]++; + bucketHisto[bytes[6]]++; + bucketHisto[bytes[7]]++; + } + word++; + } +} +static double hllTau(double x) { + if (x == 0. || x == 1.) return 0.; + double zPrime; + double y = 1.0; + double z = 1 - x; + do { + x = sqrt(x); + zPrime = z; + y *= 0.5; + z -= pow(1 - x, 2)*y; + } while(zPrime != z); + return z / 3; +} + +static double hllSigma(double x) { + if (x == 1.0) return INFINITY; + double zPrime; + double y = 1; + double z = x; + do { + x *= x; + zPrime = z; + z += x * y; + y += y; + } while(zPrime != z); + return z; +} + +// estimate the cardinality, the algorithm refer this paper: "New cardinality estimation algorithms for HyperLogLog sketches" +static uint64_t hllCountCnt(uint8_t *buckets) { + double m = HLL_BUCKETS; + int32_t buckethisto[64] = {0}; + hllBucketHisto(buckets,buckethisto); + + double z = m * hllTau((m-buckethisto[HLL_DATA_BITS+1])/(double)m); + for (int j = HLL_DATA_BITS; j >= 1; --j) { + z += buckethisto[j]; + z *= 0.5; + } + z += m * hllSigma(buckethisto[0]/(double)m); + double E = (double)llroundl(HLL_ALPHA_INF*m*m/z); + + return (uint64_t) E; +} + + +int32_t hllFunction(SqlFunctionCtx *pCtx) { + SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t type = pCol->info.type; + int32_t bytes = pCol->info.bytes; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + int32_t numOfElems = 0; + for (int32_t i = start; i < numOfRows + start; ++i) { + if (pCol->hasNull && colDataIsNull_s(pCol, i)) { + continue; + } + + numOfElems++; + + char* data = colDataGetData(pCol, i); + if (IS_VAR_DATA_TYPE(type)) { + data = varDataVal(data); + bytes -= VARSTR_HEADER_SIZE; + } + + int32_t index = 0; + uint8_t count = hllCountNum(data, bytes, &index); + uint8_t oldcount = pInfo->buckets[index]; + if (count > oldcount) { + pInfo->buckets[index] = count; + } + + } + + SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); + return TSDB_CODE_SUCCESS; +} + +int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + pInfo->result = hllCountCnt(pInfo->buckets); + + return functionFinalize(pCtx, pBlock); +} + bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SStateInfo); return true; From 267ed293d7ba3b20770e71c17fdb7add26187b75 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 16 May 2022 22:43:06 +0800 Subject: [PATCH 2/5] feat(query): add HYPERLOGLOG function --- .gitmodules | 6 +++--- include/libs/function/functionMgt.h | 1 + source/libs/function/inc/builtinsimpl.h | 4 ++++ source/libs/function/src/builtins.c | 20 ++++++++++++++++++++ source/libs/function/src/builtinsimpl.c | 2 +- tools/taos-tools | 2 +- 6 files changed, 30 insertions(+), 5 deletions(-) diff --git a/.gitmodules b/.gitmodules index bc38453f19..9419dcd100 100644 --- a/.gitmodules +++ b/.gitmodules @@ -13,6 +13,6 @@ [submodule "examples/rust"] path = examples/rust url = https://github.com/songtianyi/tdengine-rust-bindings.git -[submodule "tools/taos-tools"] - path = tools/taos-tools - url = https://github.com/taosdata/taos-tools +#[submodule "tools/taos-tools"] + #path = tools/taos-tools + #url = https://github.com/taosdata/taos-tools diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 2b58ed7c0b..aec1476663 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -41,6 +41,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_SUM, FUNCTION_TYPE_TWA, FUNCTION_TYPE_HISTOGRAM, + FUNCTION_TYPE_HYPERLOGLOG, // nonstandard SQL function FUNCTION_TYPE_BOTTOM = 500, diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c25d74911c..99313675a5 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -90,6 +90,10 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn int32_t histogramFunction(SqlFunctionCtx* pCtx); int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +int32_t hllFunction(SqlFunctionCtx* pCtx); +int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stateCountFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index e41e3c7c39..cc50348397 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -263,6 +263,16 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } +static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; + return TSDB_CODE_SUCCESS; +} + + static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (3 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -829,6 +839,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = histogramFunction, .finalizeFunc = histogramFinalize }, + { + .name = "hyperloglog", + .type = FUNCTION_TYPE_HYPERLOGLOG, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateHLL, + .getEnvFunc = getHLLFuncEnv, + .initFunc = functionSetup, + .processFunc = hllFunction, + .finalizeFunc = hllFinalize + }, { .name = "state_count", .type = FUNCTION_TYPE_STATE_COUNT, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 556015a2ac..de63567011 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2745,7 +2745,7 @@ bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -static uint8_t hllCountNum(void* data, int32_t bytes, int32_t *buk) { +static uint8_t hllCountNum(char *data, int32_t bytes, int32_t *buk) { uint64_t hash = MurmurHash3_64(data, bytes); int32_t index = hash & HLL_BUCKET_MASK; hash >>= HLL_BUCKET_BITS; diff --git a/tools/taos-tools b/tools/taos-tools index 0aad27d725..2f3dfddd4d 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 0aad27d725f4ee6b18daf1db0c07d933aed16eea +Subproject commit 2f3dfddd4d9a869e706ba3cf98fb6d769404cd7c From 67e93ef90c3b4ffdca482c0cba0bffc7c71c9b97 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 17 May 2022 10:33:58 +0800 Subject: [PATCH 3/5] Revert "feat(query): add HYPERLOGLOG function" This reverts commit 267ed293d7ba3b20770e71c17fdb7add26187b75. --- .gitmodules | 6 +++--- include/libs/function/functionMgt.h | 1 - source/libs/function/inc/builtinsimpl.h | 4 ---- source/libs/function/src/builtins.c | 20 -------------------- source/libs/function/src/builtinsimpl.c | 2 +- tools/taos-tools | 2 +- 6 files changed, 5 insertions(+), 30 deletions(-) diff --git a/.gitmodules b/.gitmodules index 9419dcd100..bc38453f19 100644 --- a/.gitmodules +++ b/.gitmodules @@ -13,6 +13,6 @@ [submodule "examples/rust"] path = examples/rust url = https://github.com/songtianyi/tdengine-rust-bindings.git -#[submodule "tools/taos-tools"] - #path = tools/taos-tools - #url = https://github.com/taosdata/taos-tools +[submodule "tools/taos-tools"] + path = tools/taos-tools + url = https://github.com/taosdata/taos-tools diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index aec1476663..2b58ed7c0b 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -41,7 +41,6 @@ typedef enum EFunctionType { FUNCTION_TYPE_SUM, FUNCTION_TYPE_TWA, FUNCTION_TYPE_HISTOGRAM, - FUNCTION_TYPE_HYPERLOGLOG, // nonstandard SQL function FUNCTION_TYPE_BOTTOM = 500, diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 99313675a5..c25d74911c 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -90,10 +90,6 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn int32_t histogramFunction(SqlFunctionCtx* pCtx); int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -int32_t hllFunction(SqlFunctionCtx* pCtx); -int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); - bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stateCountFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index cc50348397..e41e3c7c39 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -263,16 +263,6 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } -static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - if (1 != LIST_LENGTH(pFunc->pParameterList)) { - return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); - } - - pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; - return TSDB_CODE_SUCCESS; -} - - static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (3 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -839,16 +829,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = histogramFunction, .finalizeFunc = histogramFinalize }, - { - .name = "hyperloglog", - .type = FUNCTION_TYPE_HYPERLOGLOG, - .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateHLL, - .getEnvFunc = getHLLFuncEnv, - .initFunc = functionSetup, - .processFunc = hllFunction, - .finalizeFunc = hllFinalize - }, { .name = "state_count", .type = FUNCTION_TYPE_STATE_COUNT, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index de63567011..556015a2ac 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2745,7 +2745,7 @@ bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -static uint8_t hllCountNum(char *data, int32_t bytes, int32_t *buk) { +static uint8_t hllCountNum(void* data, int32_t bytes, int32_t *buk) { uint64_t hash = MurmurHash3_64(data, bytes); int32_t index = hash & HLL_BUCKET_MASK; hash >>= HLL_BUCKET_BITS; diff --git a/tools/taos-tools b/tools/taos-tools index 2f3dfddd4d..0aad27d725 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit 2f3dfddd4d9a869e706ba3cf98fb6d769404cd7c +Subproject commit 0aad27d725f4ee6b18daf1db0c07d933aed16eea From 143c50dde3d511e10a7100c7624dc8de10a2d5e2 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 17 May 2022 10:34:59 +0800 Subject: [PATCH 4/5] feat(query): add hll function --- include/util/thash.h | 1 + source/util/src/thashutil.c | 88 ++++++++++++++++++++++++++----------- 2 files changed, 63 insertions(+), 26 deletions(-) diff --git a/include/util/thash.h b/include/util/thash.h index f2ef445777..fc8785a8fb 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -40,6 +40,7 @@ typedef void (*_hash_free_fn_t)(void *); */ uint32_t MurmurHash3_32(const char *key, uint32_t len); +uint64_t MurmurHash3_64(const char *key, uint32_t len); /** * * @param key diff --git a/source/util/src/thashutil.c b/source/util/src/thashutil.c index d5182cb892..c2382550a6 100644 --- a/source/util/src/thashutil.c +++ b/source/util/src/thashutil.c @@ -30,7 +30,7 @@ (h) ^= (h) >> 13; \ (h) *= 0xc2b2ae35; \ (h) ^= (h) >> 16; } while (0) - + uint32_t MurmurHash3_32(const char *key, uint32_t len) { const uint8_t *data = (const uint8_t *)key; const int32_t nblocks = len >> 2u; @@ -78,18 +78,54 @@ uint32_t MurmurHash3_32(const char *key, uint32_t len) { return h1; } +uint64_t MurmurHash3_64(const char *key, uint32_t len) { + const uint64_t m = 0x87c37b91114253d5; + const int r = 47; + uint32_t seed = 0x12345678; + uint64_t h = seed ^ (len * m); + const uint8_t *data = (const uint8_t *)key; + const uint8_t *end = data + (len-(len&7)); + + while(data != end) { + uint64_t k = *((uint64_t*)data); + + k *= m; + k ^= k >> r; + k *= m; + h ^= k; + h *= m; + data += 8; + } + + switch(len & 7) { + case 7: h ^= (uint64_t)data[6] << 48; /* fall-thru */ + case 6: h ^= (uint64_t)data[5] << 40; /* fall-thru */ + case 5: h ^= (uint64_t)data[4] << 32; /* fall-thru */ + case 4: h ^= (uint64_t)data[3] << 24; /* fall-thru */ + case 3: h ^= (uint64_t)data[2] << 16; /* fall-thru */ + case 2: h ^= (uint64_t)data[1] << 8; /* fall-thru */ + case 1: h ^= (uint64_t)data[0]; + h *= m; /* fall-thru */ + }; + + h ^= h >> r; + h *= m; + h ^= h >> r; + return h; +} + uint32_t taosIntHash_32(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint32_t *)key; } uint32_t taosIntHash_16(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint16_t *)key; } uint32_t taosIntHash_8(const char *key, uint32_t UNUSED_PARAM(len)) { return *(uint8_t *)key; } uint32_t taosFloatHash(const char *key, uint32_t UNUSED_PARAM(len)) { - float f = GET_FLOAT_VAL(key); + float f = GET_FLOAT_VAL(key); if (isnan(f)) { return 0x7fc00000; } - + if (FLT_EQUAL(f, 0.0)) { return 0; - } + } if (fabs(f) < FLT_MAX/BASE - DLT) { int32_t t = (int32_t)(round(BASE * (f + DLT))); return (uint32_t)t; @@ -98,27 +134,27 @@ uint32_t taosFloatHash(const char *key, uint32_t UNUSED_PARAM(len)) { } } uint32_t taosDoubleHash(const char *key, uint32_t UNUSED_PARAM(len)) { - double f = GET_DOUBLE_VAL(key); + double f = GET_DOUBLE_VAL(key); if (isnan(f)) { return 0x7fc00000; } if (FLT_EQUAL(f, 0.0)) { return 0; - } + } if (fabs(f) < DBL_MAX/BASE - DLT) { int32_t t = (int32_t)(round(BASE * (f + DLT))); return (uint32_t)t; } else { return 0x7fc00000; - } + } } uint32_t taosIntHash_64(const char *key, uint32_t UNUSED_PARAM(len)) { uint64_t val = *(uint64_t *)key; uint64_t hash = val >> 16U; hash += (val & 0xFFFFU); - + return (uint32_t)hash; } @@ -127,39 +163,39 @@ _hash_fn_t taosGetDefaultHashFunction(int32_t type) { switch(type) { case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_UBIGINT: - case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_BIGINT: fn = taosIntHash_64; break; - case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_BINARY: fn = MurmurHash3_32; break; - case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_NCHAR: fn = MurmurHash3_32; break; case TSDB_DATA_TYPE_UINT: - case TSDB_DATA_TYPE_INT: - fn = taosIntHash_32; + case TSDB_DATA_TYPE_INT: + fn = taosIntHash_32; break; - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_USMALLINT: - fn = taosIntHash_16; + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: + fn = taosIntHash_16; break; case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_UTINYINT: - case TSDB_DATA_TYPE_TINYINT: - fn = taosIntHash_8; + case TSDB_DATA_TYPE_TINYINT: + fn = taosIntHash_8; break; - case TSDB_DATA_TYPE_FLOAT: - fn = taosFloatHash; - break; - case TSDB_DATA_TYPE_DOUBLE: - fn = taosDoubleHash; - break; - default: + case TSDB_DATA_TYPE_FLOAT: + fn = taosFloatHash; + break; + case TSDB_DATA_TYPE_DOUBLE: + fn = taosDoubleHash; + break; + default: fn = taosIntHash_32; break; } - + return fn; } From 1eb59a2fedc3a960c6478cba1865924311f54938 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 17 May 2022 10:34:59 +0800 Subject: [PATCH 5/5] feat(query): add hll function --- include/libs/function/functionMgt.h | 1 + source/libs/function/inc/builtinsimpl.h | 4 ++++ source/libs/function/src/builtins.c | 25 +++++++++++++++++++++++++ source/libs/function/src/builtinsimpl.c | 4 ++-- 4 files changed, 32 insertions(+), 2 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 2b58ed7c0b..aec1476663 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -41,6 +41,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_SUM, FUNCTION_TYPE_TWA, FUNCTION_TYPE_HISTOGRAM, + FUNCTION_TYPE_HYPERLOGLOG, // nonstandard SQL function FUNCTION_TYPE_BOTTOM = 500, diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c25d74911c..99313675a5 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -90,6 +90,10 @@ bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultIn int32_t histogramFunction(SqlFunctionCtx* pCtx); int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +int32_t hllFunction(SqlFunctionCtx* pCtx); +int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stateCountFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index e41e3c7c39..48165fdd99 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -263,6 +263,21 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } +static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN != nodeType(pPara)) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "The input parameter of HYPERLOGLOG function can only be column"); + } + + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_UBIGINT].bytes, .type = TSDB_DATA_TYPE_UBIGINT}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (3 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -829,6 +844,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = histogramFunction, .finalizeFunc = histogramFinalize }, + { + .name = "hyperloglog", + .type = FUNCTION_TYPE_HYPERLOGLOG, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateHLL, + .getEnvFunc = getHLLFuncEnv, + .initFunc = functionSetup, + .processFunc = hllFunction, + .finalizeFunc = hllFinalize + }, { .name = "state_count", .type = FUNCTION_TYPE_STATE_COUNT, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 556015a2ac..7ff3b6fb05 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -26,7 +26,7 @@ #define MAVG_MAX_POINTS_NUM 1000 #define SAMPLE_MAX_POINTS_NUM 1000 #define TAIL_MAX_POINTS_NUM 100 -#define TAIL_MAX_OFFSET 10 +#define TAIL_MAX_OFFSET 100 #define HLL_BUCKET_BITS 14 // The bits of the bucket #define HLL_DATA_BITS (64-HLL_BUCKET_BITS) @@ -2849,8 +2849,8 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) { char* data = colDataGetData(pCol, i); if (IS_VAR_DATA_TYPE(type)) { + bytes = varDataLen(data); data = varDataVal(data); - bytes -= VARSTR_HEADER_SIZE; } int32_t index = 0;