feature/qnode

This commit is contained in:
dapan1121 2022-03-30 17:59:05 +08:00
parent 5fae817e38
commit 08bd344d2f
3 changed files with 23 additions and 19 deletions

View File

@ -31,7 +31,7 @@ void qFreeExplainResTree(SExplainResNode *res) {
FOREACH(node, res->pChildren) { FOREACH(node, res->pChildren) {
qFreeExplainResTree((SExplainResNode *)node); qFreeExplainResTree((SExplainResNode *)node);
} }
nodesDestroyList(res->pChildren); nodesClearList(res->pChildren);
taosMemoryFreeClear(res); taosMemoryFreeClear(res);
} }
@ -214,7 +214,7 @@ int32_t qExplainResAppendRow(SExplainRowCtx *ctx, char *tbuf, int32_t len, int32
row.len = len; row.len = len;
ctx->totalSize += len; ctx->totalSize += len;
if (taosArrayPush(ctx->rows, &row)) { if (NULL == taosArrayPush(ctx->rows, &row)) {
qError("taosArrayPush row to explain res rows failed"); qError("taosArrayPush row to explain res rows failed");
taosMemoryFree(row.buf); taosMemoryFree(row.buf);
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -273,9 +273,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pTblScanNode->pScanConditions) { if (pTblScanNode->scan.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->pScanConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen));
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
} }
@ -301,7 +301,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
} }
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:{ case QUERY_NODE_PHYSICAL_PLAN_PROJECT:{
SProjectPhysiNode *pPrjNode = (SProjectPhysiNode *)pNode; SProjectPhysiNode *pPrjNode = (SProjectPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_PROJECTION_FORMAT, pPrjNode->pProjections->length, pPrjNode->node.pOutputDataBlockDesc->resultRowSize); EXPLAIN_ROW_NEW(level, EXPLAIN_PROJECTION_FORMAT, pPrjNode->pProjections->length, pPrjNode->node.pOutputDataBlockDesc->outputRowSize);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
} }
@ -318,7 +318,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
} }
case QUERY_NODE_PHYSICAL_PLAN_JOIN:{ case QUERY_NODE_PHYSICAL_PLAN_JOIN:{
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType), pJoinNode->pTargets->length, pJoinNode->node.pOutputDataBlockDesc->resultRowSize); EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType), pJoinNode->pTargets->length, pJoinNode->node.pOutputDataBlockDesc->outputRowSize);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
} }
@ -340,7 +340,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
} }
case QUERY_NODE_PHYSICAL_PLAN_AGG:{ case QUERY_NODE_PHYSICAL_PLAN_AGG:{
SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, pAggNode->pAggFuncs->length, pAggNode->pGroupKeys->length, pAggNode->node.pOutputDataBlockDesc->resultRowSize); EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, pAggNode->pAggFuncs->length, pAggNode->pGroupKeys->length, pAggNode->node.pOutputDataBlockDesc->outputRowSize);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
} }
@ -357,7 +357,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
} }
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:{ case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:{
SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode; SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->pSrcEndPoints->length, pExchNode->node.pOutputDataBlockDesc->resultRowSize); EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->pSrcEndPoints->length, pExchNode->node.pOutputDataBlockDesc->outputRowSize);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
} }
@ -374,7 +374,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
} }
case QUERY_NODE_PHYSICAL_PLAN_SORT:{ case QUERY_NODE_PHYSICAL_PLAN_SORT:{
SSortPhysiNode *pSortNode = (SSortPhysiNode *)pNode; SSortPhysiNode *pSortNode = (SSortPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_SORT_FORMAT, pSortNode->pSortKeys->length, pSortNode->node.pOutputDataBlockDesc->resultRowSize); EXPLAIN_ROW_NEW(level, EXPLAIN_SORT_FORMAT, pSortNode->pSortKeys->length, pSortNode->node.pOutputDataBlockDesc->outputRowSize);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
} }
@ -392,7 +392,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{
SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode; SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length, EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length,
pIntNode->interval, pIntNode->intervalUnit, pIntNode->offset, pIntNode->intervalUnit, pIntNode->sliding, pIntNode->slidingUnit, pIntNode->window.node.pOutputDataBlockDesc->resultRowSize); pIntNode->interval, pIntNode->intervalUnit, pIntNode->offset, pIntNode->intervalUnit, pIntNode->sliding, pIntNode->slidingUnit, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
} }
@ -415,7 +415,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
} }
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:{ case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:{
SSessionWinodwPhysiNode *pIntNode = (SSessionWinodwPhysiNode *)pNode; SSessionWinodwPhysiNode *pIntNode = (SSessionWinodwPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT, pIntNode->gap, pIntNode->window.pFuncs->length, pIntNode->window.node.pOutputDataBlockDesc->resultRowSize); EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT, pIntNode->gap, pIntNode->window.pFuncs->length, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
} }

View File

@ -147,10 +147,10 @@ typedef struct SSchTask {
} SSchTask; } SSchTask;
typedef struct SSchJobAttr { typedef struct SSchJobAttr {
bool analyzeExplain; EExplainMode explainMode;
bool syncSchedule; bool syncSchedule;
bool queryJob; bool queryJob;
bool needFlowCtrl; bool needFlowCtrl;
} SSchJobAttr; } SSchJobAttr;
typedef struct SSchJob { typedef struct SSchJob {

View File

@ -2177,7 +2177,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
pJob->attr.analyzeExplain = (EXPLAIN_MODE_ANALYZE == pDag->explainInfo.mode); pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->attr.syncSchedule = syncSchedule; pJob->attr.syncSchedule = syncSchedule;
pJob->transport = transport; pJob->transport = transport;
pJob->sql = sql; pJob->sql = sql;
@ -2528,11 +2528,15 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status)); SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
goto _return; goto _return;
} else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_ERR_JRET(schFetchFromRemote(pJob)); if (!pJob->attr.explainMode == EXPLAIN_MODE_STATIC) {
SCH_ERR_JRET(schFetchFromRemote(pJob));
tsem_wait(&pJob->rspSem);
}
} else {
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
} }
tsem_wait(&pJob->rspSem);
status = SCH_GET_JOB_STATUS(pJob); status = SCH_GET_JOB_STATUS(pJob);
if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {