Merge pull request #25640 from taosdata/fix/TS-4737
fix: scheduler dead lock issue
This commit is contained in:
commit
12f38f7685
|
@ -287,6 +287,7 @@ typedef struct SSchJob {
|
||||||
|
|
||||||
SExplainCtx *explainCtx;
|
SExplainCtx *explainCtx;
|
||||||
int8_t status;
|
int8_t status;
|
||||||
|
int8_t inRetry;
|
||||||
SQueryNodeAddr resNode;
|
SQueryNodeAddr resNode;
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
SSchOpStatus opStatus;
|
SSchOpStatus opStatus;
|
||||||
|
|
|
@ -851,9 +851,18 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) {
|
int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
|
||||||
|
int8_t origInRetry = atomic_val_compare_exchange_8(&pJob->inRetry, 0, 1);
|
||||||
|
if (0 != origInRetry) {
|
||||||
|
SCH_JOB_DLOG("job already in retry, origInRetry: %d", pJob->inRetry);
|
||||||
|
return TSDB_CODE_SCH_IGNORE_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
*inRetry = true;
|
||||||
|
|
||||||
SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode));
|
SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode));
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
|
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
|
||||||
for (int32_t i = 0; i < numOfLevels; ++i) {
|
for (int32_t i = 0; i < numOfLevels; ++i) {
|
||||||
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
|
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
|
||||||
|
@ -865,7 +874,11 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) {
|
||||||
for (int32_t j = 0; j < numOfTasks; ++j) {
|
for (int32_t j = 0; j < numOfTasks; ++j) {
|
||||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
|
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
|
||||||
SCH_LOCK_TASK(pTask);
|
SCH_LOCK_TASK(pTask);
|
||||||
SCH_ERR_RET(schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode));
|
code = schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
SCH_UNLOCK_TASK(pTask);
|
||||||
|
SCH_RET(code);
|
||||||
|
}
|
||||||
qClearSubplanExecutionNode(pTask->plan);
|
qClearSubplanExecutionNode(pTask->plan);
|
||||||
schResetTaskForRetry(pJob, pTask);
|
schResetTaskForRetry(pJob, pTask);
|
||||||
SCH_UNLOCK_TASK(pTask);
|
SCH_UNLOCK_TASK(pTask);
|
||||||
|
@ -880,6 +893,7 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) {
|
||||||
|
|
||||||
int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
|
int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
bool inRetry = false;
|
||||||
|
|
||||||
taosMemoryFreeClear(pMsg->pData);
|
taosMemoryFreeClear(pMsg->pData);
|
||||||
taosMemoryFreeClear(pMsg->pEpSet);
|
taosMemoryFreeClear(pMsg->pEpSet);
|
||||||
|
@ -888,19 +902,27 @@ int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_
|
||||||
|
|
||||||
SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode));
|
SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode));
|
||||||
|
|
||||||
SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode));
|
SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode, &inRetry));
|
||||||
|
|
||||||
SCH_ERR_JRET(schLaunchJob(pJob));
|
SCH_ERR_JRET(schLaunchJob(pJob));
|
||||||
|
|
||||||
SCH_LOCK_TASK(pTask);
|
SCH_LOCK_TASK(pTask);
|
||||||
|
|
||||||
|
atomic_store_8(&pJob->inRetry, 0);
|
||||||
|
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
SCH_LOCK_TASK(pTask);
|
SCH_LOCK_TASK(pTask);
|
||||||
|
|
||||||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
code = schProcessOnTaskFailure(pJob, pTask, code);
|
||||||
|
|
||||||
|
if (inRetry) {
|
||||||
|
atomic_store_8(&pJob->inRetry, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
SCH_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
|
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
|
||||||
|
|
Loading…
Reference in New Issue