enh: add schedule policy
This commit is contained in:
parent
a0f7cc6790
commit
12b961c7f5
|
@ -25,13 +25,6 @@ extern "C" {
|
|||
|
||||
extern tsem_t schdRspSem;
|
||||
|
||||
typedef struct SSchedulerCfg {
|
||||
uint32_t maxJobNum;
|
||||
int32_t maxNodeTableNum;
|
||||
SCH_POLICY schPolicy;
|
||||
bool enableReSchedule;
|
||||
} SSchedulerCfg;
|
||||
|
||||
typedef struct SQueryProfileSummary {
|
||||
int64_t startTs; // Object created and added into the message queue
|
||||
int64_t endTs; // the timestamp when the task is completed
|
||||
|
@ -86,7 +79,7 @@ typedef struct SSchedulerReq {
|
|||
} SSchedulerReq;
|
||||
|
||||
|
||||
int32_t schedulerInit(SSchedulerCfg *cfg);
|
||||
int32_t schedulerInit(void);
|
||||
|
||||
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob);
|
||||
|
||||
|
|
|
@ -834,6 +834,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
|||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||
pRequest->prevCode = code;
|
||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||
doAsyncQuery(pRequest, true);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -8,9 +8,9 @@ target_include_directories(
|
|||
|
||||
target_link_libraries(
|
||||
command
|
||||
PRIVATE os util nodes catalog function transport qcom
|
||||
PRIVATE os util nodes catalog function transport qcom scheduler
|
||||
)
|
||||
|
||||
if(${BUILD_TEST})
|
||||
ADD_SUBDIRECTORY(test)
|
||||
endif(${BUILD_TEST})
|
||||
endif(${BUILD_TEST})
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
#include "catalog.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "commandInt.h"
|
||||
#include "scheduler.h"
|
||||
|
||||
extern SConfig* tsCfg;
|
||||
|
||||
|
|
|
@ -378,6 +378,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt);
|
|||
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
|
||||
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet);
|
||||
int32_t qwAddTaskCtx(QW_FPARAMS_DEF);
|
||||
int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
#include "tmsg.h"
|
||||
#include "tname.h"
|
||||
|
||||
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = true};
|
||||
SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = false};
|
||||
|
||||
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) {
|
||||
if (!gQWDebug.statusEnable) {
|
||||
|
@ -147,9 +147,9 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
|
||||
int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
|
||||
if (gQWDebug.tmp) {
|
||||
if (TDMT_SCH_QUERY == qwMsg->msgType) {
|
||||
if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
|
||||
SEpSet epSet = {0};
|
||||
epSet.inUse = 1;
|
||||
epSet.numOfEps = 3;
|
||||
|
@ -159,16 +159,15 @@ int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) {
|
|||
epSet.eps[1].port = 7200;
|
||||
strcpy(epSet.eps[2].fqdn, "localhost");
|
||||
epSet.eps[2].port = 7300;
|
||||
|
||||
|
||||
ctx->phase = QW_PHASE_POST_QUERY;
|
||||
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet);
|
||||
gQWDebug.tmp = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) {
|
||||
if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) {
|
||||
ctx->phase = QW_PHASE_POST_QUERY;
|
||||
qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL);
|
||||
gQWDebug.tmp = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -315,7 +315,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
int64_t rId = msg->refId;
|
||||
int32_t eId = msg->execId;
|
||||
|
||||
SQWMsg qwMsg = {.msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
|
||||
SQWMsg qwMsg = {.msgType = pMsg->msgType, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
|
||||
|
||||
QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle);
|
||||
QW_ERR_RET(qwPreprocessQuery(QW_FPARAMS(), &qwMsg));
|
||||
|
|
|
@ -488,6 +488,8 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
|
||||
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
|
||||
|
||||
qwDbgResponseRedirect(qwMsg, ctx);
|
||||
|
||||
_return:
|
||||
|
||||
if (ctx) {
|
||||
|
|
|
@ -135,6 +135,13 @@ typedef struct SSchStatusFps {
|
|||
schStatusEventFp eventFp;
|
||||
} SSchStatusFps;
|
||||
|
||||
typedef struct SSchedulerCfg {
|
||||
uint32_t maxJobNum;
|
||||
int32_t maxNodeTableNum;
|
||||
SCH_POLICY schPolicy;
|
||||
bool enableReSchedule;
|
||||
} SSchedulerCfg;
|
||||
|
||||
typedef struct SSchedulerMgmt {
|
||||
uint64_t taskId; // sequential taksId
|
||||
uint64_t sId; // schedulerId
|
||||
|
@ -275,7 +282,7 @@ typedef struct SSchJob {
|
|||
|
||||
extern SSchedulerMgmt schMgmt;
|
||||
|
||||
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execTime[(_task)->execId % (_task)->maxExecTimes]) > (_task)->timeoutUsec)
|
||||
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - *(int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec)
|
||||
|
||||
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
|
||||
|
||||
|
@ -479,9 +486,12 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask);
|
|||
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
|
||||
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
|
||||
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
|
||||
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum);
|
||||
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
|
||||
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
|
||||
void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode);
|
||||
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode);
|
||||
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode);
|
||||
|
||||
|
||||
extern SSchDebug gSCHDebug;
|
||||
|
||||
|
|
|
@ -343,7 +343,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel, levelNum));
|
||||
SCH_ERR_JRET(schInitTask(pJob, pTask, plan, pLevel));
|
||||
|
||||
SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));
|
||||
|
||||
|
@ -464,7 +464,7 @@ void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
|
||||
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
|
||||
schUpdateJobErrCode(pJob, errCode);
|
||||
|
||||
int32_t code = atomic_load_32(&pJob->errCode);
|
||||
|
@ -477,21 +477,29 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
|
|||
SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR);
|
||||
}
|
||||
|
||||
// Note: no more task error processing, handled in function internal
|
||||
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
|
||||
int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
|
||||
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
|
||||
return TSDB_CODE_SCH_IGNORE_ERROR;
|
||||
}
|
||||
|
||||
schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAIL, errCode);
|
||||
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);
|
||||
return TSDB_CODE_SCH_IGNORE_ERROR;
|
||||
}
|
||||
|
||||
// Note: no more error processing, handled in function internal
|
||||
int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
|
||||
SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROP, errCode));
|
||||
SCH_RET(schProcessOnJobFailure(pJob, errCode));
|
||||
}
|
||||
|
||||
int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
|
||||
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
|
||||
return TSDB_CODE_SCH_IGNORE_ERROR;
|
||||
}
|
||||
|
||||
schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);
|
||||
return TSDB_CODE_SCH_IGNORE_ERROR;
|
||||
}
|
||||
|
||||
|
||||
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
|
||||
schPostJobRes(pJob, SCH_OP_EXEC);
|
||||
|
||||
|
@ -804,7 +812,7 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
|
|||
}
|
||||
|
||||
if (errCode) {
|
||||
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&errCode);
|
||||
schHandleJobFailure(pJob, errCode);
|
||||
}
|
||||
|
||||
SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
|
||||
|
@ -877,7 +885,7 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
|
|||
}
|
||||
|
||||
if (errCode) {
|
||||
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&errCode);
|
||||
schHandleJobFailure(pJob, errCode);
|
||||
}
|
||||
|
||||
if (pJob) {
|
||||
|
|
|
@ -45,7 +45,7 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
taosArrayDestroy(pTask->profile.execTime);
|
||||
}
|
||||
|
||||
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel, int32_t levelNum) {
|
||||
void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) {
|
||||
if (SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy)) {
|
||||
pTask->maxRetryTimes = SCH_MAX_CANDIDATE_EP_NUM;
|
||||
} else {
|
||||
|
@ -53,10 +53,10 @@ void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel, in
|
|||
pTask->maxRetryTimes = TMAX(nodeNum, SCH_MAX_CANDIDATE_EP_NUM);
|
||||
}
|
||||
|
||||
pTask->maxExecTimes = pTask->maxRetryTimes * (levelNum - pLevel->level);
|
||||
pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1);
|
||||
}
|
||||
|
||||
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) {
|
||||
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
|
||||
int32_t code = 0;
|
||||
|
||||
pTask->plan = pPlan;
|
||||
|
@ -67,7 +67,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
|
|||
pTask->execNodes =
|
||||
taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||
|
||||
schInitTaskRetryTimes(pJob, pTask, pLevel, levelNum);
|
||||
schInitTaskRetryTimes(pJob, pTask, pLevel);
|
||||
|
||||
pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t));
|
||||
if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) {
|
||||
|
@ -76,6 +76,8 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
|
|||
|
||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
|
||||
|
||||
SCH_TASK_DLOG("task initialized, max times %d:%d", pTask->maxRetryTimes, pTask->maxExecTimes);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
@ -118,7 +120,7 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_
|
|||
}
|
||||
|
||||
if (taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) {
|
||||
SCH_TASK_ELOG("fail to remove execId %d from execNodeList", execId);
|
||||
SCH_TASK_DLOG("execId %d already not in execNodeList", execId);
|
||||
} else {
|
||||
SCH_TASK_DLOG("execId %d removed from execNodeList", execId);
|
||||
}
|
||||
|
@ -248,7 +250,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
|
||||
if (pTask->level->taskFailed > 0) {
|
||||
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, NULL));
|
||||
SCH_RET(schHandleJobFailure(pJob, pJob->errCode));
|
||||
} else {
|
||||
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
|
||||
}
|
||||
|
@ -321,13 +323,17 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
|
||||
if ((pTask->execId + 1) >= pTask->maxExecTimes) {
|
||||
SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId);
|
||||
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void *)&rspCode);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
|
||||
|
||||
if (NULL == pData) {
|
||||
pTask->retryTimes = 0;
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
|
||||
if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) {
|
||||
SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes, pTask->maxExecTimes, pTask->execId);
|
||||
schHandleJobFailure(pJob, rspCode);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
schDropTaskOnExecNode(pJob, pTask);
|
||||
taosHashClear(pTask->execNodes);
|
||||
|
@ -337,7 +343,6 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
|||
taosMemoryFreeClear(pTask->msg);
|
||||
pTask->msgLen = 0;
|
||||
pTask->lastMsgType = 0;
|
||||
pTask->retryTimes = 0;
|
||||
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
|
||||
|
||||
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
||||
|
@ -784,7 +789,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
|
|||
pTask->execId++;
|
||||
pTask->retryTimes++;
|
||||
|
||||
SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execId);
|
||||
SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes);
|
||||
|
||||
SCH_LOG_TASK_START_TS(pTask);
|
||||
|
||||
|
|
|
@ -154,7 +154,7 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
|
|||
return;
|
||||
}
|
||||
|
||||
schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, (void*)&errCode);
|
||||
schHandleJobDrop(pJob, errCode);
|
||||
|
||||
schReleaseJob(*jobId);
|
||||
*jobId = 0;
|
||||
|
|
|
@ -753,6 +753,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
|
||||
if (exh == NULL) {
|
||||
tDebug("%" PRId64 " already release", refId);
|
||||
return;
|
||||
}
|
||||
|
||||
SCliConn* conn = exh->handle;
|
||||
|
|
|
@ -135,7 +135,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_API_ERROR, "Stmt API usage error"
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_TBNAME_ERROR, "Stmt table name not set")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_STMT_CLAUSE_ERROR, "not supported stmt clause")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_QUERY_KILLED, "Query killed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_EXEC_NODE, "No available execution node in current query policy configuration")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NOT_STABLE_ERROR, "Table is not a super table")
|
||||
|
||||
// mnode-common
|
||||
|
|
Loading…
Reference in New Issue