From 28693ce285d93940e6233f4a9a16c1035ce296ca Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 20 Feb 2023 14:07:24 +0800 Subject: [PATCH 01/14] feat(query): allow percentile function to take multiple params to enhance performance --- source/libs/executor/src/executorimpl.c | 10 ++++ source/libs/function/src/builtins.c | 34 ++++++++----- source/libs/function/src/builtinsimpl.c | 67 +++++++++++++++++++------ 3 files changed, 84 insertions(+), 27 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 448e2b0a91..f41f2650fb 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1565,6 +1565,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; int64_t st = taosGetTimestampUs(); + double scanCost = 0; + double calcCost = 0; int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; @@ -1573,6 +1575,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { bool blockAllocated = false; while (1) { + st = taosGetTimestampUs(); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { if (!hasValidBlock) { @@ -1586,6 +1589,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } } hasValidBlock = true; + scanCost += (taosGetTimestampUs() - st) / 1000.0; int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); if (code != TSDB_CODE_SUCCESS) { @@ -1603,6 +1607,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } } + st = taosGetTimestampUs(); // the pDataBlock are always the same one, no need to call this again setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); setInputDataBlock(pSup, pBlock, order, scanFlag, true); @@ -1612,9 +1617,14 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } + calcCost += (taosGetTimestampUs() - st) / 
1000.0; + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); } + qError("Gavin: %s total dowstream cost: %lf ms", pOperator->pDownstream[0]->name, scanCost); + qError("Gavin: %s total calculation cost: %lf ms", pOperator->name, calcCost); + // the downstream operator may return with error code, so let's check the code before generating results. if (pTaskInfo->code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index faf7a29dd0..755f103c44 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -497,27 +497,37 @@ static int32_t translateTimezone(SFunctionNode* pFunc, char* pErrBuf, int32_t le } static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - if (2 != LIST_LENGTH(pFunc->pParameterList)) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + if (numOfParams > 11) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - // param1 - SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); - - if (pValue->datum.i < 0 || pValue->datum.i > 100) { - return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); - } - - pValue->notReserved = true; uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; - if (!IS_NUMERIC_TYPE(para1Type) || (!IS_SIGNED_NUMERIC_TYPE(para2Type) && !IS_UNSIGNED_NUMERIC_TYPE(para2Type))) { + if (!IS_NUMERIC_TYPE(para1Type)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } + + for (int32_t i = 1; i < numOfParams; ++i) { + SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, i); + pValue->notReserved = true; + + if (pValue->datum.i < 0 || pValue->datum.i > 100) { + return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); + 
} + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type; + if (!IS_SIGNED_NUMERIC_TYPE(paraType) && !IS_UNSIGNED_NUMERIC_TYPE(paraType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + } + // set result type - pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + if (numOfParams > 2) { + pFunc->node.resType = (SDataType){.bytes = 128, .type = TSDB_DATA_TYPE_VARCHAR}; + } else { + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e4081ddf0d..80e77969ab 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1661,26 +1661,63 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { } int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - SVariant* pVal = &pCtx->param[1].param; - int32_t code = 0; - double v = 0; + if (pCtx->numOfParams > 2) { - GET_TYPED_DATA(v, double, pVal->nType, &pVal->i); + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); + tMemBucket* pMemBucket = ppInfo->pMemBucket; + if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null + return TSDB_CODE_FAILED; + } - tMemBucket* pMemBucket = ppInfo->pMemBucket; - if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null - code = getPercentile(pMemBucket, v, &ppInfo->result); + char buf[512] = {0}; + size_t len = 0; + + for (int32_t i = 1; i < pCtx->numOfParams; ++i) { + SVariant* pVal = &pCtx->param[i].param; + double v = 0; + + GET_TYPED_DATA(v, double, pVal->nType, &pVal->i); + + 
int32_t code = getPercentile(pMemBucket, v, &ppInfo->result); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf; ", ppInfo->result); + } + tMemBucketDestroy(pMemBucket); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + varDataSetLen(buf, len); + colDataAppend(pCol, pBlock->info.rows, buf, false); + + return pResInfo->numOfRes; + } else { + SVariant* pVal = &pCtx->param[1].param; + int32_t code = 0; + double v = 0; + + GET_TYPED_DATA(v, double, pVal->nType, &pVal->i); + + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); + + tMemBucket* pMemBucket = ppInfo->pMemBucket; + if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null + code = getPercentile(pMemBucket, v, &ppInfo->result); + } + + tMemBucketDestroy(pMemBucket); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + return functionFinalize(pCtx, pBlock); } - tMemBucketDestroy(pMemBucket); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - return functionFinalize(pCtx, pBlock); } bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { From f7a8393c4761e19e990e05407eff4458ec25b2f6 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 20 Feb 2023 15:23:11 +0800 Subject: [PATCH 02/14] add perf logs --- source/libs/executor/inc/executorimpl.h | 4 ++++ source/libs/executor/src/executorimpl.c | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 999a7965fb..e63c00af1e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -228,6 +228,10 @@ typedef struct SOperatorInfo { struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. 
The value is always ONE expect for join operator SOperatorFpSet fpSet; + int64_t downstreamTime; + int64_t funcInitTime; + int64_t funcExecTime; + int64_t funcFinTime; } SOperatorInfo; typedef enum { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f41f2650fb..db8c8c6ac0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -517,7 +517,9 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { continue; } + int64_t st = taosGetTimestampUs(); int32_t code = pCtx[k].fpSet.process(&pCtx[k]); + pOperator->funcExecTime += taosGetTimestampUs() - st; if (code != TSDB_CODE_SUCCESS) { qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); return code; @@ -1035,7 +1037,10 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin } } + + int64_t st = taosGetTimestampUs(); setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); + pOperator->funcInitTime += taosGetTimestampUs() - st; } static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { @@ -1252,7 +1257,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); } else { while (hasRemainResults(pGroupResInfo)) { + int64_t st = taosGetTimestampUs(); doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); + pOperator->funcFinTime += taosGetTimestampUs() - st; if (pBlock->info.rows >= pOperator->resultInfo.threshold) { break; } @@ -1577,6 +1584,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { while (1) { st = taosGetTimestampUs(); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + pOperator->downstreamTime += taosGetTimestampUs() - st; if (pBlock == NULL) { if (!hasValidBlock) { 
createDataBlockForEmptyInput(pOperator, &pBlock); @@ -1694,6 +1702,15 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) { return; } + if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_HASH_AGG) { + double downstream = (double)pOperator->downstreamTime / 1000000; + double init = (double)pOperator->funcInitTime / 1000000; + double exec = (double)pOperator->funcExecTime / 1000000; + double fin = (double)pOperator->funcFinTime / 1000000; + qError("operator: %s, downstream time:%lf, init time:%lf, exec time:%lf, fin time:%lf", + pOperator->name, downstream, init, exec, fin); + } + if (pOperator->fpSet.closeFn != NULL) { pOperator->fpSet.closeFn(pOperator->info); } @@ -1910,6 +1927,10 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN pTaskInfo); pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL); + pOperator->downstreamTime = 0; + pOperator->funcInitTime = 0; + pOperator->funcExecTime = 0; + pOperator->funcFinTime = 0; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; From 09ce8182232a4d032dfcc3bbb5627dd0b3ad7fe3 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 20 Feb 2023 19:35:06 +0800 Subject: [PATCH 03/14] add stats --- include/libs/function/function.h | 5 +++++ source/libs/executor/inc/executorimpl.h | 7 +++++++ source/libs/executor/src/executil.c | 5 +++++ source/libs/executor/src/executorimpl.c | 20 ++++++++++++++++++-- source/libs/function/src/builtinsimpl.c | 5 +++++ 5 files changed, 40 insertions(+), 2 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 32db6773e0..bad8e042a3 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -153,6 +153,11 @@ typedef struct SqlFunctionCtx { SSerializeDataHandle saveHandle; int32_t exprIdx; char udfName[TSDB_FUNC_NAME_LEN]; + int64_t smaHits; + 
int64_t smaNoHits; + int64_t smaNoHitsRows; + int64_t sdHits; + int64_t sdHitsRows; } SqlFunctionCtx; typedef struct tExprNode { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e63c00af1e..b50a22ed67 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -231,7 +231,14 @@ typedef struct SOperatorInfo { int64_t downstreamTime; int64_t funcInitTime; int64_t funcExecTime; + int64_t funcExecCalled; + int64_t totalNumOfRows; int64_t funcFinTime; + int64_t smaHits; + int64_t smaNoHits; + int64_t smaNoHitsRows; + int64_t sdHits; + int64_t sdHitsRows; } SOperatorInfo; typedef enum { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 3c4cafb753..b2f48042aa 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1543,6 +1543,11 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->numOfParams = pExpr->base.numOfParams; pCtx->param = pFunct->pParam; pCtx->saveHandle.currentPage = -1; + pCtx->smaHits = 0; + pCtx->smaNoHits = 0; + pCtx->smaNoHitsRows = 0; + pCtx->sdHits = 0; + pCtx->sdHitsRows = 0; } for (int32_t i = 1; i < numOfOutput; ++i) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index db8c8c6ac0..2f76a3227b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -519,7 +519,14 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { int64_t st = taosGetTimestampUs(); int32_t code = pCtx[k].fpSet.process(&pCtx[k]); + pOperator->funcExecCalled += 1; + pOperator->totalNumOfRows += pCtx->input.numOfRows; pOperator->funcExecTime += taosGetTimestampUs() - st; + pOperator->smaHits = pCtx->smaHits; + pOperator->smaNoHits = pCtx->smaNoHits; + pOperator->smaNoHitsRows = pCtx->smaNoHitsRows; + pOperator->sdHits = pCtx->sdHits; + 
pOperator->sdHitsRows = pCtx->sdHitsRows; if (code != TSDB_CODE_SUCCESS) { qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); return code; @@ -1707,8 +1714,10 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) { double init = (double)pOperator->funcInitTime / 1000000; double exec = (double)pOperator->funcExecTime / 1000000; double fin = (double)pOperator->funcFinTime / 1000000; - qError("operator: %s, downstream time:%lf, init time:%lf, exec time:%lf, fin time:%lf", - pOperator->name, downstream, init, exec, fin); + qError("operator: %s, downstream time:%lf, init time:%lf, exec time:%lf, exec called:%ld, fin time:%lf, total rows:%ld", + pOperator->name, downstream, init, exec, pOperator->funcExecCalled, fin, pOperator->totalNumOfRows); + qError("operator: %s, sma hits:%ld, sma nohits:%ld, sma nohits rows:%ld, second stage hits:%ld, second stage hits rows:%ld", + pOperator->name, pOperator->smaHits, pOperator->smaNoHits, pOperator->smaNoHitsRows, pOperator->sdHits, pOperator->sdHitsRows); } if (pOperator->fpSet.closeFn != NULL) { @@ -1930,7 +1939,14 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN pOperator->downstreamTime = 0; pOperator->funcInitTime = 0; pOperator->funcExecTime = 0; + pOperator->funcExecCalled = 0; pOperator->funcFinTime = 0; + pOperator->totalNumOfRows = 0; + pOperator->smaHits = 0; + pOperator->smaNoHits = 0; + pOperator->smaNoHitsRows = 0; + pOperator->sdHits = 0; + pOperator->sdHitsRows= 0; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 80e77969ab..59aa2f1a92 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1614,6 +1614,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { } pInfo->numOfElems += (pInput->numOfRows - 
pAgg->numOfNull); + pCtx->smaHits += 1; } else { // check the valid data one by one int32_t start = pInput->startRowIndex; @@ -1636,6 +1637,8 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { pInfo->numOfElems += 1; } + pCtx->smaNoHits += 1; + pCtx->smaNoHitsRows += pInput->numOfRows; } } else { // the second stage, calculate the true percentile value @@ -1655,6 +1658,8 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { } SET_VAL(pResInfo, numOfElems, 1); + pCtx->sdHits += 1; + pCtx->sdHitsRows += pInput->numOfRows; } return TSDB_CODE_SUCCESS; From 7e01d092a23168b075c3bec27cf7caed7cfe544b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 21 Feb 2023 17:24:09 +0800 Subject: [PATCH 04/14] optimize repeat scan --- source/libs/executor/src/executorimpl.c | 5 +++-- source/libs/executor/src/scanoperator.c | 11 ++++++++--- source/libs/function/src/builtins.c | 3 ++- source/libs/function/src/builtinsimpl.c | 2 +- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2f76a3227b..b02c9c1d57 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -395,19 +395,20 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC } } -static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order) { +static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) { SqlFunctionCtx* pCtx = pExprSup->pCtx; for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { pCtx[i].order = order; pCtx[i].input.numOfRows = pBlock->info.rows; setBlockSMAInfo(&pCtx[i], &pExprSup->pExprInfo[i], pBlock); pCtx[i].pSrcBlock = pBlock; + pCtx[i].scanFlag = scanFlag; } } void setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol) { if (pBlock->pBlockAgg != NULL) { - doSetInputDataBlockInfo(pExprSup, 
pBlock, order); + doSetInputDataBlockInfo(pExprSup, pBlock, order, scanFlag); } else { doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8528985e05..097d6da63a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -699,7 +699,8 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pTableScanInfo->base.scanFlag = REPEAT_SCAN; + pTableScanInfo->base.scanFlag = MAIN_SCAN; + pTableScanInfo->base.dataBlockLoadFlag = FUNC_DATA_REQUIRED_DATA_LOAD; qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo)); // do prepare for the next round table scan operation @@ -725,7 +726,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < total) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - pTableScanInfo->base.scanFlag = REPEAT_SCAN; + pTableScanInfo->base.scanFlag = MAIN_SCAN; qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo)); tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); @@ -878,7 +879,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - pInfo->base.scanFlag = MAIN_SCAN; + if (pInfo->scanInfo.numOfAsc > 1) { + pInfo->base.scanFlag = REPEAT_SCAN; + } else { + pInfo->base.scanFlag = MAIN_SCAN; + } pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode); pInfo->base.readHandle = *readHandle; pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 
755f103c44..8902bd72cf 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2276,8 +2276,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "percentile", .type = FUNCTION_TYPE_PERCENTILE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_REPEAT_SCAN_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_REPEAT_SCAN_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translatePercentile, + .dataRequiredFunc = statisDataRequired, .getEnvFunc = getPercentileFuncEnv, .initFunc = percentileFunctionSetup, .processFunc = percentileFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 59aa2f1a92..bf4ca7d01e 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1578,7 +1578,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { int32_t type = pCol->info.type; SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); - if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) { + if (pCtx->scanFlag == MAIN_SCAN && pInfo->stage == 0) { pInfo->stage += 1; // all data are null, set it completed From 61de3b179a887cc65547b71eac4db5eb198556d2 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 10:04:28 +0800 Subject: [PATCH 05/14] Revert "add stats" This reverts commit 09ce8182232a4d032dfcc3bbb5627dd0b3ad7fe3. 
--- include/libs/function/function.h | 5 ----- source/libs/executor/inc/executorimpl.h | 7 ------- source/libs/executor/src/executil.c | 5 ----- source/libs/executor/src/executorimpl.c | 20 ++------------------ source/libs/function/src/builtinsimpl.c | 5 ----- 5 files changed, 2 insertions(+), 40 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index bad8e042a3..32db6773e0 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -153,11 +153,6 @@ typedef struct SqlFunctionCtx { SSerializeDataHandle saveHandle; int32_t exprIdx; char udfName[TSDB_FUNC_NAME_LEN]; - int64_t smaHits; - int64_t smaNoHits; - int64_t smaNoHitsRows; - int64_t sdHits; - int64_t sdHitsRows; } SqlFunctionCtx; typedef struct tExprNode { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b50a22ed67..e63c00af1e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -231,14 +231,7 @@ typedef struct SOperatorInfo { int64_t downstreamTime; int64_t funcInitTime; int64_t funcExecTime; - int64_t funcExecCalled; - int64_t totalNumOfRows; int64_t funcFinTime; - int64_t smaHits; - int64_t smaNoHits; - int64_t smaNoHitsRows; - int64_t sdHits; - int64_t sdHitsRows; } SOperatorInfo; typedef enum { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b2f48042aa..3c4cafb753 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1543,11 +1543,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->numOfParams = pExpr->base.numOfParams; pCtx->param = pFunct->pParam; pCtx->saveHandle.currentPage = -1; - pCtx->smaHits = 0; - pCtx->smaNoHits = 0; - pCtx->smaNoHitsRows = 0; - pCtx->sdHits = 0; - pCtx->sdHitsRows = 0; } for (int32_t i = 1; i < numOfOutput; ++i) { diff --git a/source/libs/executor/src/executorimpl.c 
b/source/libs/executor/src/executorimpl.c index b02c9c1d57..e38cfb5285 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -520,14 +520,7 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { int64_t st = taosGetTimestampUs(); int32_t code = pCtx[k].fpSet.process(&pCtx[k]); - pOperator->funcExecCalled += 1; - pOperator->totalNumOfRows += pCtx->input.numOfRows; pOperator->funcExecTime += taosGetTimestampUs() - st; - pOperator->smaHits = pCtx->smaHits; - pOperator->smaNoHits = pCtx->smaNoHits; - pOperator->smaNoHitsRows = pCtx->smaNoHitsRows; - pOperator->sdHits = pCtx->sdHits; - pOperator->sdHitsRows = pCtx->sdHitsRows; if (code != TSDB_CODE_SUCCESS) { qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); return code; @@ -1715,10 +1708,8 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) { double init = (double)pOperator->funcInitTime / 1000000; double exec = (double)pOperator->funcExecTime / 1000000; double fin = (double)pOperator->funcFinTime / 1000000; - qError("operator: %s, downstream time:%lf, init time:%lf, exec time:%lf, exec called:%ld, fin time:%lf, total rows:%ld", - pOperator->name, downstream, init, exec, pOperator->funcExecCalled, fin, pOperator->totalNumOfRows); - qError("operator: %s, sma hits:%ld, sma nohits:%ld, sma nohits rows:%ld, second stage hits:%ld, second stage hits rows:%ld", - pOperator->name, pOperator->smaHits, pOperator->smaNoHits, pOperator->smaNoHitsRows, pOperator->sdHits, pOperator->sdHitsRows); + qError("operator: %s, downstream time:%lf, init time:%lf, exec time:%lf, fin time:%lf", + pOperator->name, downstream, init, exec, fin); } if (pOperator->fpSet.closeFn != NULL) { @@ -1940,14 +1931,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN pOperator->downstreamTime = 0; pOperator->funcInitTime = 0; pOperator->funcExecTime = 0; - pOperator->funcExecCalled = 0; 
pOperator->funcFinTime = 0; - pOperator->totalNumOfRows = 0; - pOperator->smaHits = 0; - pOperator->smaNoHits = 0; - pOperator->smaNoHitsRows = 0; - pOperator->sdHits = 0; - pOperator->sdHitsRows= 0; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index bf4ca7d01e..68b2987518 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1614,7 +1614,6 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { } pInfo->numOfElems += (pInput->numOfRows - pAgg->numOfNull); - pCtx->smaHits += 1; } else { // check the valid data one by one int32_t start = pInput->startRowIndex; @@ -1637,8 +1636,6 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { pInfo->numOfElems += 1; } - pCtx->smaNoHits += 1; - pCtx->smaNoHitsRows += pInput->numOfRows; } } else { // the second stage, calculate the true percentile value @@ -1658,8 +1655,6 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { } SET_VAL(pResInfo, numOfElems, 1); - pCtx->sdHits += 1; - pCtx->sdHitsRows += pInput->numOfRows; } return TSDB_CODE_SUCCESS; From 961ac26f92d850d73c8a6cccbd4ccd017c7cd8d5 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 10:04:35 +0800 Subject: [PATCH 06/14] Revert "add perf logs" This reverts commit f7a8393c4761e19e990e05407eff4458ec25b2f6. --- source/libs/executor/inc/executorimpl.h | 4 ---- source/libs/executor/src/executorimpl.c | 21 --------------------- 2 files changed, 25 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e63c00af1e..999a7965fb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -228,10 +228,6 @@ typedef struct SOperatorInfo { struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. 
The value is always ONE expect for join operator SOperatorFpSet fpSet; - int64_t downstreamTime; - int64_t funcInitTime; - int64_t funcExecTime; - int64_t funcFinTime; } SOperatorInfo; typedef enum { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e38cfb5285..9de48e1a6e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -518,9 +518,7 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { continue; } - int64_t st = taosGetTimestampUs(); int32_t code = pCtx[k].fpSet.process(&pCtx[k]); - pOperator->funcExecTime += taosGetTimestampUs() - st; if (code != TSDB_CODE_SUCCESS) { qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); return code; @@ -1038,10 +1036,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin } } - - int64_t st = taosGetTimestampUs(); setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); - pOperator->funcInitTime += taosGetTimestampUs() - st; } static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { @@ -1258,9 +1253,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); } else { while (hasRemainResults(pGroupResInfo)) { - int64_t st = taosGetTimestampUs(); doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); - pOperator->funcFinTime += taosGetTimestampUs() - st; if (pBlock->info.rows >= pOperator->resultInfo.threshold) { break; } @@ -1585,7 +1578,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { while (1) { st = taosGetTimestampUs(); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - pOperator->downstreamTime += taosGetTimestampUs() - st; if (pBlock == NULL) { if (!hasValidBlock) { 
createDataBlockForEmptyInput(pOperator, &pBlock); @@ -1703,15 +1695,6 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) { return; } - if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_HASH_AGG) { - double downstream = (double)pOperator->downstreamTime / 1000000; - double init = (double)pOperator->funcInitTime / 1000000; - double exec = (double)pOperator->funcExecTime / 1000000; - double fin = (double)pOperator->funcFinTime / 1000000; - qError("operator: %s, downstream time:%lf, init time:%lf, exec time:%lf, fin time:%lf", - pOperator->name, downstream, init, exec, fin); - } - if (pOperator->fpSet.closeFn != NULL) { pOperator->fpSet.closeFn(pOperator->info); } @@ -1928,10 +1911,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN pTaskInfo); pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL); - pOperator->downstreamTime = 0; - pOperator->funcInitTime = 0; - pOperator->funcExecTime = 0; - pOperator->funcFinTime = 0; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; From fea91264535830a947d829943af874e9ea7ec48b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 10:07:03 +0800 Subject: [PATCH 07/14] remove debug logs --- source/libs/executor/src/executorimpl.c | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9de48e1a6e..e539b3df06 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1566,8 +1566,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; int64_t st = taosGetTimestampUs(); - double scanCost = 0; - double calcCost = 0; int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; @@ -1576,7 +1574,6 @@ static int32_t 
doOpenAggregateOptr(SOperatorInfo* pOperator) { bool blockAllocated = false; while (1) { - st = taosGetTimestampUs(); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { if (!hasValidBlock) { @@ -1590,7 +1587,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } } hasValidBlock = true; - scanCost += (taosGetTimestampUs() - st) / 1000.0; int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); if (code != TSDB_CODE_SUCCESS) { @@ -1608,7 +1604,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } } - st = taosGetTimestampUs(); // the pDataBlock are always the same one, no need to call this again setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); setInputDataBlock(pSup, pBlock, order, scanFlag, true); @@ -1618,14 +1613,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - calcCost += (taosGetTimestampUs() - st) / 1000.0; - destroyDataBlockForEmptyInput(blockAllocated, &pBlock); } - qError("Gavin: %s total dowstream cost: %lf ms", pOperator->pDownstream[0]->name, scanCost); - qError("Gavin: %s total calculation cost: %lf ms", pOperator->name, calcCost); - // the downstream operator may return with error code, so let's check the code before generating results. 
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); From 9102c7a42a8edda12bcc3fefa01261f4a67cefb8 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 10:22:10 +0800 Subject: [PATCH 08/14] change repeat_scan to pre_scan --- include/libs/function/function.h | 2 +- source/libs/executor/src/executorimpl.c | 2 +- source/libs/executor/src/scanoperator.c | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 32db6773e0..4fbcf7e3ce 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -76,7 +76,7 @@ enum { enum { MAIN_SCAN = 0x0u, REVERSE_SCAN = 0x1u, // todo remove it - REPEAT_SCAN = 0x2u, // repeat scan belongs to the master scan + PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan }; typedef struct SPoint1 { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e539b3df06..7907cd4312 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -538,7 +538,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx) { return false; } - if (pCtx->scanFlag == REPEAT_SCAN) { + if (pCtx->scanFlag == PRE_SCAN) { return fmIsRepeatScanFunc(pCtx->functionId); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 097d6da63a..0c3d5bbe7d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -880,10 +880,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; if (pInfo->scanInfo.numOfAsc > 1) { - pInfo->base.scanFlag = REPEAT_SCAN; + pInfo->base.scanFlag = PRE_SCAN; } else { pInfo->base.scanFlag = MAIN_SCAN; } + pInfo->base.pdInfo.interval = 
extractIntervalInfo(pTableScanNode); pInfo->base.readHandle = *readHandle; pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired; From 84765ff670338b2f51031deeb0630a86cac2d163 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 10:47:49 +0800 Subject: [PATCH 09/14] refactor code --- source/libs/function/src/builtins.c | 4 +-- source/libs/function/src/builtinsimpl.c | 47 ++++++++++++------------- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 8902bd72cf..ea639cb425 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -517,14 +517,14 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); } uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type; - if (!IS_SIGNED_NUMERIC_TYPE(paraType) && !IS_UNSIGNED_NUMERIC_TYPE(paraType)) { + if (!IS_NUMERIC_TYPE(paraType)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } } // set result type if (numOfParams > 2) { - pFunc->node.resType = (SDataType){.bytes = 128, .type = TSDB_DATA_TYPE_VARCHAR}; + pFunc->node.resType = (SDataType){.bytes = 512, .type = TSDB_DATA_TYPE_VARCHAR}; } else { pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 68b2987518..6df657f743 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1661,63 +1661,62 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { } int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); + + int32_t code = 0; + 
double v = 0; + + tMemBucket* pMemBucket = ppInfo->pMemBucket; + if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null + code = TSDB_CODE_FAILED; + goto _fin_error; + } + if (pCtx->numOfParams > 2) { - - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); - - tMemBucket* pMemBucket = ppInfo->pMemBucket; - if (pMemBucket == NULL || pMemBucket->total == 0) { // check for null - return TSDB_CODE_FAILED; - } - char buf[512] = {0}; size_t len = 0; for (int32_t i = 1; i < pCtx->numOfParams; ++i) { SVariant* pVal = &pCtx->param[i].param; - double v = 0; GET_TYPED_DATA(v, double, pVal->nType, &pVal->i); int32_t code = getPercentile(pMemBucket, v, &ppInfo->result); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _fin_error; } len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf; ", ppInfo->result); } - tMemBucketDestroy(pMemBucket); int32_t slotId = pCtx->pExpr->base.resSchema.slotId; - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); varDataSetLen(buf, len); colDataAppend(pCol, pBlock->info.rows, buf, false); + tMemBucketDestroy(pMemBucket); return pResInfo->numOfRes; } else { SVariant* pVal = &pCtx->param[1].param; - int32_t code = 0; - double v = 0; GET_TYPED_DATA(v, double, pVal->nType, &pVal->i); - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); - - tMemBucket* pMemBucket = ppInfo->pMemBucket; - if (pMemBucket != NULL && pMemBucket->total > 0) { // check for null - code = getPercentile(pMemBucket, v, &ppInfo->result); + code = getPercentile(pMemBucket, v, &ppInfo->result); + if (code != TSDB_CODE_SUCCESS) { + goto _fin_error; } tMemBucketDestroy(pMemBucket); - if (code != TSDB_CODE_SUCCESS) { - return code; - } return functionFinalize(pCtx, pBlock); } +_fin_error: + + 
tMemBucketDestroy(pMemBucket); + return code; + } bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { From 61140d31434e791b5381f1e8de962dd827b69c69 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 11:09:23 +0800 Subject: [PATCH 10/14] fix parameter check --- source/libs/executor/src/scanoperator.c | 7 +------ source/libs/function/src/builtins.c | 14 +++++++++++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0c3d5bbe7d..a7ae8d05c7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -878,12 +878,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - - if (pInfo->scanInfo.numOfAsc > 1) { - pInfo->base.scanFlag = PRE_SCAN; - } else { - pInfo->base.scanFlag = MAIN_SCAN; - } + pInfo->base.scanFlag = (pInfo->scanInfo.numOfAsc > 1) ? 
PRE_SCAN : MAIN_SCAN; pInfo->base.pdInfo.interval = extractIntervalInfo(pTableScanNode); pInfo->base.readHandle = *readHandle; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index ea639cb425..502af1a828 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -513,13 +513,21 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, i); pValue->notReserved = true; - if (pValue->datum.i < 0 || pValue->datum.i > 100) { - return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); - } uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type; if (!IS_NUMERIC_TYPE(paraType)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } + + double v = 0; + if (IS_INTEGER_TYPE(paraType)) { + v = (double)pValue->datum.i; + } else { + v = pValue->datum.d; + } + + if (v < 0 || v > 100) { + return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); + } } // set result type From d6ac7de7462b2ac5fe84d57743dc0ffa34c22688 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 11:44:17 +0800 Subject: [PATCH 11/14] change output format to JSON array --- source/libs/function/src/builtinsimpl.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 6df657f743..236d27fb37 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1675,8 +1675,9 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { if (pCtx->numOfParams > 2) { char buf[512] = {0}; - size_t len = 0; + size_t len = 1; + varDataVal(buf)[0] = '['; for (int32_t i = 1; i < pCtx->numOfParams; ++i) { SVariant* pVal = &pCtx->param[i].param; @@ -1687,7 +1688,11 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, 
SSDataBlock* pBlock) { goto _fin_error; } - len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf; ", ppInfo->result); + if (i == pCtx->numOfParams - 1) { + len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf]", ppInfo->result); + } else { + len += snprintf(varDataVal(buf) + len, sizeof(buf) - VARSTR_HEADER_SIZE - len, "%.6lf, ", ppInfo->result); + } } int32_t slotId = pCtx->pExpr->base.resSchema.slotId; From 39d38c15f1532441ca329eb3e10549e5bcba618e Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 14:08:03 +0800 Subject: [PATCH 12/14] fix param check --- source/libs/function/src/builtins.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 502af1a828..deb7a32432 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -498,7 +498,7 @@ static int32_t translateTimezone(SFunctionNode* pFunc, char* pErrBuf, int32_t le static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); - if (numOfParams > 11) { + if (numOfParams < 2 || numOfParams > 11) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } From 8175b36d31894867ab0711f9e1360d094d57cd40 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 14:08:26 +0800 Subject: [PATCH 13/14] add test cases --- tests/system-test/2-query/percentile.py | 46 ++++++++++++++++++++----- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/tests/system-test/2-query/percentile.py b/tests/system-test/2-query/percentile.py index 935f55a8c2..e01aae97c0 100644 --- a/tests/system-test/2-query/percentile.py +++ b/tests/system-test/2-query/percentile.py @@ -50,7 +50,7 @@ class TDTestCase: 'col12': f'binary({self.binary_length})', 'col13': f'nchar({self.nchar_length})' } - + self.tag_dict = { 'ts_tag' : 
'timestamp', 't1': 'tinyint', @@ -85,19 +85,19 @@ class TDTestCase: self.tag_values = [ f'{self.tag_ts},{self.tag_tinyint},{self.tag_smallint},{self.tag_int},{self.tag_bigint},\ {self.tag_utint},{self.tag_usint},{self.tag_uint},{self.tag_ubint},{self.tag_float},{self.tag_double},{self.tag_bool},"{self.binary_str}","{self.nchar_str}"' - + ] - + self.param = [1,50,100] - + def insert_data(self,column_dict,tbname,row_num): - intData = [] + intData = [] floatData = [] insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str) for i in range(row_num): insert_list = [] self.setsql.insert_values(column_dict,i,insert_sql,insert_list,self.ts) - intData.append(i) + intData.append(i) floatData.append(i + 0.1) return intData,floatData def check_tags(self,tags,param,num,value): @@ -117,6 +117,20 @@ class TDTestCase: else: tdSql.query(f'select percentile({k}, {param}) from {self.ntbname}') tdSql.checkData(0, 0, np.percentile(floatData, param)) + + tdSql.query(f'select percentile(col1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100) from {self.ntbname}') + tdSql.checkData(0, 0, '[0.900000, 1.800000, 2.700000, 3.600000, 4.500000, 5.400000, 6.300000, 7.200000, 8.100000, 9.000000]') + + tdSql.query(f'select percentile(col1, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.ntbname}') + tdSql.checkData(0, 0, '[0.891000, 1.791000, 2.691000, 3.591000, 4.491000, 5.391000, 6.291000, 7.191000, 8.091000, 8.991000]') + + tdSql.error(f'select percentile(col1) from {self.ntbname}') + tdSql.error(f'select percentile(col1, -1) from {self.ntbname}') + tdSql.error(f'select percentile(col1, 101) from {self.ntbname}') + tdSql.error(f'select percentile(col1, col2) from {self.ntbname}') + tdSql.error(f'select percentile(1, col1) from {self.ntbname}') + tdSql.error(f'select percentile(col1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 101) from {self.ntbname}') + tdSql.execute(f'drop database {self.dbname}') def function_check_ctb(self): tdSql.execute(f'create 
database {self.dbname}') @@ -135,7 +149,7 @@ class TDTestCase: else: tdSql.query(f'select percentile({k}, {param}) from {self.stbname}_{i}') tdSql.checkData(0, 0, np.percentile(floatData, param)) - + for k,v in self.tag_dict.items(): for param in self.param: if v.lower() in ['timestamp','bool'] or 'binary' in v.lower() or 'nchar' in v.lower(): @@ -145,11 +159,25 @@ class TDTestCase: data_num = tdSql.queryResult[0][0] tdSql.query(f'select percentile({k},{param}) from {self.stbname}_{i}') tdSql.checkData(0,0,data_num) - tdSql.execute(f'drop database {self.dbname}') + + tdSql.query(f'select percentile(col1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100) from {self.stbname}_0') + tdSql.checkData(0, 0, '[0.900000, 1.800000, 2.700000, 3.600000, 4.500000, 5.400000, 6.300000, 7.200000, 8.100000, 9.000000]') + + tdSql.query(f'select percentile(col1, 9.9, 19.9, 29.9, 39.9, 49.9, 59.9, 69.9, 79.9, 89.9, 99.9) from {self.stbname}_0') + tdSql.checkData(0, 0, '[0.891000, 1.791000, 2.691000, 3.591000, 4.491000, 5.391000, 6.291000, 7.191000, 8.091000, 8.991000]') + + tdSql.error(f'select percentile(col1) from {self.stbname}_0') + tdSql.error(f'select percentile(col1, -1) from {self.stbname}_0') + tdSql.error(f'select percentile(col1, 101) from {self.stbname}_0') + tdSql.error(f'select percentile(col1, col2) from {self.stbname}_0') + tdSql.error(f'select percentile(1, col1) from {self.stbname}_0') + tdSql.error(f'select percentile(col1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 101) from {self.stbname}_0') + + tdSql.execute(f'drop database {self.dbname}') def run(self): self.function_check_ntb() self.function_check_ctb() - + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) From f72b0d9861b99c70ca1d9c1023e67a42475251e8 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 22 Feb 2023 16:31:45 +0800 Subject: [PATCH 14/14] add docs --- docs/en/12-taos-sql/10-function.md | 10 +++++++--- docs/zh/12-taos-sql/10-function.md | 10 +++++++--- 2 files changed, 14 
insertions(+), 6 deletions(-) diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index 8ab5176aab..4504456cd7 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -795,19 +795,23 @@ HISTOGRAM(expr,bin_type, bin_description, normalized) ### PERCENTILE ```sql -PERCENTILE(expr, p) +PERCENTILE(expr, p [, p1] ...) ``` **Description**: The value whose rank in a specific column matches the specified percentage. If such a value matching the specified percentage doesn't exist in the column, an interpolation value will be returned. -**Return value type**: DOUBLE +**Return value type**: This function takes a minimum of 2 and a maximum of 11 parameters, and it can return at most 10 percentiles simultaneously. If 2 parameters are given, a single percentile is returned and the value type is DOUBLE. + If more than 2 parameters are given, the return value type is a VARCHAR string, the format of which is a JSON array containing all return values. **Applicable column types**: Numeric **Applicable table types**: table only -**More explanations**: _p_ is in range [0,100], when _p_ is 0, the result is same as using function MIN; when _p_ is 100, the result is same as function MAX. +**More explanations**: +- _p_ is in range [0,100], when _p_ is 0, the result is the same as using function MIN; when _p_ is 100, the result is the same as using function MAX. +- When calculating multiple percentiles of a specific column, a single PERCENTILE function with multiple parameters is advised, as this can greatly reduce the query response time. + For example, using SELECT percentile(col, 90, 95, 99) FROM table will perform better than SELECT percentile(col, 90), percentile(col, 95), percentile(col, 99) FROM table.
## Selection Functions diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index c44c09d10c..bc51a51209 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -798,18 +798,22 @@ HISTOGRAM(expr,bin_type, bin_description, normalized) ### PERCENTILE ```sql -PERCENTILE(expr, p) +PERCENTILE(expr, p [, p1] ... ) ``` **功能说明**:统计表中某列的值百分比分位数。 -**返回数据类型**: DOUBLE。 +**返回数据类型**: 该函数最小参数个数为 2 个,最大参数个数为 11 个。可以最多同时返回 10 个百分比分位数。当参数个数为 2 时, 返回一个分位数, 类型为DOUBLE,当参数个数大于 2 时,返回类型为VARCHAR, 格式为包含多个返回值的JSON数组。 **应用字段**:数值类型。 **适用于**:表。 -**使用说明**:*P*值取值范围 0≤*P*≤100,为 0 的时候等同于 MIN,为 100 的时候等同于 MAX。 +**使用说明**: + +- *P*值取值范围 0≤*P*≤100,为 0 的时候等同于 MIN,为 100 的时候等同于 MAX; +- 同时计算针对同一列的多个分位数时,建议使用一个PERCENTILE函数和多个参数的方式,能很大程度上降低查询的响应时间。 + 比如,使用查询SELECT percentile(col, 90, 95, 99) FROM table, 性能会优于SELECT percentile(col, 90), percentile(col, 95), percentile(col, 99) from table。 ## 选择函数