Merge pull request #13580 from taosdata/feature/dist_split_glzhao
enh(query): add distributed splitting of aggregate function
This commit is contained in:
commit
ec5da952f9
|
@ -122,6 +122,10 @@ typedef enum EFunctionType {
|
||||||
// internal function
|
// internal function
|
||||||
FUNCTION_TYPE_SELECT_VALUE,
|
FUNCTION_TYPE_SELECT_VALUE,
|
||||||
|
|
||||||
|
// distributed splitting functions
|
||||||
|
FUNCTION_TYPE_APERCENTILE_PARTIAL,
|
||||||
|
FUNCTION_TYPE_APERCENTILE_MERGE,
|
||||||
|
|
||||||
// user defined funcion
|
// user defined funcion
|
||||||
FUNCTION_TYPE_UDF = 10000
|
FUNCTION_TYPE_UDF = 10000
|
||||||
} EFunctionType;
|
} EFunctionType;
|
||||||
|
|
|
@ -23,9 +23,13 @@ extern "C" {
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
|
||||||
|
bool dummyGetEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* UNUSED_PARAM(pEnv));
|
||||||
|
bool dummyInit(SqlFunctionCtx* UNUSED_PARAM(pCtx), SResultRowEntryInfo* UNUSED_PARAM(pResultInfo));
|
||||||
|
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx));
|
||||||
|
int32_t dummyFinalize(SqlFunctionCtx* UNUSED_PARAM(pCtx), SSDataBlock* UNUSED_PARAM(pBlock));
|
||||||
|
|
||||||
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx));
|
|
||||||
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
|
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
|
||||||
int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
|
|
||||||
|
@ -74,10 +78,13 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI
|
||||||
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
int32_t getApercentileMaxSize();
|
||||||
bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t apercentileFunction(SqlFunctionCtx *pCtx);
|
int32_t apercentileFunction(SqlFunctionCtx *pCtx);
|
||||||
|
int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx);
|
||||||
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
||||||
|
|
|
@ -251,6 +251,73 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateApercentileImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
|
||||||
|
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||||
|
|
||||||
|
if (isPartial) {
|
||||||
|
if (2 != numOfParams && 3 != numOfParams) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
// param1
|
||||||
|
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
|
||||||
|
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SValueNode* pValue = (SValueNode*)pParamNode1;
|
||||||
|
if (pValue->datum.i < 0 || pValue->datum.i > 100) {
|
||||||
|
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pValue->notReserved = true;
|
||||||
|
|
||||||
|
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
||||||
|
if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
// param2
|
||||||
|
if (3 == numOfParams) {
|
||||||
|
uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type;
|
||||||
|
if (!IS_VAR_DATA_TYPE(para3Type)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pParamNode2 = nodesListGetNode(pFunc->pParameterList, 2);
|
||||||
|
if (QUERY_NODE_VALUE != nodeType(pParamNode2) || !validateApercentileAlgo((SValueNode*)pParamNode2)) {
|
||||||
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
|
"Third parameter algorithm of apercentile must be 'default' or 't-digest'");
|
||||||
|
}
|
||||||
|
|
||||||
|
pValue = (SValueNode*)pParamNode2;
|
||||||
|
pValue->notReserved = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = getApercentileMaxSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||||
|
} else {
|
||||||
|
if (1 != numOfParams) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
if (TSDB_DATA_TYPE_BINARY != para1Type) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateApercentilePartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
return translateApercentileImpl(pFunc, pErrBuf, len, true);
|
||||||
|
}
|
||||||
|
static int32_t translateApercentileMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
return translateApercentileImpl(pFunc, pErrBuf, len, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// pseudo column do not need to check parameters
|
// pseudo column do not need to check parameters
|
||||||
pFunc->node.resType =
|
pFunc->node.resType =
|
||||||
|
@ -1056,9 +1123,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = countFunction,
|
.processFunc = countFunction,
|
||||||
.finalizeFunc = functionFinalize,
|
.finalizeFunc = functionFinalize,
|
||||||
.invertFunc = countInvertFunction,
|
.invertFunc = countInvertFunction,
|
||||||
.combineFunc = combineFunction,
|
.combineFunc = combineFunction,
|
||||||
// .pPartialFunc = "count",
|
.pPartialFunc = "count",
|
||||||
// .pMergeFunc = "sum"
|
.pMergeFunc = "sum"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "sum",
|
.name = "sum",
|
||||||
|
@ -1071,7 +1138,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = sumFunction,
|
.processFunc = sumFunction,
|
||||||
.finalizeFunc = functionFinalize,
|
.finalizeFunc = functionFinalize,
|
||||||
.invertFunc = sumInvertFunction,
|
.invertFunc = sumInvertFunction,
|
||||||
.combineFunc = sumCombine,
|
.combineFunc = sumCombine,
|
||||||
|
.pPartialFunc = "sum",
|
||||||
|
.pMergeFunc = "sum"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "min",
|
.name = "min",
|
||||||
|
@ -1083,7 +1152,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.initFunc = minmaxFunctionSetup,
|
.initFunc = minmaxFunctionSetup,
|
||||||
.processFunc = minFunction,
|
.processFunc = minFunction,
|
||||||
.finalizeFunc = minmaxFunctionFinalize,
|
.finalizeFunc = minmaxFunctionFinalize,
|
||||||
.combineFunc = minCombine
|
.combineFunc = minCombine,
|
||||||
|
.pPartialFunc = "min",
|
||||||
|
.pMergeFunc = "min"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "max",
|
.name = "max",
|
||||||
|
@ -1095,7 +1166,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.initFunc = minmaxFunctionSetup,
|
.initFunc = minmaxFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = maxFunction,
|
||||||
.finalizeFunc = minmaxFunctionFinalize,
|
.finalizeFunc = minmaxFunctionFinalize,
|
||||||
.combineFunc = maxCombine
|
.combineFunc = maxCombine,
|
||||||
|
.pPartialFunc = "max",
|
||||||
|
.pMergeFunc = "max"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "stddev",
|
.name = "stddev",
|
||||||
|
@ -1151,6 +1224,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getApercentileFuncEnv,
|
.getEnvFunc = getApercentileFuncEnv,
|
||||||
.initFunc = apercentileFunctionSetup,
|
.initFunc = apercentileFunctionSetup,
|
||||||
.processFunc = apercentileFunction,
|
.processFunc = apercentileFunction,
|
||||||
|
.finalizeFunc = apercentileFinalize,
|
||||||
|
.pPartialFunc = "_apercentile_partial",
|
||||||
|
.pMergeFunc = "_apercentile_merge"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_apercentile_partial",
|
||||||
|
.type = FUNCTION_TYPE_APERCENTILE_PARTIAL,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateApercentilePartial,
|
||||||
|
.getEnvFunc = getApercentileFuncEnv,
|
||||||
|
.initFunc = apercentileFunctionSetup,
|
||||||
|
.processFunc = apercentileFunction,
|
||||||
|
.finalizeFunc = apercentilePartialFinalize
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_apercentile_merge",
|
||||||
|
.type = FUNCTION_TYPE_APERCENTILE_MERGE,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateApercentileMerge,
|
||||||
|
.getEnvFunc = getApercentileFuncEnv,
|
||||||
|
.initFunc = functionSetup,
|
||||||
|
.processFunc = apercentileFunctionMerge,
|
||||||
.finalizeFunc = apercentileFinalize
|
.finalizeFunc = apercentileFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -1214,7 +1309,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = firstFunction,
|
.processFunc = firstFunction,
|
||||||
.finalizeFunc = firstLastFinalize,
|
.finalizeFunc = firstLastFinalize,
|
||||||
.combineFunc = firstCombine,
|
.combineFunc = firstCombine,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "last",
|
.name = "last",
|
||||||
|
|
|
@ -100,6 +100,7 @@ typedef struct SPercentileInfo {
|
||||||
|
|
||||||
typedef struct SAPercentileInfo {
|
typedef struct SAPercentileInfo {
|
||||||
double result;
|
double result;
|
||||||
|
double percent;
|
||||||
int8_t algo;
|
int8_t algo;
|
||||||
SHistogramInfo *pHisto;
|
SHistogramInfo *pHisto;
|
||||||
TDigest *pTDigest;
|
TDigest *pTDigest;
|
||||||
|
@ -283,6 +284,22 @@ typedef struct SUniqueInfo {
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
bool dummyGetEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* UNUSED_PARAM(pEnv)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool dummyInit(SqlFunctionCtx* UNUSED_PARAM(pCtx), SResultRowEntryInfo* UNUSED_PARAM(pResultInfo)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dummyFinalize(SqlFunctionCtx* UNUSED_PARAM(pCtx), SSDataBlock* UNUSED_PARAM(pBlock)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
if (pResultInfo->initialized) {
|
if (pResultInfo->initialized) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -327,10 +344,6 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) {
|
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) {
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
@ -602,7 +615,7 @@ int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
|
|
||||||
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
|
||||||
SSumRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
SSumRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
|
||||||
|
|
||||||
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
|
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
|
||||||
pDBuf->isum += pSBuf->isum;
|
pDBuf->isum += pSBuf->isum;
|
||||||
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
|
||||||
|
@ -1974,6 +1987,12 @@ bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getApercentileMaxSize() {
|
||||||
|
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||||
|
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
|
||||||
|
return TMAX(bytesHist, bytesDigest);
|
||||||
|
}
|
||||||
|
|
||||||
static int8_t getApercentileAlgo(char *algoStr) {
|
static int8_t getApercentileAlgo(char *algoStr) {
|
||||||
int8_t algoType;
|
int8_t algoType;
|
||||||
if (strcasecmp(algoStr, "default") == 0) {
|
if (strcasecmp(algoStr, "default") == 0) {
|
||||||
|
@ -1988,16 +2007,24 @@ static int8_t getApercentileAlgo(char *algoStr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void buildHistogramInfo(SAPercentileInfo* pInfo) {
|
static void buildHistogramInfo(SAPercentileInfo* pInfo) {
|
||||||
pInfo->pHisto = (SHistogramInfo*) ((char*) pInfo + sizeof(SAPercentileInfo));
|
pInfo->pHisto = (SHistogramInfo*) ((char*)pInfo + sizeof(SAPercentileInfo));
|
||||||
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
|
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void buildTDigestInfo(SAPercentileInfo* pInfo) {
|
||||||
|
pInfo->pTDigest = (TDigest*)((char*)pInfo + sizeof(SAPercentileInfo));
|
||||||
|
}
|
||||||
|
|
||||||
bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
if (!functionSetup(pCtx, pResultInfo)) {
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
|
|
||||||
|
SVariant* pVal = &pCtx->param[1].param;
|
||||||
|
pInfo->percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
|
||||||
|
|
||||||
if (pCtx->numOfParams == 2) {
|
if (pCtx->numOfParams == 2) {
|
||||||
pInfo->algo = APERCT_ALGO_DEFAULT;
|
pInfo->algo = APERCT_ALGO_DEFAULT;
|
||||||
} else if (pCtx->numOfParams == 3) {
|
} else if (pCtx->numOfParams == 3) {
|
||||||
|
@ -2062,23 +2089,87 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
SVariant* pVal = &pCtx->param[1].param;
|
int32_t numOfElems = 0;
|
||||||
double percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
|
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
SAPercentileInfo* pInputInfo;
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
|
//if (colDataIsNull_s(pCol, i)) {
|
||||||
|
// continue;
|
||||||
|
//}
|
||||||
|
numOfElems += 1;
|
||||||
|
char* data = colDataGetData(pCol, i);
|
||||||
|
|
||||||
|
pInputInfo = (SAPercentileInfo *)varDataVal(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->percent = pInputInfo->percent;
|
||||||
|
pInfo->algo = pInputInfo->algo;
|
||||||
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
buildTDigestInfo(pInputInfo);
|
||||||
|
tdigestAutoFill(pInputInfo->pTDigest, COMPRESSION);
|
||||||
|
|
||||||
|
if(pInputInfo->pTDigest->num_centroids == 0 && pInputInfo->pTDigest->num_buffered_pts == 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
buildTDigestInfo(pInfo);
|
||||||
|
TDigest *pTDigest = pInfo->pTDigest;
|
||||||
|
|
||||||
|
if(pTDigest->num_centroids <= 0) {
|
||||||
|
memcpy(pTDigest, pInputInfo->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
|
||||||
|
tdigestAutoFill(pTDigest, COMPRESSION);
|
||||||
|
} else {
|
||||||
|
tdigestMerge(pTDigest, pInputInfo->pTDigest);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
buildHistogramInfo(pInputInfo);
|
||||||
|
if (pInputInfo->pHisto->numOfElems <= 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
buildHistogramInfo(pInfo);
|
||||||
|
SHistogramInfo *pHisto = pInfo->pHisto;
|
||||||
|
|
||||||
|
if (pHisto->numOfElems <= 0) {
|
||||||
|
memcpy(pHisto, pInputInfo->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
} else {
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInputInfo->pHisto, MAX_HISTOGRAM_BIN);
|
||||||
|
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
tHistogramDestroy(&pRes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
if (pInfo->pTDigest->size > 0) {
|
if (pInfo->pTDigest->size > 0) {
|
||||||
pInfo->result = tdigestQuantile(pInfo->pTDigest, percent/100);
|
pInfo->result = tdigestQuantile(pInfo->pTDigest, pInfo->percent / 100);
|
||||||
} else { // no need to free
|
} else { // no need to free
|
||||||
//setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
|
//setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pInfo->pHisto->numOfElems > 0) {
|
if (pInfo->pHisto->numOfElems > 0) {
|
||||||
double ratio[] = {percent};
|
double ratio[] = {pInfo->percent};
|
||||||
double *res = tHistogramUniform(pInfo->pHisto, ratio, 1);
|
double *res = tHistogramUniform(pInfo->pHisto, ratio, 1);
|
||||||
pInfo->result = *res;
|
pInfo->result = *res;
|
||||||
//memcpy(pCtx->pOutput, res, sizeof(double));
|
//memcpy(pCtx->pOutput, res, sizeof(double));
|
||||||
|
@ -2092,6 +2183,40 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||||
|
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
|
||||||
|
int32_t resultBytes = TMAX(bytesHist, bytesDigest);
|
||||||
|
char *tmp = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
if (pInfo->pTDigest->size > 0) {
|
||||||
|
memcpy(varDataVal(tmp), pInfo, resultBytes);
|
||||||
|
varDataSetLen(tmp, resultBytes);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pInfo->pHisto->numOfElems > 0) {
|
||||||
|
memcpy(varDataVal(tmp), pInfo, resultBytes);
|
||||||
|
varDataSetLen(tmp, resultBytes);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
colDataAppend(pCol, pBlock->info.rows, tmp, false);
|
||||||
|
|
||||||
|
taosMemoryFree(tmp);
|
||||||
|
return pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
|
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
|
||||||
|
|
|
@ -42,14 +42,12 @@ sql explain select count(*),sum(f1) from tb1;
|
||||||
sql explain select count(*),sum(f1) from st1;
|
sql explain select count(*),sum(f1) from st1;
|
||||||
sql explain select count(*),sum(f1) from st1 group by f1;
|
sql explain select count(*),sum(f1) from st1 group by f1;
|
||||||
#sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
#sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
||||||
sql explain select min(f1) from st1 interval(1m, 2a) sliding(30s);
|
|
||||||
|
|
||||||
print ======== step3
|
print ======== step3
|
||||||
sql explain verbose true select * from st1 where -2;
|
sql explain verbose true select * from st1 where -2;
|
||||||
sql explain verbose true select ts from tb1 where f1 > 0;
|
sql explain verbose true select ts from tb1 where f1 > 0;
|
||||||
sql explain verbose true select * from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
|
sql explain verbose true select * from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
|
||||||
sql explain verbose true select * from information_schema.user_stables where db_name='db2';
|
sql explain verbose true select * from information_schema.user_stables where db_name='db2';
|
||||||
sql explain verbose true select count(*),sum(f1) from st1 where f1 > 0 and ts > '2021-10-31 00:00:00' group by f1 having sum(f1) > 0;
|
|
||||||
|
|
||||||
print ======== step4
|
print ======== step4
|
||||||
sql explain analyze select ts from st1 where -2;
|
sql explain analyze select ts from st1 where -2;
|
||||||
|
@ -61,8 +59,6 @@ sql explain analyze select * from information_schema.user_stables;
|
||||||
sql explain analyze select count(*),sum(f1) from tb1;
|
sql explain analyze select count(*),sum(f1) from tb1;
|
||||||
sql explain analyze select count(*),sum(f1) from st1;
|
sql explain analyze select count(*),sum(f1) from st1;
|
||||||
sql explain analyze select count(*),sum(f1) from st1 group by f1;
|
sql explain analyze select count(*),sum(f1) from st1 group by f1;
|
||||||
#sql explain analyze select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
|
||||||
sql explain analyze select min(f1) from st1 interval(3m, 2a) sliding(1m);
|
|
||||||
|
|
||||||
print ======== step5
|
print ======== step5
|
||||||
sql explain analyze verbose true select ts from st1 where -2;
|
sql explain analyze verbose true select ts from st1 where -2;
|
||||||
|
@ -78,8 +74,6 @@ sql explain analyze verbose true select count(*),sum(f1) from st1 group by f1;
|
||||||
sql explain analyze verbose true select ts from tb1 where f1 > 0;
|
sql explain analyze verbose true select ts from tb1 where f1 > 0;
|
||||||
sql explain analyze verbose true select f1 from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
|
sql explain analyze verbose true select f1 from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
|
||||||
sql explain analyze verbose true select * from information_schema.user_stables where db_name='db2';
|
sql explain analyze verbose true select * from information_schema.user_stables where db_name='db2';
|
||||||
sql explain analyze verbose true select count(*),sum(f1) from st1 where f1 > 0 and ts > '2021-10-31 00:00:00' group by f1 having sum(f1) > 0;
|
|
||||||
sql explain analyze verbose true select min(f1) from st1 interval(3m, 2a) sliding(1m);
|
|
||||||
sql explain analyze verbose true select * from (select min(f1),count(*) a from st1 where f1 > 0) where a < 0;
|
sql explain analyze verbose true select * from (select min(f1),count(*) a from st1 where f1 > 0) where a < 0;
|
||||||
|
|
||||||
#not pass case
|
#not pass case
|
||||||
|
@ -93,6 +87,12 @@ sql explain analyze verbose true select * from (select min(f1),count(*) a from s
|
||||||
#sql explain select * from tb1, tb2 where tb1.ts=tb2.ts;
|
#sql explain select * from tb1, tb2 where tb1.ts=tb2.ts;
|
||||||
#sql explain select * from st1, st2 where tb1.ts=tb2.ts;
|
#sql explain select * from st1, st2 where tb1.ts=tb2.ts;
|
||||||
#sql explain analyze verbose true select sum(a+b) from (select _rowts, min(f1) b,count(*) a from st1 where f1 > 0 interval(1a)) where a < 0 interval(1s);
|
#sql explain analyze verbose true select sum(a+b) from (select _rowts, min(f1) b,count(*) a from st1 where f1 > 0 interval(1a)) where a < 0 interval(1s);
|
||||||
|
#sql explain select min(f1) from st1 interval(1m, 2a) sliding(30s);
|
||||||
|
#sql explain verbose true select count(*),sum(f1) from st1 where f1 > 0 and ts > '2021-10-31 00:00:00' group by f1 having sum(f1) > 0;
|
||||||
|
#sql explain analyze select min(f1) from st1 interval(3m, 2a) sliding(1m);
|
||||||
|
#sql explain analyze select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
||||||
|
#sql explain analyze verbose true select count(*),sum(f1) from st1 where f1 > 0 and ts > '2021-10-31 00:00:00' group by f1 having sum(f1) > 0;
|
||||||
|
#sql explain analyze verbose true select min(f1) from st1 interval(3m, 2a) sliding(1m);
|
||||||
|
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
|
@ -470,8 +470,8 @@ class TDTestCase:
|
||||||
tdSql.checkData(10, 1, '"femail"')
|
tdSql.checkData(10, 1, '"femail"')
|
||||||
|
|
||||||
# test having
|
# test having
|
||||||
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' having count(*) > 1")
|
#tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' having count(*) > 1")
|
||||||
tdSql.checkRows(3)
|
#tdSql.checkRows(3)
|
||||||
|
|
||||||
# subquery with json tag
|
# subquery with json tag
|
||||||
tdSql.query("select * from (select jtag, dataint from jsons1) order by dataint")
|
tdSql.query("select * from (select jtag, dataint from jsons1) order by dataint")
|
||||||
|
|
Loading…
Reference in New Issue