enh(query): the results of count and hyperloglog is configurable.
This commit is contained in:
parent
9cea8dbaf7
commit
c06316537e
|
@ -45,6 +45,8 @@ extern bool tsEnableSlaveQuery;
|
||||||
extern bool tsPrintAuth;
|
extern bool tsPrintAuth;
|
||||||
extern int64_t tsTickPerMin[3];
|
extern int64_t tsTickPerMin[3];
|
||||||
|
|
||||||
|
extern int32_t tsCountAlwaysReturnValue;
|
||||||
|
|
||||||
// multi-process
|
// multi-process
|
||||||
extern int32_t tsMultiProcess;
|
extern int32_t tsMultiProcess;
|
||||||
extern int32_t tsMnodeShmSize;
|
extern int32_t tsMnodeShmSize;
|
||||||
|
@ -102,7 +104,6 @@ extern int32_t tsMaxStreamComputDelay;
|
||||||
extern int32_t tsStreamCompStartDelay;
|
extern int32_t tsStreamCompStartDelay;
|
||||||
extern int32_t tsRetryStreamCompDelay;
|
extern int32_t tsRetryStreamCompDelay;
|
||||||
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
|
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
|
||||||
extern int32_t tsProjectExecInterval;
|
|
||||||
extern int64_t tsMaxRetentWindow;
|
extern int64_t tsMaxRetentWindow;
|
||||||
|
|
||||||
// build info
|
// build info
|
||||||
|
|
|
@ -109,8 +109,11 @@ int32_t tsCompressColData = -1;
|
||||||
*/
|
*/
|
||||||
int32_t tsCompatibleModel = 1;
|
int32_t tsCompatibleModel = 1;
|
||||||
|
|
||||||
|
// count/hyperloglog function always return values in case of all NULL data or Empty data set.
|
||||||
|
int32_t tsCountAlwaysReturnValue = 1;
|
||||||
|
|
||||||
// 10 ms for sliding time, the value will changed in case of time precision changed
|
// 10 ms for sliding time, the value will changed in case of time precision changed
|
||||||
int32_t tsMinSlidingTime = 10;
|
int32_t tsMinSlidingTime = 10;
|
||||||
|
|
||||||
// the maxinum number of distict query result
|
// the maxinum number of distict query result
|
||||||
int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
|
int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
|
||||||
|
@ -130,7 +133,6 @@ int32_t tsRetryStreamCompDelay = 10 * 1000;
|
||||||
// The delayed computing ration. 10% of the whole computing time window by default.
|
// The delayed computing ration. 10% of the whole computing time window by default.
|
||||||
float tsStreamComputDelayRatio = 0.1f;
|
float tsStreamComputDelayRatio = 0.1f;
|
||||||
|
|
||||||
int32_t tsProjectExecInterval = 10000; // every 10sec, the projection will be executed once
|
|
||||||
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
|
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
|
||||||
|
|
||||||
// the maximum allowed query buffer size during query processing for each data node.
|
// the maximum allowed query buffer size during query processing for each data node.
|
||||||
|
@ -374,6 +376,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 10, 1000000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 10, 1000000, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "maxStreamCompDelay", tsMaxStreamComputDelay, 10, 1000000000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "maxStreamCompDelay", tsMaxStreamComputDelay, 10, 1000000000, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "maxFirstStreamCompDelay", tsStreamCompStartDelay, 1000, 1000000000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "maxFirstStreamCompDelay", tsStreamCompStartDelay, 1000, 1000000000, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "retryStreamCompDelay", tsRetryStreamCompDelay, 10, 1000000000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "retryStreamCompDelay", tsRetryStreamCompDelay, 10, 1000000000, 0) != 0) return -1;
|
||||||
|
@ -567,6 +570,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32;
|
tsMinSlidingTime = cfgGetItem(pCfg, "minSlidingTime")->i32;
|
||||||
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
|
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
|
||||||
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
|
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
|
||||||
|
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
|
||||||
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
|
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
|
||||||
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
|
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
|
||||||
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
|
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
|
||||||
|
|
|
@ -433,7 +433,7 @@ static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
"The input parameter of HYPERLOGLOG function can only be column");
|
"The input parameter of HYPERLOGLOG function can only be column");
|
||||||
}
|
}
|
||||||
|
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_UBIGINT].bytes, .type = TSDB_DATA_TYPE_UBIGINT};
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "builtinsimpl.h"
|
#include "builtinsimpl.h"
|
||||||
|
#include "tglobal.h"
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
|
@ -356,7 +357,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) {
|
static FORCE_INLINE int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElem = 0;
|
int32_t numOfElem = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -391,11 +392,12 @@ static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) {
|
||||||
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
|
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
|
||||||
*/
|
*/
|
||||||
int32_t countFunction(SqlFunctionCtx* pCtx) {
|
int32_t countFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElem = getNumofElem(pCtx);
|
int32_t numOfElem = getNumOfElems(pCtx);
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
|
||||||
|
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
|
||||||
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
if (IS_NULL_TYPE(type)) {
|
if (IS_NULL_TYPE(type)) {
|
||||||
|
@ -406,12 +408,17 @@ int32_t countFunction(SqlFunctionCtx* pCtx) {
|
||||||
*((int64_t*)buf) += numOfElem;
|
*((int64_t*)buf) += numOfElem;
|
||||||
}
|
}
|
||||||
|
|
||||||
SET_VAL(pResInfo, numOfElem, 1);
|
if (tsCountAlwaysReturnValue) {
|
||||||
|
pResInfo->numOfRes = 1;
|
||||||
|
} else {
|
||||||
|
SET_VAL(pResInfo, 1, 1);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t countInvertFunction(SqlFunctionCtx* pCtx) {
|
int32_t countInvertFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElem = getNumofElem(pCtx);
|
int32_t numOfElem = getNumOfElems(pCtx);
|
||||||
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
@ -3236,13 +3243,13 @@ static uint64_t hllCountCnt(uint8_t *buckets) {
|
||||||
z += buckethisto[j];
|
z += buckethisto[j];
|
||||||
z *= 0.5;
|
z *= 0.5;
|
||||||
}
|
}
|
||||||
|
|
||||||
z += m * hllSigma(buckethisto[0]/(double)m);
|
z += m * hllSigma(buckethisto[0]/(double)m);
|
||||||
double E = (double)llroundl(HLL_ALPHA_INF*m*m/z);
|
double E = (double)llroundl(HLL_ALPHA_INF*m*m/z);
|
||||||
|
|
||||||
return (uint64_t) E;
|
return (uint64_t) E;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t hllFunction(SqlFunctionCtx *pCtx) {
|
int32_t hllFunction(SqlFunctionCtx *pCtx) {
|
||||||
SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
|
@ -3275,7 +3282,6 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) {
|
||||||
if (count > oldcount) {
|
if (count > oldcount) {
|
||||||
pInfo->buckets[index] = count;
|
pInfo->buckets[index] = count;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||||
|
@ -3283,9 +3289,13 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SResultRowEntryInfo *pInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
pInfo->result = hllCountCnt(pInfo->buckets);
|
SHLLInfo* pHllInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
pHllInfo->result = hllCountCnt(pHllInfo->buckets);
|
||||||
|
if (tsCountAlwaysReturnValue && pHllInfo->result == 0) {
|
||||||
|
pInfo->numOfRes = 1;
|
||||||
|
}
|
||||||
|
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue