From 0608824186c025e294ee0aa3384ddbc4f66c44c8 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 31 Mar 2022 18:02:58 +0800 Subject: [PATCH] feature/qnode --- include/common/ttime.h | 1 + include/libs/command/command.h | 5 + include/libs/nodes/plannodes.h | 2 +- include/libs/qcom/query.h | 10 +- source/common/src/ttime.c | 12 ++ source/libs/command/inc/commandInt.h | 95 +++++++++++++++ .../queryExplain.c => command/src/explain.c} | 112 ++++++++++++++++-- source/libs/qcom/inc/queryInt.h | 71 ----------- source/libs/scheduler/CMakeLists.txt | 2 +- source/libs/scheduler/inc/schedulerInt.h | 7 -- source/libs/scheduler/src/scheduler.c | 95 +-------------- 11 files changed, 227 insertions(+), 185 deletions(-) create mode 100644 source/libs/command/inc/commandInt.h rename source/libs/{qcom/src/queryExplain.c => command/src/explain.c} (85%) diff --git a/include/common/ttime.h b/include/common/ttime.h index 2209cc998f..306f54bedb 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -60,6 +60,7 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth); void deltaToUtcInitOnce(); +char getPrecisionUnit(int32_t precision); int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision); int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit); diff --git a/include/libs/command/command.h b/include/libs/command/command.h index 699c2f9792..7e58d39692 100644 --- a/include/libs/command/command.h +++ b/include/libs/command/command.h @@ -15,5 +15,10 @@ #include "cmdnodes.h" #include "tmsg.h" +#include "plannodes.h" int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp); + +int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp); + + diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index f41e049196..37163f60dd 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -253,7 +253,7 @@ typedef struct SIntervalPhysiNode { int64_t sliding; int8_t intervalUnit; int8_t slidingUnit; - uint8_t precision; + uint8_t precision; SFillNode* pFill; } SIntervalPhysiNode; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 5908b0b875..bb550e75e8 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -174,11 +174,6 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_ int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); char *jobTaskStatusStr(int32_t status); -int32_t qInitExplainCtx(void **pCtx, SHashObj *groupHash, bool verbose); -int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level); -int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp); -void qFreeExplainCtx(void *ctx); - SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name); extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen); @@ -241,6 +236,11 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t } \ } while (0) +#define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) +#define QRY_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) +#define QRY_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) + + #ifdef __cplusplus } #endif diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 23b19b55e7..a65352f2b9 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -361,6 +361,18 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) { return 0; } +char getPrecisionUnit(int32_t precision) { + static char units[3] = {TIME_UNIT_MILLISECOND, TIME_UNIT_MICROSECOND, TIME_UNIT_NANOSECOND}; + switch (precision) { + case TSDB_TIME_PRECISION_MILLI: + case TSDB_TIME_PRECISION_MICRO: + case TSDB_TIME_PRECISION_NANO: + return units[precision]; + default: + return 0; + } +} + int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) { assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO || fromPrecision == TSDB_TIME_PRECISION_NANO); diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h new file mode 100644 index 0000000000..5915aa7f36 --- /dev/null +++ b/source/libs/command/inc/commandInt.h @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_QUERY_INT_H_ +#define _TD_QUERY_INT_H_ + +#ifdef __cplusplus +extern "C" { +#endif +#include "nodes.h" +#include "plannodes.h" +#include "ttime.h" + +#define EXPLAIN_MAX_GROUP_NUM 100 + +//newline area +#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s columns=%d" +#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s columns=%d" +#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s columns=%d" +#define EXPLAIN_PROJECTION_FORMAT "Projection columns=%d width=%d" +#define EXPLAIN_JOIN_FORMAT "%s between %d tables width=%d" +#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d" +#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1 width=%d" +#define EXPLAIN_SORT_FORMAT "Sort on %d Column(s) width=%d" +#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d" +#define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d" +#define EXPLAIN_ORDER_FORMAT "Order: %s" +#define EXPLAIN_FILTER_FORMAT "Filter: " +#define EXPLAIN_FILL_FORMAT "Fill: %s" +#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: " +#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]" + +//append area +#define EXPLAIN_GROUPS_FORMAT " groups=%d" +#define EXPLAIN_WIDTH_FORMAT " width=%d" +#define EXPLAIN_LOOPS_FORMAT " loops=%d" +#define EXPLAIN_REVERSE_FORMAT " reverse=%d" + +typedef struct SExplainGroup { + int32_t nodeNum; + SSubplan *plan; + void *execInfo; //TODO +} SExplainGroup; + +typedef struct SExplainResNode { + SNodeList* pChildren; + SPhysiNode* pNode; + void* pExecInfo; +} SExplainResNode; + +typedef struct SQueryExplainRowInfo { + int32_t level; + int32_t len; + char *buf; +} SQueryExplainRowInfo; + +typedef struct SExplainCtx { + int32_t totalSize; + bool verbose; + char *tbuf; + SArray *rows; + SHashObj *groupHash; +} SExplainCtx; + +#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending") +#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join") + +#define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u))) + +#define EXPLAIN_ROW_NEW(level, ...) \ + do { \ + tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s%s", (level) * 2, "", (isVerboseLine ? "" : "-> ")); \ + tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__); \ + } while (0) + +#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__) +#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; isVerboseLine = true; } while (0) + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_QUERY_INT_H_*/ diff --git a/source/libs/qcom/src/queryExplain.c b/source/libs/command/src/explain.c similarity index 85% rename from source/libs/qcom/src/queryExplain.c rename to source/libs/command/src/explain.c index af092e842e..5fa12f5ebe 100644 --- a/source/libs/qcom/src/queryExplain.c +++ b/source/libs/command/src/explain.c @@ -13,11 +13,12 @@ * along with this program. If not, see . */ -#include "queryInt.h" #include "query.h" #include "plannodes.h" +#include "commandInt.h" int32_t qGenerateExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes); +int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level); void qFreeExplainResTree(SExplainResNode *res) { @@ -265,6 +266,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i bool isVerboseLine = false; char *tbuf = ctx->tbuf; bool verbose = ctx->verbose; + int32_t filterLen = 0; SPhysiNode* pNode = pResNode->pNode; if (NULL == pNode) { qError("pyhsical node in explain res node is NULL"); @@ -317,7 +319,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (pTblScanNode->scan.node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen)); + tlen += filterLen; EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } @@ -356,7 +359,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (verbose) { if (pPrjNode->node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen)); + tlen += filterLen; EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } @@ -375,13 +379,15 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (verbose) { if (pJoinNode->node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen)); + tlen += filterLen; EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen)); + tlen += filterLen; EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } @@ -404,7 +410,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (verbose) { if (pAggNode->node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen)); + tlen += filterLen; EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } @@ -429,7 +436,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (verbose) { if (pExchNode->node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen)); + tlen += filterLen; EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } @@ -450,7 +458,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (verbose) { if (pSortNode->node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen)); + tlen += filterLen; EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } @@ -461,7 +470,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode; EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length, INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, pIntNode->precision), pIntNode->intervalUnit, - INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->offset, pIntNode->intervalUnit, pIntNode->precision), pIntNode->intervalUnit, + pIntNode->offset, getPrecisionUnit(pIntNode->precision), INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->sliding, pIntNode->slidingUnit, pIntNode->precision), pIntNode->slidingUnit, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize); if (pResNode->pExecInfo) { @@ -479,7 +488,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (pIntNode->window.node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen)); + tlen += filterLen; EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } @@ -498,7 +508,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (verbose) { if (pIntNode->window.node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen)); + tlen += filterLen; EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } @@ -595,5 +606,84 @@ int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { return TSDB_CODE_SUCCESS; } +int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) { + int32_t code = 0; + SNodeListNode *plans = NULL; + int32_t taskNum = 0; + SExplainGroup *pGroup = NULL; + void *pCtx = NULL; + int32_t rootGroupId = 0; + + if (pDag->numOfSubplans <= 0) { + qError("invalid subplan num:%d", pDag->numOfSubplans); + QRY_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans); + if (levelNum <= 0) { + qError("invalid level num:%d", levelNum); + QRY_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SHashObj *groupHash = taosHashInit(EXPLAIN_MAX_GROUP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == groupHash) { + qError("groupHash %d failed", EXPLAIN_MAX_GROUP_NUM); + QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + QRY_ERR_JRET(qInitExplainCtx(&pCtx, groupHash, pDag->explainInfo.verbose)); + + for (int32_t i = 0; i < levelNum; ++i) { + plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i); + if (NULL == plans) { + qError("empty level plan, level:%d", i); + QRY_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + taskNum = (int32_t)LIST_LENGTH(plans->pNodeList); + if (taskNum <= 0) { + qError("invalid level plan number:%d, level:%d", taskNum, i); + QRY_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SSubplan *plan = NULL; + for (int32_t n = 0; n < taskNum; ++n) { + plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n); + pGroup = taosHashGet(groupHash, &plan->id.groupId, sizeof(plan->id.groupId)); + if (pGroup) { + ++pGroup->nodeNum; + continue; + } + + SExplainGroup group = {.nodeNum = 1, .plan = plan, .execInfo = NULL}; + if (0 != taosHashPut(groupHash, &plan->id.groupId, sizeof(plan->id.groupId), &group, sizeof(group))) { + qError("taosHashPut to explainGroupHash failed, taskIdx:%d", n); + QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + if (0 == i) { + if (taskNum > 1) { + qError("invalid taskNum %d for level 0", taskNum); + QRY_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + rootGroupId = plan->id.groupId; + } + + qDebug("level %d group handled, taskNum:%d", i, taskNum); + } + + QRY_ERR_JRET(qAppendTaskExplainResRows(pCtx, rootGroupId, 0)); + + QRY_ERR_JRET(qGetExplainRspFromCtx(pCtx, pRsp)); + +_return: + + qFreeExplainCtx(pCtx); + + QRY_RET(code); +} + diff --git a/source/libs/qcom/inc/queryInt.h b/source/libs/qcom/inc/queryInt.h index 06d5b07dc7..f120bf26ce 100644 --- a/source/libs/qcom/inc/queryInt.h +++ b/source/libs/qcom/inc/queryInt.h @@ -19,77 +19,6 @@ #ifdef __cplusplus extern "C" { #endif -#include "nodes.h" -#include "plannodes.h" - -//newline area -#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s columns=%d" -#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s columns=%d" -#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s columns=%d" -#define EXPLAIN_PROJECTION_FORMAT "Projection columns=%d width=%d" -#define EXPLAIN_JOIN_FORMAT "%s between %d tables width=%d" -#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d" -#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1 width=%d" -#define EXPLAIN_SORT_FORMAT "Sort on %d Column(s) width=%d" -#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d" -#define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d" -#define EXPLAIN_ORDER_FORMAT "Order: %s" -#define EXPLAIN_FILTER_FORMAT "Filter: " -#define EXPLAIN_FILL_FORMAT "Fill: %s" -#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: " -#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]" - -//append area -#define EXPLAIN_GROUPS_FORMAT " groups=%d" -#define EXPLAIN_WIDTH_FORMAT " width=%d" -#define EXPLAIN_LOOPS_FORMAT " loops=%d" -#define EXPLAIN_REVERSE_FORMAT " reverse=%d" - -//TODO MOVE TO LIB -typedef struct SExplainGroup { - int32_t nodeNum; - SSubplan *plan; - void *execInfo; //TODO -} SExplainGroup; - - -typedef struct SExplainResNode { - SNodeList* pChildren; - SPhysiNode* pNode; - void* pExecInfo; -} SExplainResNode; - -typedef struct SQueryExplainRowInfo { - int32_t level; - int32_t len; - char *buf; -} SQueryExplainRowInfo; - -typedef struct SExplainCtx { - int32_t totalSize; - bool verbose; - char *tbuf; - SArray *rows; - SHashObj *groupHash; -} SExplainCtx; - -#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending") -#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join") - -#define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u))) - -#define EXPLAIN_ROW_NEW(level, ...) \ - do { \ - tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s%s", (level) * 2, "", (isVerboseLine ? "" : "-> ")); \ - tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__); \ - } while (0) - -#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__) -#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; isVerboseLine = true; } while (0) - -#define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) -#define QRY_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) -#define QRY_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #ifdef __cplusplus diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index a4a299317c..1a62c7d89d 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,7 +9,7 @@ target_include_directories( target_link_libraries( scheduler - PUBLIC os util nodes planner qcom common catalog transport + PUBLIC os util nodes planner qcom common catalog transport command ) if(${BUILD_TEST}) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index f19e822974..22bd039219 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -38,13 +38,6 @@ enum { SCH_WRITE, }; -//TODO MOVE TO LIB -typedef struct SExplainGroup { - int32_t nodeNum; - SSubplan *plan; - void *execInfo; //TODO -} SExplainGroup; - typedef struct SSchTrans { void *transInst; void *transHandle; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 940aaed7c1..7c24a25f65 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -19,6 +19,7 @@ #include "tmsg.h" #include "tref.h" #include "trpc.h" +#include "command.h" SSchedulerMgmt schMgmt = {0}; @@ -476,92 +477,6 @@ _return: SCH_RET(code); } -int32_t schValidateAndBuildJobExplain(SQueryPlan *pDag, SSchJob *pJob) { - int32_t code = 0; - SNodeListNode *plans = NULL; - int32_t taskNum = 0; - SExplainGroup *pGroup = NULL; - void *pCtx = NULL; - int32_t rootGroupId = 0; - - pJob->queryId = pDag->queryId; - pJob->subPlans = pDag->pSubplans; - - if (pDag->numOfSubplans <= 0) { - SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans); - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } - - int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans); - if (levelNum <= 0) { - SCH_JOB_ELOG("invalid level num:%d", levelNum); - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } - - SHashObj *groupHash = taosHashInit(SCHEDULE_DEFAULT_MAX_TASK_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); - if (NULL == groupHash) { - SCH_JOB_ELOG("groupHash %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - SCH_ERR_JRET(qInitExplainCtx(&pCtx, groupHash, pDag->explainInfo.verbose)); - - for (int32_t i = 0; i < levelNum; ++i) { - plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i); - if (NULL == plans) { - SCH_JOB_ELOG("empty level plan, level:%d", i); - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - taskNum = (int32_t)LIST_LENGTH(plans->pNodeList); - if (taskNum <= 0) { - SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i); - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - SSubplan *plan = NULL; - for (int32_t n = 0; n < taskNum; ++n) { - plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n); - pGroup = taosHashGet(groupHash, &plan->id.groupId, sizeof(plan->id.groupId)); - if (pGroup) { - ++pGroup->nodeNum; - continue; - } - - SExplainGroup group = {.nodeNum = 1, .plan = plan, .execInfo = NULL}; - if (0 != taosHashPut(groupHash, &plan->id.groupId, sizeof(plan->id.groupId), &group, sizeof(group))) { - SCH_JOB_ELOG("taosHashPut to explainGroupHash failed, taskIdx:%d", n); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - } - - if (0 == i) { - if (taskNum > 1) { - SCH_JOB_ELOG("invalid taskNum %d for level 0", taskNum); - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - rootGroupId = plan->id.groupId; - } - - SCH_JOB_DLOG("level %d group handled, taskNum:%d", i, taskNum); - } - - SCH_ERR_JRET(qAppendTaskExplainResRows(pCtx, rootGroupId, 0)); - - SRetrieveTableRsp *pRsp = NULL; - SCH_ERR_JRET(qGetExplainRspFromCtx(pCtx, &pRsp)); - - pJob->resData = pRsp; - -_return: - - qFreeExplainCtx(pCtx); - - SCH_RET(code); -} - - int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { if (NULL != pTask->candidateAddrs) { return TSDB_CODE_SUCCESS; @@ -2250,7 +2165,7 @@ _return: SCH_RET(code); } -int32_t schStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, +int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, bool syncSchedule) { qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); @@ -2264,8 +2179,10 @@ int32_t schStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, i pJob->sql = sql; pJob->attr.queryJob = true; pJob->attr.explainMode = pDag->explainInfo.mode; + pJob->queryId = pDag->queryId; + pJob->subPlans = pDag->pSubplans; - SCH_ERR_JRET(schValidateAndBuildJobExplain(pDag, pJob)); + SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData)); int64_t refId = taosAddRef(schMgmt.jobRef, pJob); if (refId < 0) { @@ -2345,7 +2262,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in } if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { - SCH_ERR_RET(schStaticExplain(transport, nodeList, pDag, pJob, sql, true)); + SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true)); } else { SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true)); }