feat(query): allow percentile function to take multiple params
to enhance performance
This commit is contained in:
parent
13362841fe
commit
28693ce285
|
@ -1565,6 +1565,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
double scanCost = 0;
|
||||
double calcCost = 0;
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
|
@ -1573,6 +1575,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
bool blockAllocated = false;
|
||||
|
||||
while (1) {
|
||||
st = taosGetTimestampUs();
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
if (!hasValidBlock) {
|
||||
|
@ -1586,6 +1589,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
hasValidBlock = true;
|
||||
scanCost += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1603,6 +1607,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
st = taosGetTimestampUs();
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
|
||||
setInputDataBlock(pSup, pBlock, order, scanFlag, true);
|
||||
|
@ -1612,9 +1617,14 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
calcCost += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
|
||||
}
|
||||
|
||||
qError("Gavin: %s total dowstream cost: %lf ms", pOperator->pDownstream[0]->name, scanCost);
|
||||
qError("Gavin: %s total calculation cost: %lf ms", pOperator->name, calcCost);
|
||||
|
||||
// the downstream operator may return with error code, so let's check the code before generating results.
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
|
|
|
@ -497,27 +497,37 @@ static int32_t translateTimezone(SFunctionNode* pFunc, char* pErrBuf, int32_t le
|
|||
}
|
||||
|
||||
static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (2 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
if (numOfParams > 11) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// param1
|
||||
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
|
||||
|
||||
if (pValue->datum.i < 0 || pValue->datum.i > 100) {
|
||||
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
pValue->notReserved = true;
|
||||
|
||||
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
||||
if (!IS_NUMERIC_TYPE(para1Type) || (!IS_SIGNED_NUMERIC_TYPE(para2Type) && !IS_UNSIGNED_NUMERIC_TYPE(para2Type))) {
|
||||
if (!IS_NUMERIC_TYPE(para1Type)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
|
||||
for (int32_t i = 1; i < numOfParams; ++i) {
|
||||
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, i);
|
||||
pValue->notReserved = true;
|
||||
|
||||
if (pValue->datum.i < 0 || pValue->datum.i > 100) {
|
||||
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type;
|
||||
if (!IS_SIGNED_NUMERIC_TYPE(paraType) && !IS_UNSIGNED_NUMERIC_TYPE(paraType)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
}
|
||||
|
||||
// set result type
|
||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||
if (numOfParams > 2) {
|
||||
pFunc->node.resType = (SDataType){.bytes = 128, .type = TSDB_DATA_TYPE_VARCHAR};
|
||||
} else {
|
||||
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1661,26 +1661,63 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SVariant* pVal = &pCtx->param[1].param;
|
||||
int32_t code = 0;
|
||||
double v = 0;
|
||||
if (pCtx->numOfParams > 2) {
|
||||
|
||||
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
tMemBucket* pMemBucket = ppInfo->pMemBucket;
|
||||
if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
tMemBucket* pMemBucket = ppInfo->pMemBucket;
|
||||
if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
|
||||
code = getPercentile(pMemBucket, v, &ppInfo->result);
|
||||
char buf[512] = {0};
|
||||
size_t len = 0;
|
||||
|
||||
for (int32_t i = 1; i < pCtx->numOfParams; ++i) {
|
||||
SVariant* pVal = &pCtx->param[i].param;
|
||||
double v = 0;
|
||||
|
||||
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
|
||||
|
||||
int32_t code = getPercentile(pMemBucket, v, &ppInfo->result);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf; ", ppInfo->result);
|
||||
}
|
||||
tMemBucketDestroy(pMemBucket);
|
||||
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
varDataSetLen(buf, len);
|
||||
colDataAppend(pCol, pBlock->info.rows, buf, false);
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
} else {
|
||||
SVariant* pVal = &pCtx->param[1].param;
|
||||
int32_t code = 0;
|
||||
double v = 0;
|
||||
|
||||
GET_TYPED_DATA(v, double, pVal->nType, &pVal->i);
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
tMemBucket* pMemBucket = ppInfo->pMemBucket;
|
||||
if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null
|
||||
code = getPercentile(pMemBucket, v, &ppInfo->result);
|
||||
}
|
||||
|
||||
tMemBucketDestroy(pMemBucket);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
tMemBucketDestroy(pMemBucket);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return functionFinalize(pCtx, pBlock);
|
||||
}
|
||||
|
||||
bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
|
|
Loading…
Reference in New Issue