diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index ae120a42be..8e8652aab5 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -175,7 +175,7 @@ typedef struct SSchLevel { int32_t taskNum; int32_t taskLaunchedNum; int32_t taskDoneNum; - SArray *subTasks; // Element is SQueryTask + SArray *subTasks; // Element is SSchTask } SSchLevel; typedef struct SSchTaskProfile { @@ -213,6 +213,7 @@ typedef struct SSchTask { typedef struct SSchJobAttr { EExplainMode explainMode; bool queryJob; + bool needFetch; bool needFlowCtrl; } SSchJobAttr; @@ -318,11 +319,11 @@ extern SSchedulerMgmt schMgmt; #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) -#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY) +#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) -#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job) -#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job)) -#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job)) +#define SCH_JOB_NEED_FETCH(_job) ((_job)->attr.needFetch) +#define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job)) +#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0)) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index c4923b8740..858f68e7ae 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -230,9 +230,16 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } SSchLevel *pLevel = taosArrayGet(pJob->levels, 0); - if (SCH_IS_QUERY_JOB(pJob) && pLevel->taskNum > 1) { - SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + if (SCH_IS_QUERY_JOB(pJob)) { + if (pLevel->taskNum > 1) { + SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + SSchTask* pTask = taosArrayGet(pLevel->subTasks, 0); + if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType) { + pJob->attr.needFetch = true; + } } return TSDB_CODE_SUCCESS; @@ -371,9 +378,12 @@ _return: int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) { pRes->code = atomic_load_32(&pJob->errCode); pRes->numOfRows = pJob->resNumOfRows; - memcpy(pRes, &pJob->execRes, sizeof(pJob->execRes)); + pRes->res = pJob->execRes.res; + pRes->msgType = pJob->execRes.msgType; pJob->execRes.res = NULL; + SCH_JOB_DLOG("execRes dumped, code: %s", tstrerror(pRes->code)); + return TSDB_CODE_SUCCESS; } @@ -434,12 +444,12 @@ int32_t schNotifyUserFetchRes(SSchJob* pJob) { void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) { if (SCH_OP_NULL == pJob->opStatus.op) { - SCH_JOB_DLOG("job not in any op, no need to post job res, status:%s", jobTaskStatusStr(pJob->status)); + SCH_JOB_DLOG("job not in any operation, no need to post job res, status:%s", jobTaskStatusStr(pJob->status)); return; } if (op && pJob->opStatus.op != op) { - SCH_JOB_ELOG("job in op %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op)); + SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op)); return; } @@ -754,23 +764,21 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int switch (type) { case SCH_OP_EXEC: -/* - op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); - if (SCH_OP_NULL == op || op != type) { - SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); - } -*/ if (pReq && pReq->syncReq) { + op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); + if (SCH_OP_NULL == op || op != type) { + SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); + } schDumpJobExecRes(pJob, pReq->pExecRes); } break; case SCH_OP_FETCH: -/* - op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); - if (SCH_OP_NULL == op || op != type) { - SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); + if (pReq && pReq->syncReq) { + op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); + if (SCH_OP_NULL == op || op != type) { + SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); + } } -*/ break; case SCH_OP_GET_STATUS: errCode = TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 4da8ed446b..be33d686c8 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -170,7 +170,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL); - if (SCH_IS_WAIT_ALL_JOB(pJob)) { + if (SCH_JOB_NEED_WAIT(pJob)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); pTask->level->taskFailed++; taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; @@ -212,7 +212,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { int32_t taskDone = 0; - if (SCH_IS_WAIT_ALL_JOB(pJob)) { + if (SCH_JOB_NEED_WAIT(pJob)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); pTask->level->taskSucceed++; taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; @@ -792,7 +792,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { } void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { - if (!SCH_IS_NEED_DROP_JOB(pJob)) { + if (!SCH_JOB_NEED_DROP(pJob)) { return; }