rescheduler timeout task

This commit is contained in:
dapan1121 2022-06-03 20:44:32 +08:00
parent f6c6083aad
commit b6e60082ae
9 changed files with 101 additions and 52 deletions

View File

@ -564,7 +564,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501)
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502)
#define TSDB_CODE_SCH_IGNORE_ERROR TAOS_DEF_ERROR_CODE(0, 0x2503)
#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504)
#define TSDB_CODE_SCH_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504)
#define TSDB_CODE_QW_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x2550)
//parser
#define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600)

View File

@ -131,7 +131,6 @@ void destroyTscObj(void *pObj) {
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
closeAllRequests(pTscObj->pRequests);
schedulerStopTransport(pTscObj->pAppInfo->pTransporter);
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
taosThreadMutexDestroy(&pTscObj->mutex);
taosMemoryFreeClear(pTscObj);

View File

@ -66,10 +66,11 @@ void taos_cleanup(void) {
hbMgrCleanUp();
rpcCleanup();
catalogDestroy();
schedulerDestroy();
rpcCleanup();
tscInfo("all local resources released");
taosCleanupCfg();
taosCloseLog();

View File

@ -274,7 +274,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
SQWMsg qwMsg = {.msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
QW_SCH_TASK_DLOG("prerocessQuery start, handle:%p", pMsg->info.handle);
QW_ERR_RET(qwPrerocessQuery(QW_FPARAMS(), &qwMsg));

View File

@ -163,9 +163,11 @@ typedef struct SSchTaskProfile {
typedef struct SSchTask {
uint64_t taskId; // task id
int32_t execIdx; // task current execute try index
SRWLatch lock; // task lock
int32_t maxExecTimes; // task may exec times
int32_t execIdx; // task current execute try index
SSchLevel *level; // level
SRWLatch planLock; // task update plan lock
SSubplan *plan; // subplan
char *msg; // operator tree
int32_t msgLen; // msg length
@ -230,26 +232,39 @@ typedef struct SSchJob {
extern SSchedulerMgmt schMgmt;
#define SCH_LOG_TASK_START_TS(_task) \
#define SCH_LOG_TASK_START_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us; \
if (0 == (_task)->execIdx) { \
(_task)->profile.startTs = us; \
} \
} while (0)
#define SCH_LOG_TASK_WAIT_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
(_task)->profile.tryUseTime[(_task)->execIdx] = us; \
if (0 == (_task)->execIdx) { \
(_task)->profile.startTs = us; \
} \
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.waitTime += us - (_task)->profile.execUseTime[idx]; \
} while (0)
#define SCH_LOG_TASK_END_TS(_task) \
do { \
int64_t us = taosGetTimestampUs(); \
(_task)->profile.tryUseTime[(_task)->execIdx] = us - (_task)->profile.tryUseTime[(_task)->execIdx]; \
int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \
(_task)->profile.execUseTime[idx] = us - (_task)->profile.execUseTime[idx]; \
(_task)->profile.endTs = us; \
} while (0)
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.tryUseTime[(_task)->execIdx]) > (_taks)->timeoutUsec)
#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec)
#define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_LOCK_TASK(_task) SCH_LOCK(SCH_WRITE, &(_task)->lock)
#define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock)
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
#define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1)
@ -351,6 +366,7 @@ int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64
int32_t schFetchRows(SSchJob *pJob);
int32_t schAsyncFetchRows(SSchJob *pJob);
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx);
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
#ifdef __cplusplus

View File

@ -29,6 +29,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
pTask->plan = pPlan;
pTask->level = pLevel;
pTask->execIdx = -1;
pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES;
pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
pTask->taskId = schGenTaskId();
@ -142,6 +143,7 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId.nodeId, epId.ep.fqdn, epId.ep.port);
return;
}
atomic_sub_fetch_64(&hb->taskNum, 1);
@ -360,7 +362,7 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx) {
SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL};
if (NULL == taosHashPut(pTask->execNodes, &execIdx, sizeof(execIdx), &nodeInfo, sizeof(nodeInfo))) {
if (taosHashPut(pTask->execNodes, &execIdx, sizeof(execIdx), &nodeInfo, sizeof(nodeInfo))) {
SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
@ -384,7 +386,7 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_
}
int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) {
if (taosArrayGetSize(pTask->execNodes) <= 0) {
if (taosHashGetSize(pTask->execNodes) <= 0) {
return TSDB_CODE_SUCCESS;
}
@ -714,7 +716,17 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
return TSDB_CODE_SUCCESS;
}
if ((pTask->execIdx + 1) >= SCH_TASK_MAX_EXEC_TIMES) {
if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
pTask->maxExecTimes++;
if (pTask->timeoutUsec < SCH_MAX_TASK_TIMEOUT_USEC) {
pTask->timeoutUsec *= 2;
if (pTask->timeoutUsec > SCH_MAX_TASK_TIMEOUT_USEC) {
pTask->timeoutUsec = SCH_MAX_TASK_TIMEOUT_USEC;
}
}
}
if ((pTask->execIdx + 1) >= pTask->maxExecTimes) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since reach max try times, execIdx:%d", pTask->execIdx);
return TSDB_CODE_SUCCESS;
@ -737,7 +749,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
} else {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
if ((pTask->candidateIdx + 1) >= candidateNum) {
if ((pTask->candidateIdx + 1) >= candidateNum && (TSDB_CODE_SCH_TIMEOUT_ERROR != errCode)) {
*needRetry = false;
SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d",
pTask->candidateIdx, candidateNum);
@ -767,7 +779,10 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_DATA_SRC_TASK(pTask)) {
SCH_SWITCH_EPSET(&pTask->plan->execNode);
} else {
++pTask->candidateIdx;
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
if (++pTask->candidateIdx >= candidateNum) {
pTask->candidateIdx = 0;
}
}
SCH_ERR_RET(schLaunchTask(pJob, pTask));
@ -942,8 +957,12 @@ void schProcessOnDataFetched(SSchJob *job) {
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
int8_t status = 0;
SCH_LOG_TASK_END_TS(pTask);
if (errCode == TSDB_CODE_SCH_TIMEOUT_ERROR) {
SCH_LOG_TASK_WAIT_TS(pTask);
} else {
SCH_LOG_TASK_END_TS(pTask);
}
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status));
SCH_RET(atomic_load_32(&pJob->errCode));
@ -1145,12 +1164,46 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
return TSDB_CODE_SUCCESS;
}
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
if (NULL == pTask->execNodes) {
SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return;
}
int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
if (size <= 0) {
SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return;
}
SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
while (nodeInfo) {
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK);
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
}
SCH_TASK_DLOG("task has %d exec address", size);
}
int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
return TSDB_CODE_SUCCESS;
}
SCH_LOCK_TASK(pTask);
if (JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask) {
schDropTaskOnExecNode(pJob, pTask);
taosHashClear(pTask->execNodes);
schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR);
}
SCH_UNLOCK_TASK(pTask);
return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
@ -1193,6 +1246,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) {
schReleaseJob(taskStatus->refId);
}
return TSDB_CODE_SUCCESS;
}
@ -1339,30 +1393,6 @@ int32_t schLaunchJob(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
if (NULL == pTask->execNodes) {
SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return;
}
int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
if (size <= 0) {
SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
return;
}
SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
while (nodeInfo) {
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_VND_DROP_TASK);
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
}
SCH_TASK_DLOG("task has %d exec address", size);
}
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
if (!SCH_IS_NEED_DROP_JOB(pJob)) {

View File

@ -92,8 +92,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status),
rspCode);
SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), rspCode);
taosMemoryFreeClear(msg);
SCH_RET(atomic_load_32(&pJob->errCode));
}
@ -344,7 +343,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
_return:
taosMemoryFreeClear(msg);
taosMemoryFreeClear(msg);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
@ -364,6 +363,8 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_ERR_JRET(schGetTaskInJob(pJob, pParam->taskId, &pTask));
SCH_LOCK_TASK(pTask);
SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode));
if (pParam->execIdx != pTask->execIdx) {
@ -376,6 +377,10 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
_return:
if (pTask) {
SCH_UNLOCK_TASK(pTask);
}
if (pJob) {
schReleaseJob(pParam->refId);
@ -667,7 +672,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *
}
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
SSchedulerHbReq req = {0};
int32_t code = 0;
SRpcCtx rpcCtx = {0};

View File

@ -176,10 +176,6 @@ int32_t scheduleCancelJob(int64_t job) {
SCH_RET(code);
}
void schedulerStopTransport(void *pTrans) {
// CLOSE && REMOVE RELATED HB CONNECTIONS
}
void schedulerFreeJob(int64_t job) {
SSchJob *pJob = schAcquireJob(job);
if (NULL == pJob) {

View File

@ -444,6 +444,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup
//scheduler
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_TIMEOUT_ERROR, "Task timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_QW_MSG_ERROR, "Invalid msg order")
// parser