feature/qnode
This commit is contained in:
parent
08bd344d2f
commit
35f8a337af
|
@ -389,6 +389,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_DEFAULT_EXPLAIN_RATIO 0.001
|
#define TSDB_DEFAULT_EXPLAIN_RATIO 0.001
|
||||||
|
|
||||||
#define TSDB_EXPLAIN_RESULT_ROW_SIZE 1024
|
#define TSDB_EXPLAIN_RESULT_ROW_SIZE 1024
|
||||||
|
#define TSDB_EXPLAIN_RESULT_COLUMN_NAME "QUERY PLAN"
|
||||||
|
|
||||||
#define TSDB_MAX_JOIN_TABLE_NUM 10
|
#define TSDB_MAX_JOIN_TABLE_NUM 10
|
||||||
#define TSDB_MAX_UNION_CLAUSE 5
|
#define TSDB_MAX_UNION_CLAUSE 5
|
||||||
|
|
|
@ -1944,6 +1944,7 @@ int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** p
|
||||||
}
|
}
|
||||||
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
|
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
|
||||||
(*pSchema)[0].bytes = TSDB_EXPLAIN_RESULT_ROW_SIZE;
|
(*pSchema)[0].bytes = TSDB_EXPLAIN_RESULT_ROW_SIZE;
|
||||||
|
strcpy((*pSchema)[0].name, TSDB_EXPLAIN_RESULT_COLUMN_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -22,8 +22,6 @@ extern "C" {
|
||||||
#include "nodes.h"
|
#include "nodes.h"
|
||||||
#include "plannodes.h"
|
#include "plannodes.h"
|
||||||
|
|
||||||
#define QUERY_EXPLAIN_MAX_RES_LEN 1024
|
|
||||||
|
|
||||||
//newline area
|
//newline area
|
||||||
#define EXPLAIN_TAG_SCAN_FORMAT "Tag scan on %s columns=%d"
|
#define EXPLAIN_TAG_SCAN_FORMAT "Tag scan on %s columns=%d"
|
||||||
#define EXPLAIN_TBL_SCAN_FORMAT "Table scan on %s columns=%d"
|
#define EXPLAIN_TBL_SCAN_FORMAT "Table scan on %s columns=%d"
|
||||||
|
@ -42,8 +40,8 @@ extern "C" {
|
||||||
#define EXPLAIN_TIMERANGE_FORMAT "Time range: [%" PRId64 ", %" PRId64 "]"
|
#define EXPLAIN_TIMERANGE_FORMAT "Time range: [%" PRId64 ", %" PRId64 "]"
|
||||||
|
|
||||||
//append area
|
//append area
|
||||||
#define EXPLAIN_LOOPS_FORMAT "loops %d"
|
#define EXPLAIN_LOOPS_FORMAT "loops=%d"
|
||||||
#define EXPLAIN_REVERSE_FORMAT "reverse %d"
|
#define EXPLAIN_REVERSE_FORMAT "reverse=%d"
|
||||||
|
|
||||||
|
|
||||||
typedef struct SExplainResNode {
|
typedef struct SExplainResNode {
|
||||||
|
@ -68,11 +66,11 @@ typedef struct SExplainRowCtx {
|
||||||
|
|
||||||
#define EXPLAIN_ROW_NEW(level, ...) \
|
#define EXPLAIN_ROW_NEW(level, ...) \
|
||||||
do { \
|
do { \
|
||||||
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, QUERY_EXPLAIN_MAX_RES_LEN, "%*s", level, ""); \
|
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s", (level) * 2, ""); \
|
||||||
tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen, __VA_ARGS__); \
|
tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen, __VA_ARGS__)
|
#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__)
|
||||||
#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; } while (0)
|
#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; } while (0)
|
||||||
|
|
||||||
#define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
#define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
||||||
|
|
|
@ -204,12 +204,13 @@ int32_t qExplainBufAppendExecInfo(void *pExecInfo, char *tbuf, int32_t *len) {
|
||||||
|
|
||||||
int32_t qExplainResAppendRow(SExplainRowCtx *ctx, char *tbuf, int32_t len, int32_t level) {
|
int32_t qExplainResAppendRow(SExplainRowCtx *ctx, char *tbuf, int32_t len, int32_t level) {
|
||||||
SQueryExplainRowInfo row = {0};
|
SQueryExplainRowInfo row = {0};
|
||||||
row.buf = strdup(tbuf);
|
row.buf = taosMemoryMalloc(len);
|
||||||
if (NULL == row.buf) {
|
if (NULL == row.buf) {
|
||||||
qError("strdup %s failed", tbuf);
|
qError("taosMemoryMalloc %d failed", len);
|
||||||
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
memcpy(row.buf, tbuf, len);
|
||||||
row.level = level;
|
row.level = level;
|
||||||
row.len = len;
|
row.len = len;
|
||||||
ctx->totalSize += len;
|
ctx->totalSize += len;
|
||||||
|
@ -275,7 +276,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
|
||||||
|
|
||||||
if (pTblScanNode->scan.node.pConditions) {
|
if (pTblScanNode->scan.node.pConditions) {
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||||
QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen));
|
QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
}
|
}
|
||||||
|
@ -310,7 +311,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
|
||||||
|
|
||||||
if (pPrjNode->node.pConditions) {
|
if (pPrjNode->node.pConditions) {
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||||
QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen));
|
QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
}
|
}
|
||||||
|
@ -327,13 +328,13 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
|
||||||
|
|
||||||
if (pJoinNode->node.pConditions) {
|
if (pJoinNode->node.pConditions) {
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||||
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen));
|
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
}
|
}
|
||||||
|
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT);
|
||||||
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen));
|
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
break;
|
break;
|
||||||
|
@ -349,7 +350,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
|
||||||
|
|
||||||
if (pAggNode->node.pConditions) {
|
if (pAggNode->node.pConditions) {
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||||
QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen));
|
QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
}
|
}
|
||||||
|
@ -366,7 +367,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
|
||||||
|
|
||||||
if (pExchNode->node.pConditions) {
|
if (pExchNode->node.pConditions) {
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||||
QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen));
|
QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
}
|
}
|
||||||
|
@ -383,7 +384,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
|
||||||
|
|
||||||
if (pSortNode->node.pConditions) {
|
if (pSortNode->node.pConditions) {
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||||
QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen));
|
QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
}
|
}
|
||||||
|
@ -407,7 +408,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
|
||||||
|
|
||||||
if (pIntNode->window.node.pConditions) {
|
if (pIntNode->window.node.pConditions) {
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||||
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen));
|
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
}
|
}
|
||||||
|
@ -424,7 +425,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx
|
||||||
|
|
||||||
if (pIntNode->window.node.pConditions) {
|
if (pIntNode->window.node.pConditions) {
|
||||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||||
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen));
|
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||||
EXPLAIN_ROW_END();
|
EXPLAIN_ROW_END();
|
||||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
}
|
}
|
||||||
|
@ -463,9 +464,9 @@ int32_t qGenerateExplainResRowsFromNode(SExplainResNode *pResNode, SExplainRowCt
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char *tbuf = taosMemoryMalloc(QUERY_EXPLAIN_MAX_RES_LEN);
|
char *tbuf = taosMemoryMalloc(TSDB_EXPLAIN_RESULT_ROW_SIZE);
|
||||||
if (NULL == tbuf) {
|
if (NULL == tbuf) {
|
||||||
qError("malloc size %d failed", QUERY_EXPLAIN_MAX_RES_LEN);
|
qError("malloc size %d failed", TSDB_EXPLAIN_RESULT_ROW_SIZE);
|
||||||
QRY_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
QRY_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -536,9 +537,9 @@ int32_t qGetExplainRspFromRowCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
rsp->completed = 1;
|
rsp->completed = 1;
|
||||||
rsp->numOfRows = rowNum;
|
rsp->numOfRows = htonl(rowNum);
|
||||||
|
|
||||||
*(int32_t *)rsp->data = pCtx->totalSize;
|
*(int32_t *)rsp->data = htonl(pCtx->totalSize);
|
||||||
|
|
||||||
int32_t *offset = (int32_t *)((char *)rsp->data + sizeof(int32_t));
|
int32_t *offset = (int32_t *)((char *)rsp->data + sizeof(int32_t));
|
||||||
char *data = (char *)(offset + rowNum);
|
char *data = (char *)(offset + rowNum);
|
||||||
|
@ -555,6 +556,8 @@ int32_t qGetExplainRspFromRowCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
||||||
data += row->len;
|
data += row->len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*pRsp = rsp;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2261,6 +2261,7 @@ int32_t schStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, i
|
||||||
|
|
||||||
pJob->sql = sql;
|
pJob->sql = sql;
|
||||||
pJob->attr.queryJob = true;
|
pJob->attr.queryJob = true;
|
||||||
|
pJob->attr.explainMode = pDag->explainInfo.mode;
|
||||||
|
|
||||||
SCH_ERR_JRET(schValidateAndBuildJobExplain(pDag, pJob));
|
SCH_ERR_JRET(schValidateAndBuildJobExplain(pDag, pJob));
|
||||||
|
|
||||||
|
@ -2528,7 +2529,7 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
|
||||||
SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
|
SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
|
||||||
goto _return;
|
goto _return;
|
||||||
} else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
|
} else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
|
||||||
if (!pJob->attr.explainMode == EXPLAIN_MODE_STATIC) {
|
if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
|
||||||
SCH_ERR_JRET(schFetchFromRemote(pJob));
|
SCH_ERR_JRET(schFetchFromRemote(pJob));
|
||||||
tsem_wait(&pJob->rspSem);
|
tsem_wait(&pJob->rspSem);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue