diff --git a/include/common/tmsg.h b/include/common/tmsg.h index cd16bbf862..4cfcf0e888 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -915,6 +915,12 @@ typedef struct { char data[]; } SRetrieveMetaTableRsp; +typedef struct { + int32_t numOfItems; + char data[]; +} SExplainRsp; + + typedef struct { char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port int32_t port; @@ -1052,6 +1058,7 @@ typedef struct SSubQueryMsg { uint64_t taskId; int64_t refId; int8_t taskType; + int8_t explain; uint32_t sqlLen; // the query sql, uint32_t phyLen; char msg[]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 36a489eb59..1d4667cda0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -188,6 +188,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp) TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) diff --git a/include/libs/command/command.h b/include/libs/command/command.h index 7e58d39692..31e370ac5c 100644 --- a/include/libs/command/command.h +++ b/include/libs/command/command.h @@ -17,8 +17,20 @@ #include "tmsg.h" #include "plannodes.h" +typedef struct SExplainCtx SExplainCtx; + +typedef struct SExplainExecInfo { + uint64_t startupCost; + uint64_t totalCost; + uint64_t numOfRows; +} SExplainExecInfo; + int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp); int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp); +int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int32_t startTs); +int32_t qExecExplainEnd(SExplainCtx *pCtx); +int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp); +void qExplainFreeCtx(SExplainCtx *pCtx); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index d3cd828cf5..4d289147d0 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -174,6 +174,8 @@ void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes); + #ifdef __cplusplus } #endif diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 9849dfca39..2979040c07 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -216,6 +216,7 @@ int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int3 int32_t nodesStringToList(const char* pStr, SNodeList** pList); int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len); +char *nodesGetNameFromColumnNode(SNode *pNode); #ifdef __cplusplus } diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 37163f60dd..eda7882e2c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -308,6 +308,7 @@ typedef enum EExplainMode { typedef struct SExplainInfo { EExplainMode mode; bool verbose; + double ratio; } SExplainInfo; typedef struct SQueryPlan { diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index ddcbaa0bee..04fa8f21df 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -315,6 +315,7 @@ bool nodesIsTimelineQuery(const SNode* pQuery); void* nodesGetValueFromNode(SValueNode *pNode); char* nodesGetStrValueFromNode(SValueNode *pNode); +char *getFillModeString(EFillMode mode); #ifdef __cplusplus } diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 16a6ae32cf..5ab4ead89c 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -71,7 +71,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, int64_t* pJob, const char* sql, SQueryResult *pRes); +int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes); /** * Process the query job, generated according to the query physical plan. diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 8cd53f18b0..1512322f1a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -241,7 +241,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; - int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res); + int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, &res); if (code != TSDB_CODE_SUCCESS) { if (pRequest->body.queryJob != 0) { schedulerFreeJob(pRequest->body.queryJob); diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index 771baca2ab..de0a7c433a 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -50,8 +50,9 @@ extern "C" { typedef struct SExplainGroup { int32_t nodeNum; + SRWLatch lock; SSubplan *plan; - void *execInfo; //TODO + SArray *execInfo; } SExplainGroup; typedef struct SExplainResNode { @@ -67,10 +68,18 @@ typedef struct SQueryExplainRowInfo { } SQueryExplainRowInfo; typedef struct SExplainCtx { - int32_t totalSize; + double ratio; bool verbose; + + int32_t rootGroupId; + int32_t dataSize; + bool execDone; + int64_t reqStartTs; + int64_t jobStartTs; + int64_t jobDoneTs; char *tbuf; SArray *rows; + int32_t groupDoneNum; SHashObj *groupHash; } SExplainCtx; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 847a863e76..4bf354dee5 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -17,32 +17,31 @@ #include "plannodes.h" #include "commandInt.h" -int32_t qGenerateExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes); -int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level); +int32_t qExplainGenerateResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes); +int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level); -void qFreeExplainResTree(SExplainResNode *res) { - if (NULL == res) { +void qExplainFreeResNode(SExplainResNode *resNode) { + if (NULL == resNode) { return; } - taosMemoryFreeClear(res->pExecInfo); + taosMemoryFreeClear(resNode->pExecInfo); SNode* node = NULL; - FOREACH(node, res->pChildren) { - qFreeExplainResTree((SExplainResNode *)node); + FOREACH(node, resNode->pChildren) { + qExplainFreeResNode((SExplainResNode *)node); } - nodesClearList(res->pChildren); + nodesClearList(resNode->pChildren); - taosMemoryFreeClear(res); + taosMemoryFreeClear(resNode); } -void qFreeExplainCtx(void *ctx) { - if (NULL == ctx) { +void qExplainFreeCtx(SExplainCtx *pCtx) { + if (NULL == pCtx) { return; } - SExplainCtx *pCtx = (SExplainCtx *)ctx; int32_t rowSize = taosArrayGetSize(pCtx->rows); for (int32_t i = 0; i < rowSize; ++i) { SQueryExplainRowInfo *row = taosArrayGet(pCtx->rows, i); @@ -54,7 +53,7 @@ void qFreeExplainCtx(void *ctx) { taosMemoryFree(pCtx); } -int32_t qInitExplainCtx(void **pCtx, SHashObj *groupHash, bool verbose) { +int32_t qExplainInitCtx(SExplainCtx **pCtx, SHashObj *groupHash, bool verbose, double ratio) { int32_t code = 0; SExplainCtx *ctx = taosMemoryCalloc(1, sizeof(SExplainCtx)); if (NULL == ctx) { @@ -75,6 +74,7 @@ int32_t qInitExplainCtx(void **pCtx, SHashObj *groupHash, bool verbose) { } ctx->verbose = verbose; + ctx->ratio = ratio; ctx->tbuf = tbuf; ctx->rows = rows; ctx->groupHash = groupHash; @@ -92,35 +92,7 @@ _return: QRY_RET(code); } - -char *qFillModeString(EFillMode mode) { - switch (mode) { - case FILL_MODE_NONE: - return "none"; - case FILL_MODE_VALUE: - return "value"; - case FILL_MODE_PREV: - return "prev"; - case FILL_MODE_NULL: - return "null"; - case FILL_MODE_LINEAR: - return "linear"; - case FILL_MODE_NEXT: - return "next"; - default: - return "unknown"; - } -} - -char *qGetNameFromColumnNode(SNode *pNode) { - if (NULL == pNode || QUERY_NODE_COLUMN != pNode->type) { - return "NULL"; - } - - return ((SColumnNode *)pNode)->colName; -} - -int32_t qGenerateExplainResChildren(SPhysiNode *pNode, void *pExecInfo, SNodeList **pChildren) { +int32_t qExplainGenerateResChildren(SPhysiNode *pNode, void *pExecInfo, SNodeList **pChildren) { int32_t tlen = 0; SNodeList *pPhysiChildren = NULL; @@ -192,38 +164,38 @@ int32_t qGenerateExplainResChildren(SPhysiNode *pNode, void *pExecInfo, SNodeLis SNode* node = NULL; SExplainResNode *pResNode = NULL; FOREACH(node, pPhysiChildren) { - QRY_ERR_RET(qGenerateExplainResNode((SPhysiNode *)node, pExecInfo, &pResNode)); + QRY_ERR_RET(qExplainGenerateResNode((SPhysiNode *)node, pExecInfo, &pResNode)); QRY_ERR_RET(nodesListAppend(*pChildren, pResNode)); } return TSDB_CODE_SUCCESS; } -int32_t qGenerateExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes) { +int32_t qExplainGenerateResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pResNode) { if (NULL == pNode) { - *pRes = NULL; + *pResNode = NULL; qError("physical node is NULL"); return TSDB_CODE_QRY_APP_ERROR; } - SExplainResNode *res = taosMemoryCalloc(1, sizeof(SExplainResNode)); - if (NULL == res) { + SExplainResNode *resNode = taosMemoryCalloc(1, sizeof(SExplainResNode)); + if (NULL == resNode) { qError("calloc SPhysiNodeExplainRes failed"); return TSDB_CODE_QRY_OUT_OF_MEMORY; } int32_t code = 0; - res->pNode = pNode; - res->pExecInfo = pExecInfo; - QRY_ERR_JRET(qGenerateExplainResChildren(pNode, pExecInfo, &res->pChildren)); + resNode->pNode = pNode; + resNode->pExecInfo = pExecInfo; + QRY_ERR_JRET(qExplainGenerateResChildren(pNode, pExecInfo, &resNode->pChildren)); - *pRes = res; + *pResNode = resNode; return TSDB_CODE_SUCCESS; _return: - qFreeExplainResTree(res); + qExplainFreeResNode(resNode); QRY_RET(code); } @@ -249,7 +221,7 @@ int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t memcpy(row.buf, tbuf, len); row.level = level; row.len = len; - ctx->totalSize += len; + ctx->dataSize += len; if (NULL == taosArrayPush(ctx->rows, &row)) { qError("taosArrayPush row to explain res rows failed"); @@ -444,7 +416,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } } - QRY_ERR_RET(qAppendTaskExplainResRows(ctx, pExchNode->srcGroupId, level + 1)); + QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1)); break; } case QUERY_NODE_PHYSICAL_PLAN_SORT:{ @@ -468,7 +440,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length, + EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, nodesGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length, INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, pIntNode->precision), pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(pIntNode->precision), INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->sliding, pIntNode->slidingUnit, pIntNode->precision), pIntNode->slidingUnit, @@ -481,7 +453,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i if (verbose) { if (pIntNode->pFill) { - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_FORMAT, qFillModeString(pIntNode->pFill->mode)); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_FORMAT, getFillModeString(pIntNode->pFill->mode)); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } @@ -540,7 +512,7 @@ int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32 return TSDB_CODE_SUCCESS; } -int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level) { +int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) { SExplainResNode *node = NULL; int32_t code = 0; SExplainCtx *ctx = (SExplainCtx *)pCtx; @@ -551,19 +523,19 @@ int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level) { QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - QRY_ERR_RET(qGenerateExplainResNode(group->plan->pNode, group->execInfo, &node)); + QRY_ERR_RET(qExplainGenerateResNode(group->plan->pNode, group->execInfo, &node)); QRY_ERR_JRET(qExplainResNodeToRows(node, ctx, level)); _return: - qFreeExplainResTree(node); + qExplainFreeResNode(node); QRY_RET(code); } -int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { +int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { SExplainCtx *pCtx = (SExplainCtx *)ctx; int32_t rowNum = taosArrayGetSize(pCtx->rows); if (rowNum <= 0) { @@ -572,7 +544,7 @@ int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { } int32_t colNum = 1; - int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->totalSize; + int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); if (NULL == rsp) { qError("malloc SRetrieveTableRsp failed, size:%d", rspSize); @@ -582,7 +554,7 @@ int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { rsp->completed = 1; rsp->numOfRows = htonl(rowNum); - *(int32_t *)rsp->data = htonl(pCtx->totalSize); + *(int32_t *)rsp->data = htonl(pCtx->dataSize); int32_t *offset = (int32_t *)((char *)rsp->data + sizeof(int32_t)); char *data = (char *)(offset + rowNum); @@ -604,13 +576,13 @@ int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { return TSDB_CODE_SUCCESS; } -int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) { + +int32_t qExplainPrepareCtx(SQueryPlan *pDag, SExplainCtx **pCtx) { int32_t code = 0; SNodeListNode *plans = NULL; int32_t taskNum = 0; SExplainGroup *pGroup = NULL; - void *pCtx = NULL; - int32_t rootGroupId = 0; + SExplainCtx *ctx = NULL; if (pDag->numOfSubplans <= 0) { qError("invalid subplan num:%d", pDag->numOfSubplans); @@ -629,7 +601,7 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) { QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - QRY_ERR_JRET(qInitExplainCtx(&pCtx, groupHash, pDag->explainInfo.verbose)); + QRY_ERR_JRET(qExplainInitCtx(&ctx, groupHash, pDag->explainInfo.verbose, pDag->explainInfo.ratio)); for (int32_t i = 0; i < levelNum; ++i) { plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i); @@ -666,22 +638,62 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) { QRY_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - rootGroupId = plan->id.groupId; + ctx->rootGroupId = plan->id.groupId; } qDebug("level %d group handled, taskNum:%d", i, taskNum); } - QRY_ERR_JRET(qAppendTaskExplainResRows(pCtx, rootGroupId, 0)); - - QRY_ERR_JRET(qGetExplainRspFromCtx(pCtx, pRsp)); + *pCtx = ctx; + + return TSDB_CODE_SUCCESS; _return: - qFreeExplainCtx(pCtx); + qExplainFreeCtx(ctx); QRY_RET(code); } +int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp) { + +} + + +int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) { + int32_t code = 0; + SExplainCtx *pCtx = NULL; + + QRY_ERR_RET(qExplainPrepareCtx(pDag, &pCtx)); + + QRY_ERR_JRET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0)); + + QRY_ERR_JRET(qExplainGetRspFromCtx(pCtx, pRsp)); + +_return: + + qExplainFreeCtx(pCtx); + + QRY_RET(code); +} + +int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int32_t startTs) { + QRY_ERR_RET(qExplainPrepareCtx(pDag, pCtx)); + + (*pCtx)->reqStartTs = startTs; + (*pCtx)->jobStartTs = taosGetTimestampMs(); + + return TSDB_CODE_SUCCESS; +} + +int32_t qExecExplainEnd(SExplainCtx *pCtx) { + pCtx->jobDoneTs = taosGetTimestampMs(); + + atomic_store_8((int8_t *)&pCtx->execDone, true); + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 6d78fa835a..d7c99d54df 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -229,3 +229,12 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) { queryCostStatis(pTaskInfo); // print the query cost summary doDestroyTask(pTaskInfo); } + +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) { + SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo; + int32_t capacity = 0; + + return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum); +} + + diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 169e47eb33..e423690740 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -9516,3 +9516,18 @@ void releaseQueryBuf(size_t numOfTables) { // restore value is not enough buffer available atomic_add_fetch_64(&tsQueryBufferSizeBytes, t); } + +int32_t getOperatorExplainExecInfo(SOperatorInfo *operator, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum) { + if (*resNum >= *capacity) { + *capacity += 10; + + *pRes = taosMemoryRealloc(*pRes, (*capacity) * sizeof(SExplainExecInfo)); + if (NULL == *pRes) { + qError("malloc %d failed", capacity * sizeof(SExplainExecInfo)); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + } + +} + + diff --git a/source/libs/nodes/inc/nodesUtil.h b/source/libs/nodes/inc/nodesUtil.h index 976044c16f..c6233ba980 100644 --- a/source/libs/nodes/inc/nodesUtil.h +++ b/source/libs/nodes/inc/nodesUtil.h @@ -31,6 +31,7 @@ extern "C" { #define NODES_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define NODES_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) + #ifdef __cplusplus } #endif diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 732d9acfbe..812fa333b6 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1023,3 +1023,33 @@ int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNod return TSDB_CODE_SUCCESS; } + + +char *getFillModeString(EFillMode mode) { + switch (mode) { + case FILL_MODE_NONE: + return "none"; + case FILL_MODE_VALUE: + return "value"; + case FILL_MODE_PREV: + return "prev"; + case FILL_MODE_NULL: + return "null"; + case FILL_MODE_LINEAR: + return "linear"; + case FILL_MODE_NEXT: + return "next"; + default: + return "unknown"; + } +} + +char *nodesGetNameFromColumnNode(SNode *pNode) { + if (NULL == pNode || QUERY_NODE_COLUMN != pNode->type) { + return "NULL"; + } + + return ((SColumnNode *)pNode)->colName; +} + + diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 573eaed2e6..cfd4a3ec7b 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -107,15 +107,17 @@ typedef struct SQWTaskCtx { SRWLatch lock; int8_t phase; int8_t taskType; + int8_t explain; - bool emptyRes; bool queryFetched; bool queryEnd; bool queryContinue; bool queryInQueue; int32_t rspCode; - SQWConnInfo connInfo; + SQWConnInfo ctrlConnInfo; + SQWConnInfo dataConnInfo; + int8_t events[QW_EVENT_MAX]; qTaskInfo_t taskHandle; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 42d10a2361..726d0ebf79 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -432,8 +432,10 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { - tmsgReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER); - ctx->connInfo.handle = NULL; + tmsgReleaseHandle(ctx->ctrlConnInfo.handle, TAOS_CONN_SERVER); + ctx->ctrlConnInfo.handle = NULL; + + // NO need to release dataConnInfo qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle); @@ -537,6 +539,26 @@ int32_t qwDropTask(QW_FPARAMS_DEF) { return TSDB_CODE_SUCCESS; } + +int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + qTaskInfo_t *taskHandle = &ctx->taskHandle; + + if (TASK_TYPE_TEMP == ctx->taskType) { + if (ctx->explain) { + SExplainExecInfo *execInfo = NULL; + int32_t resNum = 0; + QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo)); + + QW_ERR_RET(qwBuildAndSendExplainRsp(&ctx->ctrlConnInfo, &rsp)); + } + + qwFreeTaskHandle(QW_FPARAMS(), taskHandle); + } + + return TSDB_CODE_SUCCESS; +} + + int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t code = 0; bool qcontinue = true; @@ -562,10 +584,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { QW_TASK_DLOG("qExecTask end with empty res, useconds:%"PRIu64, useconds); dsEndPut(sinkHandle, useconds); - - if (TASK_TYPE_TEMP == ctx->taskType) { - qwFreeTaskHandle(QW_FPARAMS(), taskHandle); - } + + QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); if (queryEnd) { *queryEnd = true; @@ -658,19 +678,6 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void bool queryEnd = false; int32_t code = 0; - if (ctx->emptyRes) { - QW_TASK_DLOG_E("query end with empty result"); - - qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED); - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); - - *rspMsg = rsp; - *dataLen = 0; - pOutput->queryEnd = true; - - return TSDB_CODE_SUCCESS; - } - dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); if (len < 0) { @@ -760,12 +767,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - dropConnection = &ctx->connInfo; + dropConnection = &ctx->ctrlConnInfo; QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropConnection = NULL; - qwBuildAndSendDropRsp(&ctx->connInfo, code); - QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); + qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); + QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); break; @@ -798,12 +805,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - dropConnection = &ctx->connInfo; + dropConnection = &ctx->ctrlConnInfo; QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropConnection = NULL; - qwBuildAndSendDropRsp(&ctx->connInfo, code); - QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); + qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); + QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } @@ -863,17 +870,13 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp } if (QW_PHASE_POST_QUERY == phase) { - if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) { - ctx->emptyRes = true; - } - #if 0 if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) { readyConnection = &ctx->connInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); } #else - connInfo.handle = ctx->connInfo.handle; + connInfo.handle = ctx->ctrlConnInfo.handle; readyConnection = &connInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); @@ -886,8 +889,8 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - qwBuildAndSendDropRsp(&ctx->connInfo, code); - QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); + qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); + QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); @@ -931,7 +934,7 @@ _return: QW_RET(code); } -int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { +int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) { int32_t code = 0; bool queryRsped = false; struct SSubplan *plan = NULL; @@ -947,9 +950,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); atomic_store_8(&ctx->taskType, taskType); + atomic_store_8(&ctx->explain, explain); - atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle); - atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle); + atomic_store_ptr(&ctx->ctrlConnInfo.handle, qwMsg->connInfo.handle); + atomic_store_ptr(&ctx->ctrlConnInfo.ahandle, qwMsg->connInfo.ahandle); QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); @@ -1011,8 +1015,8 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (ctx->phase == QW_PHASE_PRE_QUERY) { - ctx->connInfo.handle == qwMsg->connInfo.handle; - ctx->connInfo.ahandle = qwMsg->connInfo.ahandle; + ctx->ctrlConnInfo.handle == qwMsg->connInfo.handle; + ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle; QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY); needRsp = false; QW_TASK_DLOG_E("ready msg will not rsp now"); @@ -1089,10 +1093,13 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (rsp) { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); - atomic_store_8((int8_t*)&ctx->queryEnd, qComplete); + if (qComplete) { + atomic_store_8((int8_t*)&ctx->queryEnd, true); + } - qwMsg->connInfo = ctx->connInfo; + qwMsg->connInfo = ctx->dataConnInfo; QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); @@ -1113,7 +1120,7 @@ _return: qwFreeFetchRsp(rsp); rsp = NULL; - qwMsg->connInfo = ctx->connInfo; + qwMsg->connInfo = ctx->dataConnInfo; qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0); } @@ -1151,14 +1158,17 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if (NULL == rsp) { - atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle); - atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle); + atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle); + atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); } else { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); - atomic_store_8((int8_t*)&ctx->queryEnd, qComplete); + if (qComplete) { + atomic_store_8((int8_t*)&ctx->queryEnd, true); + } } if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { @@ -1236,8 +1246,8 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (!rsped) { - ctx->connInfo.handle = qwMsg->connInfo.handle; - ctx->connInfo.ahandle = qwMsg->connInfo.ahandle; + ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle; + ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle; QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 80ae013653..c9779f9252 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -327,7 +327,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql); taosMemoryFreeClear(sql); - QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType)); + QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain)); QW_SCH_TASK_DLOG("processQuery end, node:%p", node); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 22bd039219..62a96b6438 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -26,6 +26,7 @@ extern "C" { #include "scheduler.h" #include "thash.h" #include "trpc.h" +#include "command.h" #define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 #define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 @@ -165,6 +166,7 @@ typedef struct SSchJob { SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* + SExplainCtx *explainCtx; int8_t status; SQueryNodeAddr resNode; tsem_t rspSem; @@ -211,6 +213,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job) #define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job)) #define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job)) +#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) @@ -251,6 +254,8 @@ int32_t schFetchFromRemote(SSchJob *pJob); int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId); int32_t schCloneSMsgSendInfo(void *src, void **dst); +int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob); +void schFreeJobImpl(void *job); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 9c5019aedc..658d924f38 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -67,6 +67,81 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * return TSDB_CODE_SUCCESS; } +int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql, + int64_t startTs, bool syncSchedule) { + int32_t code = 0; + SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); + if (NULL == pJob) { + qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pJob->attr.explainMode = pDag->explainInfo.mode; + pJob->attr.syncSchedule = syncSchedule; + pJob->transport = transport; + pJob->sql = sql; + + if (pNodeList != NULL) { + pJob->nodeList = taosArrayDup(pNodeList); + } + + SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob)); + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + SCH_ERR_JRET(qExecExplainBegin(pDag, &pJob->explainCtx, startTs)); + } + + pJob->execTasks = + taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pJob->execTasks) { + SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pJob->succTasks = + taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pJob->succTasks) { + SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pJob->failTasks = + taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pJob->failTasks) { + SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + tsem_init(&pJob->rspSem, 0, 0); + + int64_t refId = taosAddRef(schMgmt.jobRef, pJob); + if (refId < 0) { + SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); + SCH_ERR_JRET(terrno); + } + + if (NULL == schAcquireJob(refId)) { + SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId); + SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + pJob->refId = refId; + + SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId); + + pJob->status = JOB_TASK_STATUS_NOT_START; + + *pSchJob = pJob; + + return TSDB_CODE_SUCCESS; + +_return: + + schFreeJobImpl(pJob); + SCH_RET(code); +} + + void schFreeRpcCtx(SRpcCtx *pCtx) { if (NULL == pCtx) { return; @@ -1050,6 +1125,40 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } + case TDMT_VND_EXPLAIN_RSP: { + SExplainRsp *taskRsp = (SExplainRsp *)msg; + + SCH_ERR_JRET(rspCode); + if (NULL == msg) { + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (!SCH_IS_EXPLAIN_JOB(pJob)) { + SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType)); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (pJob->resData) { + SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData); + taosMemoryFreeClear(taskRsp); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); + } + + SRetrieveTableRsp *pRsp = NULL; + SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, taskRsp, pTask->plan->id.groupId, &pRsp)); + + if (pRsp) { + atomic_store_ptr(&pJob->resData, pRsp); + atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); + + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); + + SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); + + schProcessOnDataFetched(pJob); + } + break; + } case TDMT_VND_FETCH_RSP: { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; @@ -1058,6 +1167,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + if (SCH_IS_EXPLAIN_JOB(pJob)) { + if (rsp->completed) { + SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx)); + return TSDB_CODE_SUCCESS; + } + + SCH_ERR_JRET(schFetchFromRemote(pJob)); + + return TSDB_CODE_SUCCESS; + } + if (pJob->resData) { SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData); taosMemoryFreeClear(rsp); @@ -1767,6 +1887,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->taskId = htobe64(pTask->taskId); pMsg->refId = htobe64(pJob->refId); pMsg->taskType = TASK_TYPE_TEMP; + pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob); pMsg->phyLen = htonl(pTask->msgLen); pMsg->sqlLen = htonl(len); @@ -2083,6 +2204,8 @@ void schFreeJobImpl(void *job) { taosArrayDestroy(pJob->levels); taosArrayDestroy(pJob->nodeList); + qExplainFreeCtx(pJob->explainCtx); + taosMemoryFreeClear(pJob->resData); taosMemoryFreeClear(pJob); @@ -2090,70 +2213,17 @@ void schFreeJobImpl(void *job) { } static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - bool syncSchedule) { + int64_t startTs, bool syncSchedule) { qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); - if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) { + if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) { qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId); } int32_t code = 0; - SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); - if (NULL == pJob) { - qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } + SSchJob *pJob = NULL; + SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule)); - pJob->attr.explainMode = pDag->explainInfo.mode; - pJob->attr.syncSchedule = syncSchedule; - pJob->transport = transport; - pJob->sql = sql; - - if (pNodeList != NULL) { - pJob->nodeList = taosArrayDup(pNodeList); - } - - SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob)); - - pJob->execTasks = - taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == pJob->execTasks) { - SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - pJob->succTasks = - taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == pJob->succTasks) { - SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - pJob->failTasks = - taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == pJob->failTasks) { - SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - tsem_init(&pJob->rspSem, 0, 0); - - int64_t refId = taosAddRef(schMgmt.jobRef, pJob); - if (refId < 0) { - SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); - SCH_ERR_JRET(terrno); - } - - if (NULL == schAcquireJob(refId)) { - SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId); - SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); - } - - pJob->refId = refId; - - SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId); - - pJob->status = JOB_TASK_STATUS_NOT_START; SCH_ERR_JRET(schLaunchJob(pJob)); *job = pJob->refId; @@ -2266,7 +2336,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, - SQueryResult *pRes) { + int64_t startTs, SQueryResult *pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -2274,7 +2344,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true)); } else { - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true)); + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true)); } SSchJob *job = schAcquireJob(*pJob); @@ -2292,7 +2362,11 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, false)); + if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { + SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false)); + } else { + SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false)); + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index cf04b06579..5d9322a575 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -951,7 +951,7 @@ TEST(insertTest, normalCase) { taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId); SQueryResult res = {0}; - code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", &res); + code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, &res); ASSERT_EQ(code, 0); ASSERT_EQ(res.numOfRows, 20);