diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8693061d12..4f041518d5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1412,6 +1412,14 @@ typedef struct { SExplainExecInfo* subplanInfo; } SExplainRsp; +typedef struct { + SExplainRsp rsp; + uint64_t qId; + uint64_t tId; + int64_t rId; + int32_t eId; +} SExplainLocalRsp; + typedef struct STableScanAnalyzeInfo { uint64_t totalRows; uint64_t totalCheckedRows; @@ -1426,6 +1434,7 @@ typedef struct STableScanAnalyzeInfo { int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); +void tFreeSExplainRsp(SExplainRsp *pRsp); typedef struct { char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 37f89caa41..86ae5ef155 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,11 +29,12 @@ typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; -typedef int32_t (*localFetchFp)(void *handle, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp); +typedef int32_t (*localFetchFp)(void *, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); typedef struct { - void *handle; + void *handle; localFetchFp fp; + SArray *explainRes; } SLocalFetch; typedef struct { diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 6062ffa3ef..99f5189228 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -99,9 +99,9 @@ void qWorkerDestroy(void **qWorkerMgmt); int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat); -int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg); +int32_t qWorkerProcessLocalQuery(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, SQWMsg *qwMsg, SArray *explainRes); -int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp); +int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId, void** pRsp, SArray* explainRes); #ifdef __cplusplus } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b056677a03..3b29c17883 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4334,7 +4334,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->numOfPlans) < 0) return -1; if (pRsp->numOfPlans > 0) { - pRsp->subplanInfo = taosMemoryMalloc(pRsp->numOfPlans * sizeof(SExplainExecInfo)); + pRsp->subplanInfo = taosMemoryCalloc(pRsp->numOfPlans, sizeof(SExplainExecInfo)); if (pRsp->subplanInfo == NULL) return -1; } for (int32_t i = 0; i < pRsp->numOfPlans; ++i) { @@ -4342,7 +4342,7 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { if (tDecodeDouble(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1; if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].numOfRows) < 0) return -1; if (tDecodeU32(&decoder, &pRsp->subplanInfo[i].verboseLen) < 0) return -1; - if (tDecodeBinary(&decoder, (uint8_t **)&pRsp->subplanInfo[i].verboseInfo, &pRsp->subplanInfo[i].verboseLen) < 0) + if (tDecodeBinaryAlloc(&decoder, &pRsp->subplanInfo[i].verboseInfo, NULL) < 0) return -1; } @@ -4352,6 +4352,19 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { return 0; } +void tFreeSExplainRsp(SExplainRsp *pRsp) { + if (NULL == pRsp) { + return; + } + + for (int32_t i = 0; i < pRsp->numOfPlans; ++i) { + SExplainExecInfo *pExec = pRsp->subplanInfo + i; + taosMemoryFree(pExec->verboseInfo); + } + + taosMemoryFreeClear(pRsp->subplanInfo); +} + int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 967c682b0b..618bbb7593 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -56,7 +56,7 @@ void qExplainFreeCtx(SExplainCtx *pCtx) { int32_t num = taosArrayGetSize(group->nodeExecInfo); for (int32_t i = 0; i < num; ++i) { SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i); - taosMemoryFreeClear(rsp->subplanInfo); + tFreeSExplainRsp(rsp); } } @@ -1723,7 +1723,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t SExplainGroup *group = taosHashGet(ctx->groupHash, &groupId, sizeof(groupId)); if (NULL == group) { qError("group %d not in groupHash", groupId); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -1732,7 +1732,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t group->nodeExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainRsp)); if (NULL == group->nodeExecInfo) { qError("taosArrayInit %d explainExecInfo failed", group->nodeNum); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); taosWUnLockLatch(&group->lock); QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1742,7 +1742,7 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t } else if (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum) { qError("group execInfo already full, size:%d, nodeNum:%d", (int32_t)taosArrayGetSize(group->nodeExecInfo), group->nodeNum); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); taosWUnLockLatch(&group->lock); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -1751,13 +1751,14 @@ int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t if (group->physiPlanExecNum != pRspMsg->numOfPlans) { qError("physiPlanExecNum %d mismatch with others %d in group %d", pRspMsg->numOfPlans, group->physiPlanExecNum, groupId); - taosMemoryFreeClear(pRspMsg->subplanInfo); + tFreeSExplainRsp(pRspMsg); taosWUnLockLatch(&group->lock); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } taosArrayPush(group->nodeExecInfo, pRspMsg); + groupDone = (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum); taosWUnLockLatch(&group->lock); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 93303cb62d..d0f0a98e94 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -455,8 +455,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, SL int64_t threadId = taosGetSelfPthreadId(); if (pLocal) { - pTaskInfo->localFetch.handle = pLocal->handle; - pTaskInfo->localFetch.fp = pLocal->fp; + memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal)); } taosArrayClearEx(pResList, freeBlock); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a1f3e5a4a5..dd00684ed1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2005,7 +2005,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf if (pSource->localExec) { SDataBuf pBuf = {0}; - int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData); + int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes); loadRemoteDataCallback(pWrapper, &pBuf, code); } else { qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 00588977bc..a5eab73086 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -58,7 +58,17 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); if (ctx->localExec) { - + SExplainLocalRsp localRsp = {0}; + localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList); + SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo)); + memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); + localRsp.rsp.subplanInfo = pExec; + localRsp.qId = qId; + localRsp.tId = tId; + localRsp.rId = rId; + localRsp.eId = eId; + taosArrayPush(ctx->explainRes, &localRsp); + taosArrayDestroy(execInfoList); } else { SRpcHandleInfo connInfo = ctx->ctrlConnInfo; connInfo.ahandle = NULL; @@ -84,7 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t execNum = 0; qTaskInfo_t taskHandle = ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; - SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch}; + SLocalFetch localFetch = {(void*)mgmt, qWorkerProcessLocalFetch, ctx->explainRes}; SArray *pResList = taosArrayInit(4, POINTER_BYTES); while (true) { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 01d962a71f..e00cda399b 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -505,6 +505,7 @@ int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode); int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode); bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync); int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode); +int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index d79514a122..b2b824c33d 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -128,10 +128,10 @@ _return: int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) { SRetrieveTableRsp *pRsp = NULL; - SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp)); + SCH_ERR_RET(qExplainUpdateExecInfo(pJob->explainCtx, rsp, pTask->plan->id.groupId, &pRsp)); if (pRsp) { - SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); + SCH_ERR_RET(schProcessOnExplainDone(pJob, pTask, pRsp)); } return TSDB_CODE_SUCCESS; @@ -366,7 +366,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SExplainRsp rsp = {0}; if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) { - taosMemoryFree(rsp.subplanInfo); + tFreeSExplainRsp(&rsp); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 3cecbc9ae3..05c24288c4 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -824,6 +824,53 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { return TSDB_CODE_SUCCESS; } +int32_t schHandleExplainRes(SArray *pExplainRes) { + int32_t code = 0; + int32_t resNum = taosArrayGetSize(pExplainRes); + if (resNum <= 0) { + goto _return; + } + + SSchTask *pTask = NULL; + SSchJob *pJob = NULL; + + for (int32_t i = 0; i < resNum; ++i) { + SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i); + + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg"); + + SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, localRsp->qId, localRsp->rId, localRsp->tId)); + + code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp); + + if (pTask) { + SCH_UNLOCK_TASK(pTask); + } + + if (pJob) { + schReleaseJob(pJob->refId); + } + + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg"); + + SCH_ERR_JRET(code); + + localRsp->rsp.numOfPlans = 0; + localRsp->rsp.subplanInfo = NULL; + } + +_return: + + for (int32_t i = 0; i < resNum; ++i) { + SExplainLocalRsp* localRsp = taosArrayGet(pExplainRes, i); + tFreeSExplainRsp(&localRsp->rsp); + } + + taosArrayDestroy(pExplainRes); + + SCH_RET(code); +} + int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { SSubplan *plan = pTask->plan; int32_t code = 0; @@ -864,11 +911,15 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { qwMsg.connInfo.handle = pJob->conn.pTrans; if (SCH_IS_EXPLAIN_JOB(pJob)) { - explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES); + explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); } SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes)); + if (SCH_IS_EXPLAIN_JOB(pJob)) { + SCH_ERR_RET(schHandleExplainRes(explainRes)); + } + SCH_RET(schProcessOnTaskSuccess(pJob, pTask)); } @@ -1008,10 +1059,14 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { SArray *explainRes = NULL; if (SCH_IS_EXPLAIN_JOB(pJob)) { - explainRes = taosArrayInit(pJob->taskNum, POINTER_BYTES); + explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); } SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes)); + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + SCH_ERR_RET(schHandleExplainRes(explainRes)); + } SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));