fix a bug

This commit is contained in:
Hongze Cheng 2022-04-15 07:49:33 +00:00
parent 98b98c8b85
commit bc178c247d
1 changed files with 37 additions and 38 deletions

View File

@ -14,12 +14,12 @@
*/
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schedulerInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "command.h"
SSchedulerMgmt schMgmt = {0};
@ -68,8 +68,8 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
}
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
int64_t startTs, bool syncSchedule) {
int32_t code = 0;
int64_t startTs, bool syncSchedule) {
int32_t code = 0;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
@ -141,7 +141,6 @@ _return:
SCH_RET(code);
}
void schFreeRpcCtx(SRpcCtx *pCtx) {
if (NULL == pCtx) {
return;
@ -1238,23 +1237,24 @@ _return:
}
int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) {
return TSDB_CODE_SUCCESS;
}
int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) {
return TSDB_CODE_SUCCESS;
}
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
if (NULL == task || NULL == (*task)) {
return TSDB_CODE_SUCCESS;
}
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
if (NULL == task || NULL == (*task)) {
return TSDB_CODE_SUCCESS;
}
*pTask = *task;
*pTask = *task;
return TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) {
if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || taosArrayGetSize(pTask->execNodes) <= 0) {
if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 ||
taosArrayGetSize(pTask->execNodes) <= 0) {
return TSDB_CODE_SUCCESS;
}
@ -1264,7 +1264,6 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo
return TSDB_CODE_SUCCESS;
}
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
@ -1282,13 +1281,15 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
if (TDMT_VND_EXPLAIN_RSP == msgType) {
schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask);
} else {
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
}
if (NULL == pTask) {
SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId);
SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId,
pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
@ -1444,7 +1445,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
}
int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
int32_t code = 0;
int32_t code = 0;
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
@ -1565,7 +1566,7 @@ _return:
}
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
int32_t code = 0;
int32_t code = 0;
SMsgSendInfo *pReadyMsgSendInfo = NULL;
SMsgSendInfo *pExplainMsgSendInfo = NULL;
@ -1578,7 +1579,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &pReadyMsgSendInfo));
SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));
int32_t msgType = TDMT_VND_RES_READY_RSP;
int32_t msgType = TDMT_VND_RES_READY_RSP;
SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
@ -2319,7 +2320,7 @@ _return:
}
int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
bool syncSchedule) {
bool syncSchedule) {
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
int32_t code = 0;
@ -2672,7 +2673,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status};
taosArrayPush(pSub, &subDesc);
@ -2682,7 +2683,6 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
return TSDB_CODE_SUCCESS;
}
int32_t scheduleCancelJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {
@ -2738,11 +2738,10 @@ void schedulerDestroy(void) {
while (pJob) {
taosRemoveRef(schMgmt.jobRef, pJob->refId);
pJob = taosIterateRef(schMgmt.jobRef, pJob->refId);
pJob = taosIterateRef(schMgmt.jobRef, pJob);
}
taosCloseRef(schMgmt.jobRef);
schMgmt.jobRef = 0;
}
}