feature/qnode
This commit is contained in:
parent
369d08c557
commit
0608824186
|
@ -60,6 +60,7 @@ int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* durati
|
|||
|
||||
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
|
||||
void deltaToUtcInitOnce();
|
||||
char getPrecisionUnit(int32_t precision);
|
||||
|
||||
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
|
||||
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit);
|
||||
|
|
|
@ -15,5 +15,10 @@
|
|||
|
||||
#include "cmdnodes.h"
|
||||
#include "tmsg.h"
|
||||
#include "plannodes.h"
|
||||
|
||||
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp);
|
||||
|
||||
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp);
|
||||
|
||||
|
||||
|
|
|
@ -253,7 +253,7 @@ typedef struct SIntervalPhysiNode {
|
|||
int64_t sliding;
|
||||
int8_t intervalUnit;
|
||||
int8_t slidingUnit;
|
||||
uint8_t precision;
|
||||
uint8_t precision;
|
||||
SFillNode* pFill;
|
||||
} SIntervalPhysiNode;
|
||||
|
||||
|
|
|
@ -174,11 +174,6 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_
|
|||
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
|
||||
char *jobTaskStatusStr(int32_t status);
|
||||
|
||||
int32_t qInitExplainCtx(void **pCtx, SHashObj *groupHash, bool verbose);
|
||||
int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level);
|
||||
int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp);
|
||||
void qFreeExplainCtx(void *ctx);
|
||||
|
||||
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name);
|
||||
|
||||
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen);
|
||||
|
@ -241,6 +236,11 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
||||
#define QRY_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||
#define QRY_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -361,6 +361,18 @@ int32_t parseLocaltimeDst(char* timestr, int64_t* time, int32_t timePrec) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
char getPrecisionUnit(int32_t precision) {
|
||||
static char units[3] = {TIME_UNIT_MILLISECOND, TIME_UNIT_MICROSECOND, TIME_UNIT_NANOSECOND};
|
||||
switch (precision) {
|
||||
case TSDB_TIME_PRECISION_MILLI:
|
||||
case TSDB_TIME_PRECISION_MICRO:
|
||||
case TSDB_TIME_PRECISION_NANO:
|
||||
return units[precision];
|
||||
default:
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) {
|
||||
assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO ||
|
||||
fromPrecision == TSDB_TIME_PRECISION_NANO);
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_QUERY_INT_H_
|
||||
#define _TD_QUERY_INT_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "nodes.h"
|
||||
#include "plannodes.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#define EXPLAIN_MAX_GROUP_NUM 100
|
||||
|
||||
//newline area
|
||||
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s columns=%d"
|
||||
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s columns=%d"
|
||||
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s columns=%d"
|
||||
#define EXPLAIN_PROJECTION_FORMAT "Projection columns=%d width=%d"
|
||||
#define EXPLAIN_JOIN_FORMAT "%s between %d tables width=%d"
|
||||
#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d"
|
||||
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1 width=%d"
|
||||
#define EXPLAIN_SORT_FORMAT "Sort on %d Column(s) width=%d"
|
||||
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d"
|
||||
#define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d"
|
||||
#define EXPLAIN_ORDER_FORMAT "Order: %s"
|
||||
#define EXPLAIN_FILTER_FORMAT "Filter: "
|
||||
#define EXPLAIN_FILL_FORMAT "Fill: %s"
|
||||
#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: "
|
||||
#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]"
|
||||
|
||||
//append area
|
||||
#define EXPLAIN_GROUPS_FORMAT " groups=%d"
|
||||
#define EXPLAIN_WIDTH_FORMAT " width=%d"
|
||||
#define EXPLAIN_LOOPS_FORMAT " loops=%d"
|
||||
#define EXPLAIN_REVERSE_FORMAT " reverse=%d"
|
||||
|
||||
typedef struct SExplainGroup {
|
||||
int32_t nodeNum;
|
||||
SSubplan *plan;
|
||||
void *execInfo; //TODO
|
||||
} SExplainGroup;
|
||||
|
||||
typedef struct SExplainResNode {
|
||||
SNodeList* pChildren;
|
||||
SPhysiNode* pNode;
|
||||
void* pExecInfo;
|
||||
} SExplainResNode;
|
||||
|
||||
typedef struct SQueryExplainRowInfo {
|
||||
int32_t level;
|
||||
int32_t len;
|
||||
char *buf;
|
||||
} SQueryExplainRowInfo;
|
||||
|
||||
typedef struct SExplainCtx {
|
||||
int32_t totalSize;
|
||||
bool verbose;
|
||||
char *tbuf;
|
||||
SArray *rows;
|
||||
SHashObj *groupHash;
|
||||
} SExplainCtx;
|
||||
|
||||
#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending")
|
||||
#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join")
|
||||
|
||||
#define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u)))
|
||||
|
||||
#define EXPLAIN_ROW_NEW(level, ...) \
|
||||
do { \
|
||||
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s%s", (level) * 2, "", (isVerboseLine ? "" : "-> ")); \
|
||||
tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__); \
|
||||
} while (0)
|
||||
|
||||
#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__)
|
||||
#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; isVerboseLine = true; } while (0)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_QUERY_INT_H_*/
|
|
@ -13,11 +13,12 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "queryInt.h"
|
||||
#include "query.h"
|
||||
#include "plannodes.h"
|
||||
#include "commandInt.h"
|
||||
|
||||
int32_t qGenerateExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes);
|
||||
int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level);
|
||||
|
||||
|
||||
void qFreeExplainResTree(SExplainResNode *res) {
|
||||
|
@ -265,6 +266,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
bool isVerboseLine = false;
|
||||
char *tbuf = ctx->tbuf;
|
||||
bool verbose = ctx->verbose;
|
||||
int32_t filterLen = 0;
|
||||
SPhysiNode* pNode = pResNode->pNode;
|
||||
if (NULL == pNode) {
|
||||
qError("pyhsical node in explain res node is NULL");
|
||||
|
@ -317,7 +319,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
|
||||
if (pTblScanNode->scan.node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
|
||||
tlen += filterLen;
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
@ -356,7 +359,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
if (verbose) {
|
||||
if (pPrjNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
|
||||
tlen += filterLen;
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
@ -375,13 +379,15 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
if (verbose) {
|
||||
if (pJoinNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
|
||||
tlen += filterLen;
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
|
||||
tlen += filterLen;
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
@ -404,7 +410,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
if (verbose) {
|
||||
if (pAggNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
|
||||
tlen += filterLen;
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
@ -429,7 +436,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
if (verbose) {
|
||||
if (pExchNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
|
||||
tlen += filterLen;
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
@ -450,7 +458,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
if (verbose) {
|
||||
if (pSortNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
|
||||
tlen += filterLen;
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
@ -461,7 +470,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length,
|
||||
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, pIntNode->precision), pIntNode->intervalUnit,
|
||||
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->offset, pIntNode->intervalUnit, pIntNode->precision), pIntNode->intervalUnit,
|
||||
pIntNode->offset, getPrecisionUnit(pIntNode->precision),
|
||||
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->sliding, pIntNode->slidingUnit, pIntNode->precision), pIntNode->slidingUnit,
|
||||
pIntNode->window.node.pOutputDataBlockDesc->outputRowSize);
|
||||
if (pResNode->pExecInfo) {
|
||||
|
@ -479,7 +488,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
|
||||
if (pIntNode->window.node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
|
||||
tlen += filterLen;
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
@ -498,7 +508,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
if (verbose) {
|
||||
if (pIntNode->window.node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, &filterLen));
|
||||
tlen += filterLen;
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
@ -595,5 +606,84 @@ int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
|
||||
int32_t code = 0;
|
||||
SNodeListNode *plans = NULL;
|
||||
int32_t taskNum = 0;
|
||||
SExplainGroup *pGroup = NULL;
|
||||
void *pCtx = NULL;
|
||||
int32_t rootGroupId = 0;
|
||||
|
||||
if (pDag->numOfSubplans <= 0) {
|
||||
qError("invalid subplan num:%d", pDag->numOfSubplans);
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
|
||||
if (levelNum <= 0) {
|
||||
qError("invalid level num:%d", levelNum);
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SHashObj *groupHash = taosHashInit(EXPLAIN_MAX_GROUP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (NULL == groupHash) {
|
||||
qError("groupHash %d failed", EXPLAIN_MAX_GROUP_NUM);
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
QRY_ERR_JRET(qInitExplainCtx(&pCtx, groupHash, pDag->explainInfo.verbose));
|
||||
|
||||
for (int32_t i = 0; i < levelNum; ++i) {
|
||||
plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
|
||||
if (NULL == plans) {
|
||||
qError("empty level plan, level:%d", i);
|
||||
QRY_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
|
||||
if (taskNum <= 0) {
|
||||
qError("invalid level plan number:%d, level:%d", taskNum, i);
|
||||
QRY_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SSubplan *plan = NULL;
|
||||
for (int32_t n = 0; n < taskNum; ++n) {
|
||||
plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
|
||||
pGroup = taosHashGet(groupHash, &plan->id.groupId, sizeof(plan->id.groupId));
|
||||
if (pGroup) {
|
||||
++pGroup->nodeNum;
|
||||
continue;
|
||||
}
|
||||
|
||||
SExplainGroup group = {.nodeNum = 1, .plan = plan, .execInfo = NULL};
|
||||
if (0 != taosHashPut(groupHash, &plan->id.groupId, sizeof(plan->id.groupId), &group, sizeof(group))) {
|
||||
qError("taosHashPut to explainGroupHash failed, taskIdx:%d", n);
|
||||
QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
if (0 == i) {
|
||||
if (taskNum > 1) {
|
||||
qError("invalid taskNum %d for level 0", taskNum);
|
||||
QRY_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
rootGroupId = plan->id.groupId;
|
||||
}
|
||||
|
||||
qDebug("level %d group handled, taskNum:%d", i, taskNum);
|
||||
}
|
||||
|
||||
QRY_ERR_JRET(qAppendTaskExplainResRows(pCtx, rootGroupId, 0));
|
||||
|
||||
QRY_ERR_JRET(qGetExplainRspFromCtx(pCtx, pRsp));
|
||||
|
||||
_return:
|
||||
|
||||
qFreeExplainCtx(pCtx);
|
||||
|
||||
QRY_RET(code);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -19,77 +19,6 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "nodes.h"
|
||||
#include "plannodes.h"
|
||||
|
||||
//newline area
|
||||
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s columns=%d"
|
||||
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s columns=%d"
|
||||
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s columns=%d"
|
||||
#define EXPLAIN_PROJECTION_FORMAT "Projection columns=%d width=%d"
|
||||
#define EXPLAIN_JOIN_FORMAT "%s between %d tables width=%d"
|
||||
#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d"
|
||||
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1 width=%d"
|
||||
#define EXPLAIN_SORT_FORMAT "Sort on %d Column(s) width=%d"
|
||||
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d"
|
||||
#define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d"
|
||||
#define EXPLAIN_ORDER_FORMAT "Order: %s"
|
||||
#define EXPLAIN_FILTER_FORMAT "Filter: "
|
||||
#define EXPLAIN_FILL_FORMAT "Fill: %s"
|
||||
#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: "
|
||||
#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]"
|
||||
|
||||
//append area
|
||||
#define EXPLAIN_GROUPS_FORMAT " groups=%d"
|
||||
#define EXPLAIN_WIDTH_FORMAT " width=%d"
|
||||
#define EXPLAIN_LOOPS_FORMAT " loops=%d"
|
||||
#define EXPLAIN_REVERSE_FORMAT " reverse=%d"
|
||||
|
||||
//TODO MOVE TO LIB
|
||||
typedef struct SExplainGroup {
|
||||
int32_t nodeNum;
|
||||
SSubplan *plan;
|
||||
void *execInfo; //TODO
|
||||
} SExplainGroup;
|
||||
|
||||
|
||||
typedef struct SExplainResNode {
|
||||
SNodeList* pChildren;
|
||||
SPhysiNode* pNode;
|
||||
void* pExecInfo;
|
||||
} SExplainResNode;
|
||||
|
||||
typedef struct SQueryExplainRowInfo {
|
||||
int32_t level;
|
||||
int32_t len;
|
||||
char *buf;
|
||||
} SQueryExplainRowInfo;
|
||||
|
||||
typedef struct SExplainCtx {
|
||||
int32_t totalSize;
|
||||
bool verbose;
|
||||
char *tbuf;
|
||||
SArray *rows;
|
||||
SHashObj *groupHash;
|
||||
} SExplainCtx;
|
||||
|
||||
#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending")
|
||||
#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join")
|
||||
|
||||
#define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u)))
|
||||
|
||||
#define EXPLAIN_ROW_NEW(level, ...) \
|
||||
do { \
|
||||
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s%s", (level) * 2, "", (isVerboseLine ? "" : "-> ")); \
|
||||
tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__); \
|
||||
} while (0)
|
||||
|
||||
#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__)
|
||||
#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; isVerboseLine = true; } while (0)
|
||||
|
||||
#define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
||||
#define QRY_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||
#define QRY_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -9,7 +9,7 @@ target_include_directories(
|
|||
|
||||
target_link_libraries(
|
||||
scheduler
|
||||
PUBLIC os util nodes planner qcom common catalog transport
|
||||
PUBLIC os util nodes planner qcom common catalog transport command
|
||||
)
|
||||
|
||||
if(${BUILD_TEST})
|
||||
|
|
|
@ -38,13 +38,6 @@ enum {
|
|||
SCH_WRITE,
|
||||
};
|
||||
|
||||
//TODO MOVE TO LIB
|
||||
typedef struct SExplainGroup {
|
||||
int32_t nodeNum;
|
||||
SSubplan *plan;
|
||||
void *execInfo; //TODO
|
||||
} SExplainGroup;
|
||||
|
||||
typedef struct SSchTrans {
|
||||
void *transInst;
|
||||
void *transHandle;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "tmsg.h"
|
||||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
#include "command.h"
|
||||
|
||||
SSchedulerMgmt schMgmt = {0};
|
||||
|
||||
|
@ -476,92 +477,6 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
int32_t schValidateAndBuildJobExplain(SQueryPlan *pDag, SSchJob *pJob) {
|
||||
int32_t code = 0;
|
||||
SNodeListNode *plans = NULL;
|
||||
int32_t taskNum = 0;
|
||||
SExplainGroup *pGroup = NULL;
|
||||
void *pCtx = NULL;
|
||||
int32_t rootGroupId = 0;
|
||||
|
||||
pJob->queryId = pDag->queryId;
|
||||
pJob->subPlans = pDag->pSubplans;
|
||||
|
||||
if (pDag->numOfSubplans <= 0) {
|
||||
SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
|
||||
if (levelNum <= 0) {
|
||||
SCH_JOB_ELOG("invalid level num:%d", levelNum);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SHashObj *groupHash = taosHashInit(SCHEDULE_DEFAULT_MAX_TASK_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (NULL == groupHash) {
|
||||
SCH_JOB_ELOG("groupHash %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(qInitExplainCtx(&pCtx, groupHash, pDag->explainInfo.verbose));
|
||||
|
||||
for (int32_t i = 0; i < levelNum; ++i) {
|
||||
plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
|
||||
if (NULL == plans) {
|
||||
SCH_JOB_ELOG("empty level plan, level:%d", i);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
|
||||
if (taskNum <= 0) {
|
||||
SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SSubplan *plan = NULL;
|
||||
for (int32_t n = 0; n < taskNum; ++n) {
|
||||
plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
|
||||
pGroup = taosHashGet(groupHash, &plan->id.groupId, sizeof(plan->id.groupId));
|
||||
if (pGroup) {
|
||||
++pGroup->nodeNum;
|
||||
continue;
|
||||
}
|
||||
|
||||
SExplainGroup group = {.nodeNum = 1, .plan = plan, .execInfo = NULL};
|
||||
if (0 != taosHashPut(groupHash, &plan->id.groupId, sizeof(plan->id.groupId), &group, sizeof(group))) {
|
||||
SCH_JOB_ELOG("taosHashPut to explainGroupHash failed, taskIdx:%d", n);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
if (0 == i) {
|
||||
if (taskNum > 1) {
|
||||
SCH_JOB_ELOG("invalid taskNum %d for level 0", taskNum);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
rootGroupId = plan->id.groupId;
|
||||
}
|
||||
|
||||
SCH_JOB_DLOG("level %d group handled, taskNum:%d", i, taskNum);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(qAppendTaskExplainResRows(pCtx, rootGroupId, 0));
|
||||
|
||||
SRetrieveTableRsp *pRsp = NULL;
|
||||
SCH_ERR_JRET(qGetExplainRspFromCtx(pCtx, &pRsp));
|
||||
|
||||
pJob->resData = pRsp;
|
||||
|
||||
_return:
|
||||
|
||||
qFreeExplainCtx(pCtx);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (NULL != pTask->candidateAddrs) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2250,7 +2165,7 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
int32_t schStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
||||
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
||||
bool syncSchedule) {
|
||||
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
|
||||
|
||||
|
@ -2264,8 +2179,10 @@ int32_t schStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, i
|
|||
pJob->sql = sql;
|
||||
pJob->attr.queryJob = true;
|
||||
pJob->attr.explainMode = pDag->explainInfo.mode;
|
||||
pJob->queryId = pDag->queryId;
|
||||
pJob->subPlans = pDag->pSubplans;
|
||||
|
||||
SCH_ERR_JRET(schValidateAndBuildJobExplain(pDag, pJob));
|
||||
SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
|
||||
|
||||
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
|
||||
if (refId < 0) {
|
||||
|
@ -2345,7 +2262,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
|
|||
}
|
||||
|
||||
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
|
||||
SCH_ERR_RET(schStaticExplain(transport, nodeList, pDag, pJob, sql, true));
|
||||
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
|
||||
} else {
|
||||
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue