From 19e5d6372104647697264d00d6f511d06b7285fe Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 31 Mar 2023 18:26:46 +0800 Subject: [PATCH] fix: add job retry --- source/libs/qworker/src/qwDbg.c | 17 +++++++-- source/libs/scheduler/inc/schInt.h | 16 ++++++-- source/libs/scheduler/src/schJob.c | 53 +++++++++++++++++++++++++-- source/libs/scheduler/src/schRemote.c | 9 ++++- source/libs/scheduler/src/schTask.c | 31 +++++++++++++--- 5 files changed, 109 insertions(+), 17 deletions(-) diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index b8d5d2e6ee..59e63e9eae 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -259,15 +259,26 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) { static int32_t ignoreTime = 0; if (++ignoreTime > 10 && 0 == taosRand() % 9) { + if (ctx->msgType == TDMT_SCH_FETCH) { + qwBuildAndSendErrorRsp(TDMT_SCH_LINK_BROKEN, &ctx->ctrlConnInfo, TSDB_CODE_RPC_BROKEN_LINK); + qwBuildAndSendErrorRsp(ctx->msgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + *rsped = true; + + taosSsleep(3); + return; + } + +#if 0 SRpcHandleInfo *pConn = ((ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo - : &ctx->ctrlConnInfo); + : &ctx->ctrlConnInfo); qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK); - + qwBuildAndSendDropMsg(QW_FPARAMS(), pConn); *rsped = true; - + return; +#endif } } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index b7f9272bdc..d002b5dfa9 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -299,6 +299,7 @@ typedef struct SSchJob { SExecResult execRes; void *fetchRes; // TODO free it or not bool fetched; + bool noMoreRetry; int64_t resNumOfRows; // from int32_t to int64_t SSchResInfo userRes; char *sql; @@ -333,8 +334,8 @@ extern SSchedulerMgmt schMgmt; ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \ (!SCH_IS_DATA_BIND_QRY_TASK(_task))) -#define SCH_UPDATE_REDICT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code) -#define SCH_GET_REDICT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode) +#define SCH_UPDATE_REDIRECT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code) +#define SCH_GET_REDIRECT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) @@ -398,7 +399,7 @@ extern SSchedulerMgmt schMgmt; (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || SCH_TASK_RETRY_NETWORK_ERR((_task), (_code)))) #define SCH_TASK_NEED_RETRY(_msgType, _code) \ ((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) - + #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) @@ -522,6 +523,11 @@ extern SSchedulerMgmt schMgmt; } \ } while (0) +#define SCH_RESET_JOB_LEVEL_IDX(_job) do { \ + (_job)->levelIdx = (_job)->levelNum - 1; \ + SCH_JOB_DLOG("set job levelIdx to %d", (_job)->levelIdx); \ +} while (0) + void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); void schCleanClusterHb(void *pTrans); int32_t schLaunchTask(SSchJob *job, SSchTask *task); @@ -603,6 +609,10 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode); bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync); int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode); int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp); +int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode); +int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode); +void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask); +int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index b2c90dc67f..a36cd6747c 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -296,7 +296,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { } pJob->levelNum = levelNum; - pJob->levelIdx = levelNum - 1; + SCH_RESET_JOB_LEVEL_IDX(pJob); SSchLevel level = {0}; SNodeListNode *plans = NULL; @@ -828,8 +828,9 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) { SCH_LOCK(SCH_WRITE, &pJob->resLock); if (pJob->fetched) { SCH_UNLOCK(SCH_WRITE, &pJob->resLock); - SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode)); - SCH_ERR_JRET(rspCode); + pJob->noMoreRetry = true; + SCH_JOB_ELOG("already fetched while got error %s", tstrerror(rspCode)); + SCH_ERR_RET(rspCode); } SCH_UNLOCK(SCH_WRITE, &pJob->resLock); @@ -839,12 +840,56 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) { return TSDB_CODE_SUCCESS; } +int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) { + SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode)); + + int32_t numOfLevels = taosArrayGetSize(pJob->levels); + for (int32_t i = 0; i < numOfLevels; ++i) { + SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + + pLevel->taskExecDoneNum = 0; + pLevel->taskLaunchedNum = 0; + + int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); + for (int32_t j = 0; j < numOfTasks; ++j) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); + SCH_LOCK_TASK(pTask); + SCH_ERR_RET(schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode)); + qClearSubplanExecutionNode(pTask->plan); + schResetTaskForRetry(pJob, pTask); + SCH_UNLOCK_TASK(pTask); + } + } + + SCH_RESET_JOB_LEVEL_IDX(pJob); + + return TSDB_CODE_SUCCESS; +} + int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; - SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode)); + taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); + SCH_UNLOCK_TASK(pTask); + + SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode)); + + SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode)); + + SCH_ERR_JRET(schLaunchJob(pJob)); + + SCH_LOCK_TASK(pTask); + + SCH_RET(code); + +_return: + + SCH_LOCK_TASK(pTask); + + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) { diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index af206aa46e..80fdc7594c 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -416,6 +416,7 @@ _return: int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; int32_t msgType = pMsg->msgType; + char *msg = pMsg->pData; bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); if (SCH_IS_QUERY_JOB(pJob)) { @@ -426,7 +427,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) { - SCH_RET(schHandleJobRetry()); + SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode)); } else if (SCH_TASKSET_NEED_RETRY(pJob, pTask, reqType, rspCode)) { SCH_RET(schHandleTaskSetRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode)); } @@ -434,6 +435,12 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa pTask->redirectCtx.inRedirect = false; SCH_RET(schProcessResponseMsg(pJob, pTask, execId, pMsg, rspCode)); + +_return: + + taosMemoryFreeClear(msg); + + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index a3194c7ff7..207753ae25 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -378,7 +378,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, if (lastTime > tsMaxRetryWaitTime) { SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); - SCH_ERR_RET(SCH_GET_REDICT_CODE(pJob, rspCode)); + pJob->noMoreRetry = true; + SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode)); } pCtx->periodMs *= tsRedirectFactor; @@ -408,16 +409,16 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { pTask->waitRetry = true; schDropTaskOnExecNode(pJob, pTask); + if (pTask->delayTimer) { + taosTmrStopA(&pTask->delayTimer); + } taosHashClear(pTask->execNodes); schRemoveTaskFromExecList(pJob, pTask); schDeregisterTaskHb(pJob, pTask); - atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); - if (SCH_TASK_EXEC_DONE(pTask)) { - atomic_sub_fetch_32(&pTask->level->taskExecDoneNum, 1); - } taosMemoryFreeClear(pTask->msg); pTask->msgLen = 0; pTask->lastMsgType = 0; + pTask->childReady = 0; memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); } @@ -427,7 +428,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode)); if (!NO_RET_REDIRECT_ERROR(rspCode)) { - SCH_UPDATE_REDICT_CODE(pJob, rspCode); + SCH_UPDATE_REDIRECT_CODE(pJob, rspCode); } SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode)); @@ -496,6 +497,17 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i } } + SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode)); + + for (int32_t i = 0; i < pJob->levelNum; ++i) { + SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + + pLevel->taskExecDoneNum = 0; + pLevel->taskLaunchedNum = 0; + } + + SCH_RESET_JOB_LEVEL_IDX(pJob); + code = schDoTaskRedirect(pJob, pTask, pData, rspCode); taosMemoryFreeClear(pData->pData); @@ -609,6 +621,13 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { */ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { + if (pJob->noMoreRetry) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes, + pTask->maxRetryTimes); + return TSDB_CODE_SUCCESS; + } + if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) { pTask->maxExecTimes++; pTask->maxRetryTimes++;