add apercentile distributed spliting
This commit is contained in:
parent
49b277e64b
commit
dff653ed16
|
@ -205,7 +205,7 @@ static bool validateApercentileAlgo(const SValueNode* pVal) {
|
||||||
0 == strcasecmp(varDataVal(pVal->datum.p), "t-digest"));
|
0 == strcasecmp(varDataVal(pVal->datum.p), "t-digest"));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateApercentileImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
|
static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||||
if (2 != numOfParams && 3 != numOfParams) {
|
if (2 != numOfParams && 3 != numOfParams) {
|
||||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
@ -247,11 +247,66 @@ static int32_t translateApercentileImpl(SFunctionNode* pFunc, char* pErrBuf, int
|
||||||
pValue->notReserved = true;
|
pValue->notReserved = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isPartial) {
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateApercentileImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
|
||||||
|
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||||
|
|
||||||
|
if (isPartial) {
|
||||||
|
if (2 != numOfParams && 3 != numOfParams) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
// param1
|
||||||
|
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
|
||||||
|
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SValueNode* pValue = (SValueNode*)pParamNode1;
|
||||||
|
if (pValue->datum.i < 0 || pValue->datum.i > 100) {
|
||||||
|
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pValue->notReserved = true;
|
||||||
|
|
||||||
|
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
||||||
|
if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
// param2
|
||||||
|
if (3 == numOfParams) {
|
||||||
|
uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type;
|
||||||
|
if (!IS_VAR_DATA_TYPE(para3Type)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pParamNode2 = nodesListGetNode(pFunc->pParameterList, 2);
|
||||||
|
if (QUERY_NODE_VALUE != nodeType(pParamNode2) || !validateApercentileAlgo((SValueNode*)pParamNode2)) {
|
||||||
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
|
"Third parameter algorithm of apercentile must be 'default' or 't-digest'");
|
||||||
|
}
|
||||||
|
|
||||||
|
pValue = (SValueNode*)pParamNode2;
|
||||||
|
pValue->notReserved = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = getApercentileMaxSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||||
} else {
|
} else {
|
||||||
pFunc->node.resType = (SDataType){.bytes = 1000, .type = TSDB_DATA_TYPE_BINARY};
|
if (1 != numOfParams) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
if (!IS_NUMERIC_TYPE(para1Type)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1164,7 +1219,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.name = "apercentile",
|
.name = "apercentile",
|
||||||
.type = FUNCTION_TYPE_APERCENTILE,
|
.type = FUNCTION_TYPE_APERCENTILE,
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.translateFunc = translateApercentileFinal,
|
.translateFunc = translateApercentile,
|
||||||
.getEnvFunc = getApercentileFuncEnv,
|
.getEnvFunc = getApercentileFuncEnv,
|
||||||
.initFunc = apercentileFunctionSetup,
|
.initFunc = apercentileFunctionSetup,
|
||||||
.processFunc = apercentileFunction,
|
.processFunc = apercentileFunction,
|
||||||
|
@ -1188,7 +1243,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.classification = FUNC_MGT_AGG_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
.translateFunc = translateApercentileFinal,
|
.translateFunc = translateApercentileFinal,
|
||||||
.getEnvFunc = getApercentileFuncEnv,
|
.getEnvFunc = getApercentileFuncEnv,
|
||||||
.initFunc = apercentileFunctionSetup,
|
.initFunc = NULL,//apercentileFunctionSetup,
|
||||||
.processFunc = apercentileFunction,
|
.processFunc = apercentileFunction,
|
||||||
.finalizeFunc = apercentileFinalize
|
.finalizeFunc = apercentileFinalize
|
||||||
},
|
},
|
||||||
|
|
|
@ -2047,6 +2047,55 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
|
int32_t numOfElems = 0;
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
int32_t type = pCol->info.type;
|
||||||
|
|
||||||
|
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
SAPercentileInfo* pInputInfo;
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
|
//if (colDataIsNull_s(pCol, i)) {
|
||||||
|
// continue;
|
||||||
|
//}
|
||||||
|
numOfElems += 1;
|
||||||
|
char* data = colDataGetData(pCol, i);
|
||||||
|
|
||||||
|
pInputInfo = (SAPercentileInfo *)varDataVal(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
} else {
|
||||||
|
buildHistogramInfo(pInputInfo);
|
||||||
|
if (pInputInfo->pHisto->numOfElems <= 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
buildHistogramInfo(pInfo);
|
||||||
|
SHistogramInfo *pHisto = pInfo->pHisto;
|
||||||
|
|
||||||
|
if (pHisto->numOfElems <= 0) {
|
||||||
|
memcpy(pHisto, pInputInfo->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
} else {
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInputInfo->pHisto, MAX_HISTOGRAM_BIN);
|
||||||
|
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
tHistogramDestroy(&pRes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SVariant* pVal = &pCtx->param[1].param;
|
SVariant* pVal = &pCtx->param[1].param;
|
||||||
double percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
|
double percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
|
||||||
|
@ -2091,13 +2140,15 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
if (pInfo->pTDigest->size > 0) {
|
if (pInfo->pTDigest->size > 0) {
|
||||||
memcpy(varDataVal(tmp), pInfo->pTDigest, resultBytes);
|
memcpy(varDataVal(tmp), pInfo, resultBytes);
|
||||||
|
varDataSetLen(tmp, resultBytes);
|
||||||
} else {
|
} else {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pInfo->pHisto->numOfElems > 0) {
|
if (pInfo->pHisto->numOfElems > 0) {
|
||||||
memcpy(varDataVal(tmp), pInfo->pHisto, resultBytes);
|
memcpy(varDataVal(tmp), pInfo->pHisto, resultBytes);
|
||||||
|
varDataSetLen(tmp, resultBytes);
|
||||||
} else {
|
} else {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue