fix: scheduler job retry issue

This commit is contained in:
dapan1121 2024-05-06 14:08:37 +08:00
parent 7809533c79
commit e45a348e5b
2 changed files with 21 additions and 3 deletions

View File

@ -287,6 +287,7 @@ typedef struct SSchJob {
SExplainCtx *explainCtx;
int8_t status;
int8_t inRetry;
SQueryNodeAddr resNode;
tsem_t rspSem;
SSchOpStatus opStatus;

View File

@ -851,7 +851,15 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
return TSDB_CODE_SUCCESS;
}
int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) {
int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
int8_t origInRetry = atomic_val_compare_exchange_8(&pJob->inRetry, 0, 1);
if (0 != origInRetry) {
SCH_JOB_DLOG("job already in retry, origInRetry: %d", pJob->inRetry);
return TSDB_CODE_SCH_IGNORE_ERROR;
}
*inRetry = true;
SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode));
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
@ -880,6 +888,7 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) {
int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
bool inRetry = false;
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
@ -888,19 +897,27 @@ int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_
SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode));
SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode));
SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode, &inRetry));
SCH_ERR_JRET(schLaunchJob(pJob));
SCH_LOCK_TASK(pTask);
atomic_store_8(&pJob->inRetry, 0);
SCH_RET(code);
_return:
SCH_LOCK_TASK(pTask);
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
schProcessOnTaskFailure(pJob, pTask, code);
if (inRetry) {
atomic_store_8(&pJob->inRetry, 0);
}
SCH_RET(code);
}
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {