enh: refactor scheduler code

This commit is contained in:
dapan1121 2022-07-02 16:59:49 +08:00
parent f9c9afe61a
commit 211ae46a05
16 changed files with 1115 additions and 1111 deletions

View File

@ -29,12 +29,13 @@ extern "C" {
typedef enum {
JOB_TASK_STATUS_NULL = 0,
JOB_TASK_STATUS_NOT_START = 1,
JOB_TASK_STATUS_EXECUTING,
JOB_TASK_STATUS_PARTIAL_SUCCEED,
JOB_TASK_STATUS_SUCCEED,
JOB_TASK_STATUS_FAILED,
JOB_TASK_STATUS_DROPPING,
JOB_TASK_STATUS_INIT,
JOB_TASK_STATUS_EXEC,
JOB_TASK_STATUS_PART_SUCC,
JOB_TASK_STATUS_SUCC,
JOB_TASK_STATUS_FAIL,
JOB_TASK_STATUS_DROP,
JOB_TASK_STATUS_MAX,
} EJobTaskType;
typedef enum {

View File

@ -74,6 +74,7 @@ typedef void (*schedulerFetchFp)(void* pResult, void* param, int32_t code);
typedef bool (*schedulerChkKillFp)(void* param);
typedef struct SSchedulerReq {
bool syncReq;
SRequestConnInfo *pConn;
SArray *pNodeList;
SQueryPlan *pDag;
@ -83,36 +84,17 @@ typedef struct SSchedulerReq {
void* execParam;
schedulerChkKillFp chkKillFp;
void* chkKillParam;
SQueryResult* pQueryRes;
} SSchedulerReq;
int32_t schedulerInit(SSchedulerCfg *cfg);
/**
* Process the query job, generated according to the query physical plan.
* This is a synchronized API, and is also thread-safety.
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes);
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob);
/**
* Process the query job, generated according to the query physical plan.
* This is a asynchronized API, and is also thread-safety.
* @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
/**
* Fetch query result from the remote query executor
* @param pJob
* @param data
* @return
*/
int32_t schedulerFetchRows(int64_t job, void **data);
void schedulerAsyncFetchRows(int64_t job, schedulerFetchFp fp, void* param);
void schedulerFetchRowsA(int64_t job, schedulerFetchFp fp, void* param);
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);

View File

@ -631,17 +631,21 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.execFp = NULL,
.execParam = NULL,
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self};
SSchedulerReq req = {
.syncReq = true,
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.execFp = NULL,
.execParam = NULL,
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self
.pQueryRes = &res,
};
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res);
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
pRequest->body.resInfo.execRes = res.res;
if (code != TSDB_CODE_SUCCESS) {
@ -939,16 +943,20 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
SRequestConnInfo conn = {
.pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.execFp = schedulerExecCb,
.execParam = pRequest,
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self};
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
SSchedulerReq req = {
.syncReq = false,
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.execFp = schedulerExecCb,
.execParam = pRequest,
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self,
.pQueryRes = NULL,
};
code = schedulerExecJob(&req, &pRequest->body.queryJob);
taosArrayDestroy(pNodeList);
} else {
tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),

View File

@ -863,7 +863,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
}
}
schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest);
schedulerFetchRowsA(pRequest->body.queryJob, fetchCallback, pRequest);
}
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {

View File

@ -171,17 +171,17 @@ char* jobTaskStatusStr(int32_t status) {
switch (status) {
case JOB_TASK_STATUS_NULL:
return "NULL";
case JOB_TASK_STATUS_NOT_START:
return "NOT_START";
case JOB_TASK_STATUS_EXECUTING:
case JOB_TASK_STATUS_INIT:
return "INIT";
case JOB_TASK_STATUS_EXEC:
return "EXECUTING";
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
case JOB_TASK_STATUS_PART_SUCC:
return "PARTIAL_SUCCEED";
case JOB_TASK_STATUS_SUCCEED:
case JOB_TASK_STATUS_SUCC:
return "SUCCEED";
case JOB_TASK_STATUS_FAILED:
case JOB_TASK_STATUS_FAIL:
return "FAILED";
case JOB_TASK_STATUS_DROPPING:
case JOB_TASK_STATUS_DROP:
return "DROPPING";
default:
break;

View File

@ -226,8 +226,8 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY(status) \
(status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
(status == JOB_TASK_STATUS_SUCC || status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_CANCELLED || \
status == JOB_TASK_STATUS_PART_SUCC)
#define QW_SET_QTID(id, qId, tId, eId) \
do { \
*(uint64_t *)(id) = (qId); \

View File

@ -19,7 +19,7 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
int32_t code = 0;
if (oriStatus == newStatus) {
if (newStatus == JOB_TASK_STATUS_EXECUTING || newStatus == JOB_TASK_STATUS_FAILED) {
if (newStatus == JOB_TASK_STATUS_EXEC || newStatus == JOB_TASK_STATUS_FAIL) {
*ignore = true;
return TSDB_CODE_SUCCESS;
}
@ -29,47 +29,47 @@ int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus,
switch (oriStatus) {
case JOB_TASK_STATUS_NULL:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED &&
newStatus != JOB_TASK_STATUS_NOT_START) {
if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_FAIL &&
newStatus != JOB_TASK_STATUS_INIT) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_NOT_START:
if (newStatus != JOB_TASK_STATUS_DROPPING && newStatus != JOB_TASK_STATUS_EXECUTING
&& newStatus != JOB_TASK_STATUS_FAILED) {
case JOB_TASK_STATUS_INIT:
if (newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC
&& newStatus != JOB_TASK_STATUS_FAIL) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_EXECUTING:
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED && newStatus != JOB_TASK_STATUS_SUCCEED &&
newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_DROPPING) {
case JOB_TASK_STATUS_EXEC:
if (newStatus != JOB_TASK_STATUS_PART_SUCC && newStatus != JOB_TASK_STATUS_SUCC &&
newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_DROP) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_SUCCEED &&
newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_DROPPING) {
case JOB_TASK_STATUS_PART_SUCC:
if (newStatus != JOB_TASK_STATUS_EXEC && newStatus != JOB_TASK_STATUS_SUCC &&
newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_DROP) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_SUCCEED:
if (newStatus != JOB_TASK_STATUS_DROPPING && newStatus != JOB_TASK_STATUS_FAILED) {
case JOB_TASK_STATUS_SUCC:
if (newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_FAIL) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_FAILED:
if (newStatus != JOB_TASK_STATUS_DROPPING) {
case JOB_TASK_STATUS_FAIL:
if (newStatus != JOB_TASK_STATUS_DROP) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_DROPPING:
if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
case JOB_TASK_STATUS_DROP:
if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_PART_SUCC) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;

View File

@ -206,7 +206,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
QW_TASK_DLOG_E("no data in sink and query end");
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp;
@ -236,7 +236,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG_E("task all data fetched, done");
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
}
return TSDB_CODE_SUCCESS;
@ -330,7 +330,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
break;
}
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
break;
}
case QW_PHASE_PRE_FETCH: {
@ -447,7 +447,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
_return:
if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PARTIAL_SUCCEED);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC);
}
if (rspConnection) {
@ -467,7 +467,7 @@ _return:
}
if (code) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
}
QW_TASK_DLOG("end to handle event at phase %s, code:%x - %s", qwPhaseStr(phase), code, tstrerror(code));
@ -499,7 +499,7 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx->ctrlConnInfo = qwMsg->connInfo;
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_NOT_START));
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
_return:
@ -698,7 +698,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_IS_QUERY_RUNNING(ctx)) {
atomic_store_8((int8_t *)&ctx->queryContinue, 1);
} else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC);
atomic_store_8((int8_t *)&ctx->queryInQueue, 1);
@ -749,7 +749,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (QW_IS_QUERY_RUNNING(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP);
} else if (ctx->phase > 0) {
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
rsped = true;
@ -770,7 +770,7 @@ _return:
QW_UPDATE_RSP_CODE(ctx, code);
}
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAILED);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_FAIL);
}
if (locked) {

View File

@ -54,6 +54,13 @@ typedef enum {
SCH_OP_FETCH,
} SCH_OP_TYPE;
typedef enum {
SCH_EVENT_ENTER_API = 1,
SCH_EVENT_LEAVE_API,
SCH_EVENT_MSG,
SCH_EVENT_DROP,
} SCH_EVENT_TYPE;
typedef struct SSchTrans {
void *pTrans;
void *pHandle;
@ -104,6 +111,22 @@ typedef struct SSchResInfo {
void* userParam;
} SSchResInfo;
typedef struct SSchEvent {
SCH_EVENT_TYPE event;
void* info;
} SSchEvent;
typedef int32_t (*schStatusEnterFp)(void* pHandle, void* pParam);
typedef int32_t (*schStatusLeaveFp)(void* pHandle, void* pParam);
typedef int32_t (*schStatusEventFp)(void* pHandle, void* pParam, void* pEvent);
typedef struct SSchStatusFps {
EJobTaskType status;
schStatusEnterFp enterFp;
schStatusLeaveFp leaveFp;
schStatusEventFp eventFp;
} SSchStatusFps;
typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
@ -200,7 +223,7 @@ typedef struct SSchJobAttr {
typedef struct {
int32_t op;
bool sync;
bool syncReq;
} SSchOpStatus;
typedef struct SSchJob {
@ -349,7 +372,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask);
int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough);
int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask);
int32_t schFetchFromRemote(SSchJob *pJob);
int32_t schLaunchFetchTask(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction);
int32_t schCloneSMsgSendInfo(void *src, void **dst);
@ -371,22 +394,21 @@ void schFreeRpcCtxVal(const void *arg);
int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId);
int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync);
int32_t schExecJobImpl(SSchedulerReq *pReq, SSchJob *pJob, bool sync);
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
int32_t schCancelJob(SSchJob *pJob);
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
uint64_t schGenTaskId(void);
void schCloseJobRef(void);
int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes);
int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t schFetchRows(SSchJob *pJob);
int32_t schAsyncFetchRows(SSchJob *pJob);
int32_t schJobFetchRows(SSchJob *pJob);
int32_t schJobFetchRowsA(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo);
char* schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob);
int32_t schInitJob(SSchJob **pJob, SSchedulerReq *pReq);
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes);
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet);
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode);

File diff suppressed because it is too large Load Diff

View File

@ -37,7 +37,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
TMSG_INFO(msgType));
}
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_DLOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
}
@ -51,7 +51,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
@ -76,7 +76,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
if (taskStatus != JOB_TASK_STATUS_EXEC && taskStatus != JOB_TASK_STATUS_PART_SUCC) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
TMSG_INFO(msgType));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
@ -308,7 +308,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schFetchFromRemote(pJob));
SCH_ERR_JRET(schLaunchFetchTask(pJob));
taosMemoryFreeClear(msg);
@ -325,7 +325,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows));
if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
}
SCH_TASK_DLOG("got fetch rsp, rows:%d, complete:%d", htonl(rsp->numOfRows), rsp->completed);

View File

@ -0,0 +1,46 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
SSchStatusFps gSchJobFps[JOB_TASK_STATUS_MAX] = {
{JOB_TASK_STATUS_NULL, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_INIT, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_EXEC, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_PART_SUCC, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_SUCC, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_FAIL, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
{JOB_TASK_STATUS_DROP, schJobStNullEnter, schJobStNullLeave, schJobStNullEvent},
};
SSchStatusFps gSchTaskFps[JOB_TASK_STATUS_MAX] = {
{JOB_TASK_STATUS_NULL, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_INIT, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_EXEC, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_PART_SUCC, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_SUCC, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_FAIL, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
{JOB_TASK_STATUS_DROP, schTaskStatusNullEnter, schTaskStatusNullLeave, schTaskStatusNullEvent},
};

View File

@ -0,0 +1,843 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "command.h"
#include "query.h"
#include "schedulerInt.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask);
if (pTask->candidateAddrs) {
taosArrayDestroy(pTask->candidateAddrs);
}
taosMemoryFreeClear(pTask->msg);
if (pTask->children) {
taosArrayDestroy(pTask->children);
}
if (pTask->parents) {
taosArrayDestroy(pTask->parents);
}
if (pTask->execNodes) {
taosHashCleanup(pTask->execNodes);
}
}
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
pTask->plan = pPlan;
pTask->level = pLevel;
pTask->execId = -1;
pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES;
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
pTask->taskId = schGenTaskId();
pTask->execNodes = taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (NULL == pTask->execNodes) {
SCH_TASK_ELOG("taosHashInit %d execNodes failed", SCH_MAX_CANDIDATE_EP_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
return TSDB_CODE_SUCCESS;
}
int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
if (NULL == addr) {
SCH_TASK_ELOG("taosArrayGet candidate addr failed, idx:%d, size:%d", pTask->candidateIdx,
(int32_t)taosArrayGetSize(pTask->candidateAddrs));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
pTask->succeedAddr = *addr;
return TSDB_CODE_SUCCESS;
}
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL};
if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) {
SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("task execNode added, execId:%d", execId);
return TSDB_CODE_SUCCESS;
}
int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
if (NULL == pTask->execNodes) {
return TSDB_CODE_SUCCESS;
}
if (taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) {
SCH_TASK_ELOG("fail to remove execId %d from execNodeList", execId);
} else {
SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
}
if (execId != pTask->execId) { // ignore it
SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId);
SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
}
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) {
if (taosHashGetSize(pTask->execNodes) <= 0) {
return TSDB_CODE_SUCCESS;
}
SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId));
nodeInfo->handle = handle;
SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId) {
if (dropExecNode) {
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
}
SCH_SET_TASK_HANDLE(pTask, handle);
schUpdateTaskExecNode(pJob, pTask, handle, execId);
return TSDB_CODE_SUCCESS;
}
// Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
int8_t status = 0;
if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
SCH_LOG_TASK_WAIT_TS(pTask);
} else {
SCH_LOG_TASK_END_TS(pTask);
}
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status));
SCH_RET(atomic_load_32(&pJob->errCode));
}
bool needRetry = false;
bool moved = false;
int32_t taskDone = 0;
int32_t code = 0;
SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));
SCH_ERR_JRET(schTaskCheckSetRetry(pJob, pTask, errCode, &needRetry));
if (!needRetry) {
SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAIL);
if (SCH_IS_WAIT_ALL_JOB(pJob)) {
SCH_LOCK(SCH_WRITE, &pTask->level->lock);
pTask->level->taskFailed++;
taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
schUpdateJobErrCode(pJob, errCode);
if (taskDone < pTask->level->taskNum) {
SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum);
SCH_RET(errCode);
}
}
} else {
SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask));
return TSDB_CODE_SUCCESS;
}
_return:
SCH_RET(schProcessOnJobFailure(pJob, errCode));
}
// Note: no more task error processing, handled in function internal
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false;
int32_t code = 0;
SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_LOG_TASK_END_TS(pTask);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PART_SUCC);
SCH_ERR_JRET(schRecordTaskSucceedNode(pJob, pTask));
SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
if (parentNum == 0) {
int32_t taskDone = 0;
if (SCH_IS_WAIT_ALL_JOB(pJob)) {
SCH_LOCK(SCH_WRITE, &pTask->level->lock);
pTask->level->taskSucceed++;
taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
if (taskDone < pTask->level->taskNum) {
SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
return TSDB_CODE_SUCCESS;
} else if (taskDone > pTask->level->taskNum) {
SCH_TASK_ELOG("taskDone number invalid, done:%d, total:%d", taskDone, pTask->level->taskNum);
}
if (pTask->level->taskFailed > 0) {
SCH_RET(schProcessOnJobFailure(pJob, 0));
} else {
SCH_RET(schProcessOnJobPartialSuccess(pJob));
}
} else {
pJob->resNode = pTask->succeedAddr;
}
pJob->fetchTask = pTask;
SCH_RET(schProcessOnJobPartialSuccess(pJob));
}
/*
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
++job->dataSrcEps.numOfEps;
}
*/
for (int32_t i = 0; i < parentNum; ++i) {
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
SCH_LOCK(SCH_WRITE, &parent->lock);
SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE,
.taskId = pTask->taskId,
.schedId = schMgmt.sId,
.execId = pTask->execId,
.addr = pTask->succeedAddr};
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
SCH_UNLOCK(SCH_WRITE, &parent->lock);
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
SCH_ERR_RET(schLaunchTask(pJob, parent));
}
}
SCH_ERR_RET(schLaunchJobLowerLevel(pJob, pTask));
return TSDB_CODE_SUCCESS;
_return:
SCH_RET(schProcessOnJobFailure(pJob, code));
}
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
return TSDB_CODE_SUCCESS;
}
SCH_LOCK_TASK(pTask);
if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXEC == pTask->status &&
pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) {
SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId);
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR);
}
SCH_UNLOCK_TASK(pTask);
return TSDB_CODE_SUCCESS;
}
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) {
int32_t code = 0;
if ((pTask->execId + 1) >= pTask->maxExecTimes) {
SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId);
schProcessOnJobFailure(pJob, rspCode);
return TSDB_CODE_SUCCESS;
}
SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask));
schDeregisterTaskHb(pJob, pTask);
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
taosMemoryFreeClear(pTask->msg);
pTask->msgLen = 0;
pTask->lastMsgType = 0;
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
if (pData) {
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
}
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
if (JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) {
SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
}
}
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
SCH_ERR_JRET(schLaunchTask(pJob, pTask));
return TSDB_CODE_SUCCESS;
}
// merge plan
pTask->childReady = 0;
qClearSubplanExecutionNode(pTask->plan);
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
int32_t childrenNum = taosArrayGetSize(pTask->children);
for (int32_t i = 0; i < childrenNum; ++i) {
SSchTask* pChild = taosArrayGetP(pTask->children, i);
SCH_LOCK_TASK(pChild);
schDoTaskRedirect(pJob, pChild, NULL, rspCode);
SCH_UNLOCK_TASK(pChild);
}
return TSDB_CODE_SUCCESS;
_return:
code = schProcessOnTaskFailure(pJob, pTask, code);
SCH_RET(code);
}
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) {
int32_t code = 0;
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
if (NULL == pData->pEpSet) {
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
SCH_ERR_JRET(rspCode);
}
}
SCH_RET(schDoTaskRedirect(pJob, pTask, pData, rspCode));
_return:
schProcessOnTaskFailure(pJob, pTask, code);
SCH_RET(code);
}
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
SCH_TASK_ELOG("task already in execTask list, code:%x", code);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("task added to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
return TSDB_CODE_SUCCESS;
}
/*
int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
} else {
SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
}
int32_t code = taosHashPut(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
*moved = true;
SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
*moved = true;
SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks));
return TSDB_CODE_SUCCESS;
}
int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
*moved = false;
if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
}
int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
*moved = true;
SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
*moved = true;
SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks));
return TSDB_CODE_SUCCESS;
}
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
}
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
*moved = true;
SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
*moved = true;
SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks));
return TSDB_CODE_SUCCESS;
}
*/
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
pTask->maxExecTimes++;
if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
pTask->timeoutUsec *= 2;
if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
}
}
}
if ((pTask->execId + 1) >= pTask->maxExecTimes) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId);
return TSDB_CODE_SUCCESS;
}
if (!SCH_NEED_RETRY(pTask->lastMsgType, errCode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
return TSDB_CODE_SUCCESS;
}
if (SCH_IS_DATA_SRC_TASK(pTask)) {
if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId,
SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode));
return TSDB_CODE_SUCCESS;
}
} else {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
pTask->candidateIdx, candidateNum);
return TSDB_CODE_SUCCESS;
}
}
*needRetry = true;
SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode));
return TSDB_CODE_SUCCESS;
}
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
SCH_ERR_RET(schRemoveTaskFromExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask));
}
schDeregisterTaskHb(pJob, pTask);
if (SCH_IS_DATA_SRC_TASK(pTask)) {
SCH_SWITCH_EPSET(&pTask->plan->execNode);
} else {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
if (++pTask->candidateIdx >= candidateNum) {
pTask->candidateIdx = 0;
}
}
SCH_ERR_RET(schLaunchTask(pJob, pTask));
return TSDB_CODE_SUCCESS;
}
int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
int32_t addNum = 0;
int32_t nodeNum = 0;
if (pJob->nodeList) {
nodeNum = taosArrayGetSize(pJob->nodeList);
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
SQueryNodeAddr *naddr = &nload->addr;
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("set %dth candidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port);
++addNum;
}
}
if (addNum <= 0) {
SCH_TASK_ELOG("no available execNode as candidates, nodeNum:%d", nodeNum);
SCH_ERR_RET(TSDB_CODE_TSC_NO_EXEC_NODE);
}
return TSDB_CODE_SUCCESS;
}
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
if (NULL != pTask->candidateAddrs) {
return TSDB_CODE_SUCCESS;
}
pTask->candidateIdx = 0;
pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
if (NULL == pTask->candidateAddrs) {
SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pTask->plan->execNode.epSet.numOfEps > 0) {
if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("use execNode in plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));
/*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
++epSet->numOfEps;
}
*/
return TSDB_CODE_SUCCESS;
}
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet) {
if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs): 0));
SCH_ERR_RET(TSDB_CODE_APP_ERROR);
}
SQueryNodeAddr* pAddr = taosArrayGet(pTask->candidateAddrs, 0);
SEp* pOld = &pAddr->epSet.eps[pAddr->epSet.inUse];
SEp* pNew = &pEpSet->eps[pEpSet->inUse];
SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port);
memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
return TSDB_CODE_SUCCESS;
}
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId));
if (code) {
SCH_TASK_ELOG("task failed to rm from execTask list, code:%x", code);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
return TSDB_CODE_SUCCESS;
}
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
if (NULL == pTask->execNodes) {
SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return;
}
int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
if (size <= 0) {
SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return;
}
SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
while (nodeInfo) {
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK);
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
}
SCH_TASK_DLOG("task has been dropped on %d exec nodes", size);
}
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList);
SSchTask *pTask = NULL;
qDebug("%d task status in hb rsp from nodeId:%d, fqdn:%s, port:%d", taskNum, pEpId->nodeId, pEpId->ep.fqdn, pEpId->ep.port);
for (int32_t i = 0; i < taskNum; ++i) {
STaskStatus *taskStatus = taosArrayGet(pStatusList, i);
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s",
taskStatus->queryId, taskStatus->taskId, taskStatus->execId, jobTaskStatusStr(taskStatus->status));
SSchJob *pJob = schAcquireJob(taskStatus->refId);
if (NULL == pJob) {
qWarn("job not found, refId:0x%" PRIx64 ",QID:0x%" PRIx64 ",TID:0x%" PRIx64, taskStatus->refId,
taskStatus->queryId, taskStatus->taskId);
// TODO DROP TASK FROM SERVER!!!!
continue;
}
pTask = NULL;
schGetTaskInJob(pJob, taskStatus->taskId, &pTask);
if (NULL == pTask) {
// TODO DROP TASK FROM SERVER!!!!
schReleaseJob(taskStatus->refId);
continue;
}
if (taskStatus->execId != pTask->execId) {
// TODO DROP TASK FROM SERVER!!!!
SCH_TASK_DLOG("EID %d in hb rsp mis-match", taskStatus->execId);
schReleaseJob(taskStatus->refId);
continue;
}
if (taskStatus->status == JOB_TASK_STATUS_FAIL) {
// RECORD AND HANDLE ERROR!!!!
schReleaseJob(taskStatus->refId);
continue;
}
if (taskStatus->status == JOB_TASK_STATUS_INIT) {
schRescheduleTask(pJob, pTask);
}
schReleaseJob(taskStatus->refId);
}
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
int8_t status = 0;
int32_t code = 0;
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
pTask->execId++;
SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execId);
SCH_LOG_TASK_START_TS(pTask);
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status));
SCH_RET(atomic_load_32(&pJob->errCode));
}
// NOTE: race condition: the task should be put into the hash table before send msg to server
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
}
SSubplan *plan = pTask->plan;
if (NULL == pTask->msg) { // TODO add more detailed reason for failure
code = qSubPlanToString(plan, &pTask->msg, &pTask->msgLen);
if (TSDB_CODE_SUCCESS != code) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen);
SCH_ERR_RET(code);
} else {
SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg);
}
}
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
if (SCH_IS_QUERY_JOB(pJob)) {
SCH_ERR_RET(schEnsureHbConnection(pJob, pTask));
}
SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType));
return TSDB_CODE_SUCCESS;
}
// Note: no more error processing, handled in function internal
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
bool enough = false;
int32_t code = 0;
SCH_SET_TASK_HANDLE(pTask, NULL);
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
SCH_ERR_JRET(schCheckIncTaskFlowQuota(pJob, pTask, &enough));
if (enough) {
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
}
} else {
SCH_ERR_JRET(schLaunchTaskImpl(pJob, pTask));
}
return TSDB_CODE_SUCCESS;
_return:
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));
for (int32_t i = 0; i < level->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(level->subTasks, i);
SCH_ERR_RET(schLaunchTask(pJob, pTask));
}
return TSDB_CODE_SUCCESS;
}
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
if (!SCH_IS_NEED_DROP_JOB(pJob)) {
return;
}
void *pIter = taosHashIterate(list, NULL);
while (pIter) {
SSchTask *pTask = *(SSchTask **)pIter;
schDropTaskOnExecNode(pJob, pTask);
pIter = taosHashIterate(list, pIter);
}
}
// Note: no more error processing, handled in function internal
int32_t schLaunchFetchTask(SSchJob *pJob) {
int32_t code = 0;
void *resData = atomic_load_ptr(&pJob->resData);
if (resData) {
SCH_JOB_DLOG("res already fetched, res:%p", resData);
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_SCH_FETCH));
return TSDB_CODE_SUCCESS;
_return:
SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
}

View File

@ -283,3 +283,20 @@ void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo) {
taosMemoryFree(msgSendInfo);
}
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) {
return TSDB_CODE_SUCCESS;
}
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
if (NULL == task || NULL == (*task)) {
return TSDB_CODE_SUCCESS;
}
*pTask = *task;
return TSDB_CODE_SUCCESS;
}

View File

@ -67,49 +67,22 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
return TSDB_CODE_SUCCESS;
}
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId, SQueryResult *pRes) {
qDebug("scheduler sync exec job start");
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJobId) {
qDebug("scheduler %s exec job start", pReq->syncReq ? "SYNC" : "ASYNC");
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(pReq, &pJob));
SCH_ERR_RET(schJobStatusEnter(&pJob, JOB_TASK_STATUS_INIT, pReq));
SCH_ERR_RET(schJobStatusEnter(&pJob, JOB_TASK_STATUS_EXEC, pReq));
*pJobId = pJob->refId;
SCH_ERR_JRET(schExecJobImpl(pReq, pJob, true));
_return:
if (code && NULL == pJob) {
qDestroyQueryPlan(pReq->pDag);
}
if (pJob) {
schSetJobQueryRes(pJob, pRes);
schReleaseJob(pJob->refId);
}
return code;
}
int32_t schedulerAsyncExecJob(SSchedulerReq *pReq, int64_t *pJobId) {
qDebug("scheduler async exec job start");
int32_t code = 0;
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(pReq, &pJob));
*pJobId = pJob->refId;
SCH_ERR_JRET(schExecJobImpl(pReq, pJob, false));
_return:
if (code && NULL == pJob) {
qDestroyQueryPlan(pReq->pDag);
}
if (pJob) {
schSetJobQueryRes(pJob, pReq->pQueryRes);
schReleaseJob(pJob->refId);
}
@ -133,14 +106,14 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
SCH_ERR_RET(schBeginOperation(pJob, SCH_OP_FETCH, true));
pJob->userRes.fetchRes = pData;
code = schFetchRows(pJob);
code = schJobFetchRows(pJob);
schReleaseJob(job);
SCH_RET(code);
}
void schedulerAsyncFetchRows(int64_t job, schedulerFetchFp fp, void* param) {
void schedulerFetchRowsA(int64_t job, schedulerFetchFp fp, void* param) {
qDebug("scheduler async fetch rows start");
int32_t code = 0;
@ -159,7 +132,7 @@ void schedulerAsyncFetchRows(int64_t job, schedulerFetchFp fp, void* param) {
pJob->userRes.fetchFp = fp;
pJob->userRes.userParam = param;
SCH_ERR_JRET(schAsyncFetchRows(pJob));
SCH_ERR_JRET(schJobFetchRowsA(pJob));
_return:
@ -178,7 +151,7 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) {
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (pJob->status < JOB_TASK_STATUS_NOT_START || pJob->levelNum <= 0 || NULL == pJob->levels) {
if (pJob->status < JOB_TASK_STATUS_INIT || pJob->levelNum <= 0 || NULL == pJob->levels) {
qDebug("job not initialized or not executable job, refId:0x%" PRIx64, job);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}

View File

@ -507,6 +507,7 @@ void* schtRunJobThread(void *aa) {
SRequestConnInfo conn = {0};
conn.pTrans = mockPointer;
SSchedulerReq req = {0};
req.syncReq = false;
req.pConn = &conn;
req.pNodeList = qnodeList;
req.pDag = &dag;
@ -514,7 +515,7 @@ void* schtRunJobThread(void *aa) {
req.execFp = schtQueryCb;
req.execParam = &queryDone;
code = schedulerAsyncExecJob(&req, &queryJobRefId);
code = schedulerExecJob(&req, &queryJobRefId);
assert(code == 0);
pJob = schAcquireJob(queryJobRefId);
@ -658,7 +659,7 @@ TEST(queryTest, normalCase) {
SRequestConnInfo conn = {0};
conn.pTrans = mockPointer;
SSchedulerReq req = {0};
SSchedulerReq req = {0};
req.pConn = &conn;
req.pNodeList = qnodeList;
req.pDag = &dag;
@ -666,7 +667,7 @@ TEST(queryTest, normalCase) {
req.execFp = schtQueryCb;
req.execParam = &queryDone;
code = schedulerAsyncExecJob(&req, &job);
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
@ -769,7 +770,7 @@ TEST(queryTest, readyFirstCase) {
req.sql = "select * from tb";
req.execFp = schtQueryCb;
req.execParam = &queryDone;
code = schedulerAsyncExecJob(&req, &job);
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);
@ -877,7 +878,7 @@ TEST(queryTest, flowCtrlCase) {
req.execFp = schtQueryCb;
req.execParam = &queryDone;
code = schedulerAsyncExecJob(&req, &job);
code = schedulerExecJob(&req, &job);
ASSERT_EQ(code, 0);