From a4e38ed7ca7ee4ed8cfddc757ad214ec23290a35 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 May 2022 15:54:39 +0800 Subject: [PATCH] enh(query): return open/total cost for each operator. --- source/libs/command/inc/commandInt.h | 2 + source/libs/command/src/explain.c | 90 ++++++++++++++++++- source/libs/executor/src/executorimpl.c | 39 +++----- source/libs/executor/src/groupoperator.c | 21 ++++- source/libs/executor/src/scanoperator.c | 4 +- source/libs/executor/src/timewindowoperator.c | 19 +++- 6 files changed, 141 insertions(+), 34 deletions(-) diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index 16d7ec0c4a..100e35bc3c 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -36,6 +36,8 @@ extern "C" { #define EXPLAIN_SORT_FORMAT "Sort" #define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s" #define EXPLAIN_SESSION_FORMAT "Session" +#define EXPLAIN_STATE_WINDOW_FORMAT "StateWindow on Column %s" +#define EXPLAIN_PARITION_FORMAT "Partition on Column %s" #define EXPLAIN_ORDER_FORMAT "Order: %s" #define EXPLAIN_FILTER_FORMAT "Filter: " #define EXPLAIN_FILL_FORMAT "Fill: %s" diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 03a4e67db4..1acc9368c8 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -162,6 +162,16 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo pPhysiChildren = pSessNode->window.node.pChildren; break; } + case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: { + SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pNode; + pPhysiChildren = pStateNode->window.node.pChildren; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_PARTITION: { + SPartitionPhysiNode* partitionPhysiNode = (SPartitionPhysiNode*) pNode; + pPhysiChildren = partitionPhysiNode->node.pChildren; + break; + } default: qError("not supported physical node type %d", pNode->type); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -339,7 +349,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTagScanNode->pScanCols->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); @@ -734,6 +743,85 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } break; } + case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: { + SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode; + + EXPLAIN_ROW_NEW(level, EXPLAIN_STATE_WINDOW_FORMAT, nodesGetNameFromColumnNode(((STargetNode*)pStateNode->pStateKey)->pExpr)); + EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + } + + EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pStateNode->window.pFuncs->length); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pStateNode->window.node.pOutputDataBlockDesc->totalRowSize); + EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); + + if (verbose) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, + nodesGetOutputNumFromSlotList(pStateNode->window.node.pOutputDataBlockDesc->pSlots)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pStateNode->window.node.pOutputDataBlockDesc->outputRowSize); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + if (pStateNode->window.node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pStateNode->window.node.pConditions, tbuf + VARSTR_HEADER_SIZE, + TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_PARTITION: { + SPartitionPhysiNode *pPartNode = (SPartitionPhysiNode *)pNode; + + SNode* p = nodesListGetNode(pPartNode->pPartitionKeys, 0); + EXPLAIN_ROW_NEW(level, EXPLAIN_PARITION_FORMAT, nodesGetNameFromColumnNode(p)); + EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + } +// EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pPartNode->length); +// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPartNode->node.pOutputDataBlockDesc->totalRowSize); +// if (pPartNode->pGroupKeys) { +// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); +// EXPLAIN_ROW_APPEND(EXPLAIN_GROUPS_FORMAT, pPartNode->pGroupKeys->length); +// } + EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); + + if (verbose) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, + nodesGetOutputNumFromSlotList(pPartNode->node.pOutputDataBlockDesc->pSlots)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPartNode->node.pOutputDataBlockDesc->outputRowSize); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + if (pPartNode->node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pPartNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, + TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } + } + break; + } default: qError("not supported physical node type %d", pNode->type); return TSDB_CODE_QRY_APP_ERROR; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 04c9b89895..e16b60e58b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2719,8 +2719,9 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int64_t el = taosGetTimestampUs() - startTs; + int64_t el = taosGetTimestampUs() - startTs; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; + pLoadInfo->totalElapsed += el; size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); @@ -2921,6 +2922,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { pLoadInfo->totalSize); } + pOperator->resultInfo.totalRows += pRes->info.rows; return pExchangeInfo->pResult; } } @@ -2930,10 +2932,10 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } + int64_t st = taosGetTimestampUs(); + SExchangeInfo* pExchangeInfo = pOperator->info; - if (pExchangeInfo->seqLoadData) { - // do nothing for sequentially load data - } else { + if (!pExchangeInfo->seqLoadData) { int32_t code = prepareConcurrentlyLoad(pOperator); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2941,6 +2943,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { } OPTR_SET_OPENED(pOperator); + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; return TSDB_CODE_SUCCESS; } @@ -2968,15 +2971,6 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { } else { return concurrentlyLoadRemoteData(pOperator); } - -#if 0 - _error: - taosMemoryFreeClear(pMsg); - taosMemoryFreeClear(pMsgSendInfo); - - terrno = pTaskInfo->code; - return NULL; -#endif } static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { @@ -3005,12 +2999,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } size_t numOfSources = LIST_LENGTH(pSources); @@ -3035,18 +3025,17 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p tsem_init(&pInfo->ready, 0, 0); - pOperator->name = "ExchangeOperator"; + pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfExprs = pBlock->info.numOfCols; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfExprs = pBlock->info.numOfCols; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL, NULL, NULL); pInfo->pTransporter = pTransporter; - return pOperator; _error: diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 212a5391e1..2600c17060 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -269,15 +269,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { + + size_t rows = pRes->info.rows; + if (rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } + + pOperator->resultInfo.totalRows += rows; return (pRes->info.rows == 0)? NULL:pRes; } int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; + int64_t st = taosGetTimestampUs(); SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -317,6 +322,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0); + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + while(1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pRes, NULL); @@ -545,7 +552,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { // try next group data pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter); if (pInfo->pGroupIter == NULL) { - pOperator->status = OP_EXEC_DONE; + doSetOperatorCompleted(pOperator); return NULL; } @@ -562,6 +569,8 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { blockDataUpdateTsWindow(pInfo->binfo.pRes, 0); pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId; + + pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows; return pInfo->binfo.pRes; } @@ -578,6 +587,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { return buildPartitionResult(pOperator); } + int64_t st = taosGetTimestampUs(); SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -589,6 +599,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { doHashPartition(pOperator, pBlock); } + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + pOperator->status = OP_RES_TO_RETURN; blockDataEnsureCapacity(pRes, 4096); return buildPartitionResult(pOperator); @@ -632,13 +644,14 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* } pOperator->name = "PartitionOperator"; - pOperator->blocking = true; + pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pInfo->binfo.pRes = pResultBlock; - pOperator->numOfExprs = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->pExpr = pExprInfo; pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo, NULL, NULL, NULL); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4ff3d9b8ed..f77b80c533 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1640,7 +1640,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { count += 1; if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) { - pOperator->status = OP_EXEC_DONE; + doSetOperatorCompleted(pOperator); } } @@ -1652,6 +1652,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { } pRes->info.rows = count; + pOperator->resultInfo.totalRows += count; + return (pRes->info.rows == 0) ? NULL : pInfo->pRes; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 2bf62a03bb..deca2f3804 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -943,6 +943,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { } int32_t order = TSDB_ORDER_ASC; + int64_t st = taosGetTimestampUs(); SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -957,6 +958,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { doStateWindowAggImpl(pOperator, pInfo, pBlock); } + pOperator->cost.openCost = (taosGetTimestampUs() - st)/1000.0; + pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); @@ -967,7 +970,10 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; + size_t rows = pBInfo->pRes->info.rows; + pOperator->resultInfo.totalRows += rows; + + return (rows == 0)? NULL : pBInfo->pRes; } static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { @@ -1419,7 +1425,9 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { return pBInfo->pRes; } - int32_t order = TSDB_ORDER_ASC; + int64_t st = taosGetTimestampUs(); + int32_t order = TSDB_ORDER_ASC; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -1435,6 +1443,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { doSessionWindowAggImpl(pOperator, pInfo, pBlock); } + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + // restore the value pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); @@ -1446,7 +1456,10 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; + size_t rows = pBInfo->pRes->info.rows; + pOperator->resultInfo.totalRows += rows; + + return (rows == 0)? NULL : pBInfo->pRes; } static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {