From 08bd344d2ff0bfdca8fa18233076a16a9920d976 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 30 Mar 2022 17:59:05 +0800 Subject: [PATCH] feature/qnode --- source/libs/qcom/src/queryExplain.c | 22 +++++++++++----------- source/libs/scheduler/inc/schedulerInt.h | 8 ++++---- source/libs/scheduler/src/scheduler.c | 12 ++++++++---- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/source/libs/qcom/src/queryExplain.c b/source/libs/qcom/src/queryExplain.c index e1bdf96331..91ca7f54b0 100644 --- a/source/libs/qcom/src/queryExplain.c +++ b/source/libs/qcom/src/queryExplain.c @@ -31,7 +31,7 @@ void qFreeExplainResTree(SExplainResNode *res) { FOREACH(node, res->pChildren) { qFreeExplainResTree((SExplainResNode *)node); } - nodesDestroyList(res->pChildren); + nodesClearList(res->pChildren); taosMemoryFreeClear(res); } @@ -214,7 +214,7 @@ int32_t qExplainResAppendRow(SExplainRowCtx *ctx, char *tbuf, int32_t len, int32 row.len = len; ctx->totalSize += len; - if (taosArrayPush(ctx->rows, &row)) { + if (NULL == taosArrayPush(ctx->rows, &row)) { qError("taosArrayPush row to explain res rows failed"); taosMemoryFree(row.buf); QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -273,9 +273,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - if (pTblScanNode->pScanConditions) { + if (pTblScanNode->scan.node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->pScanConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); + QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } @@ -301,7 +301,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx } case QUERY_NODE_PHYSICAL_PLAN_PROJECT:{ SProjectPhysiNode *pPrjNode = (SProjectPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_PROJECTION_FORMAT, pPrjNode->pProjections->length, pPrjNode->node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_PROJECTION_FORMAT, pPrjNode->pProjections->length, pPrjNode->node.pOutputDataBlockDesc->outputRowSize); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } @@ -318,7 +318,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx } case QUERY_NODE_PHYSICAL_PLAN_JOIN:{ SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType), pJoinNode->pTargets->length, pJoinNode->node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType), pJoinNode->pTargets->length, pJoinNode->node.pOutputDataBlockDesc->outputRowSize); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } @@ -340,7 +340,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx } case QUERY_NODE_PHYSICAL_PLAN_AGG:{ SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, pAggNode->pAggFuncs->length, pAggNode->pGroupKeys->length, pAggNode->node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, pAggNode->pAggFuncs->length, pAggNode->pGroupKeys->length, pAggNode->node.pOutputDataBlockDesc->outputRowSize); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } @@ -357,7 +357,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx } case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:{ SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->pSrcEndPoints->length, pExchNode->node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->pSrcEndPoints->length, pExchNode->node.pOutputDataBlockDesc->outputRowSize); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } @@ -374,7 +374,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx } case QUERY_NODE_PHYSICAL_PLAN_SORT:{ SSortPhysiNode *pSortNode = (SSortPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_SORT_FORMAT, pSortNode->pSortKeys->length, pSortNode->node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_SORT_FORMAT, pSortNode->pSortKeys->length, pSortNode->node.pOutputDataBlockDesc->outputRowSize); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } @@ -392,7 +392,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode; EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length, - pIntNode->interval, pIntNode->intervalUnit, pIntNode->offset, pIntNode->intervalUnit, pIntNode->sliding, pIntNode->slidingUnit, pIntNode->window.node.pOutputDataBlockDesc->resultRowSize); + pIntNode->interval, pIntNode->intervalUnit, pIntNode->offset, pIntNode->intervalUnit, pIntNode->sliding, pIntNode->slidingUnit, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } @@ -415,7 +415,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx } case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:{ SSessionWinodwPhysiNode *pIntNode = (SSessionWinodwPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT, pIntNode->gap, pIntNode->window.pFuncs->length, pIntNode->window.node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT, pIntNode->gap, pIntNode->window.pFuncs->length, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 579a995056..1b2c7d54e2 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -147,10 +147,10 @@ typedef struct SSchTask { } SSchTask; typedef struct SSchJobAttr { - bool analyzeExplain; - bool syncSchedule; - bool queryJob; - bool needFlowCtrl; + EExplainMode explainMode; + bool syncSchedule; + bool queryJob; + bool needFlowCtrl; } SSchJobAttr; typedef struct SSchJob { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index d11dafb19b..a26b5f32f3 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -2177,7 +2177,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pJob->attr.analyzeExplain = (EXPLAIN_MODE_ANALYZE == pDag->explainInfo.mode); + pJob->attr.explainMode = pDag->explainInfo.mode; pJob->attr.syncSchedule = syncSchedule; pJob->transport = transport; pJob->sql = sql; @@ -2528,11 +2528,15 @@ int32_t schedulerFetchRows(int64_t job, void **pData) { SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status)); goto _return; } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { - SCH_ERR_JRET(schFetchFromRemote(pJob)); + if (!pJob->attr.explainMode == EXPLAIN_MODE_STATIC) { + SCH_ERR_JRET(schFetchFromRemote(pJob)); + tsem_wait(&pJob->rspSem); + } + } else { + SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } - tsem_wait(&pJob->rspSem); - status = SCH_GET_JOB_STATUS(pJob); if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {