From e45a348e5bd508fa9a7b452b2c1158ce9b2d9640 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 6 May 2024 14:08:37 +0800 Subject: [PATCH] fix: scheduler job retry issue --- source/libs/scheduler/inc/schInt.h | 1 + source/libs/scheduler/src/schJob.c | 23 ++++++++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index d129b0024f..c7f6c20b7d 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -287,6 +287,7 @@ typedef struct SSchJob { SExplainCtx *explainCtx; int8_t status; + int8_t inRetry; SQueryNodeAddr resNode; tsem_t rspSem; SSchOpStatus opStatus; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 48aab63ba3..a4a76f01b7 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -851,7 +851,15 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) { return TSDB_CODE_SUCCESS; } -int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) { +int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) { + int8_t origInRetry = atomic_val_compare_exchange_8(&pJob->inRetry, 0, 1); + if (0 != origInRetry) { + SCH_JOB_DLOG("job already in retry, origInRetry: %d", pJob->inRetry); + return TSDB_CODE_SCH_IGNORE_ERROR; + } + + *inRetry = true; + SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode)); int32_t numOfLevels = taosArrayGetSize(pJob->levels); @@ -880,6 +888,7 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) { int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; + bool inRetry = false; taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(pMsg->pEpSet); @@ -888,19 +897,27 @@ int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_ SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode)); - SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode)); + SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode, &inRetry)); SCH_ERR_JRET(schLaunchJob(pJob)); SCH_LOCK_TASK(pTask); + atomic_store_8(&pJob->inRetry, 0); + SCH_RET(code); _return: SCH_LOCK_TASK(pTask); - SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); + schProcessOnTaskFailure(pJob, pTask, code); + + if (inRetry) { + atomic_store_8(&pJob->inRetry, 0); + } + + SCH_RET(code); } bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {