diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 5dd43ca064..1df8dcda95 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -559,13 +559,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(ctx->pJobInfo->errCode); } - if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { - QW_TASK_ELOG("query already end, phase:%d", phase); - QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); - } - switch (phase) { case QW_PHASE_PRE_QUERY: { + if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { + QW_TASK_ELOG("query already end, phase:%d", phase); + QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); + } + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_ELOG("task already dropped at phase %s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); @@ -592,6 +592,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu break; } case QW_PHASE_PRE_FETCH: { + if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { + QW_TASK_ELOG("query already end, phase:%d", phase); + QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); + } + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(ctx->rspCode); @@ -614,6 +619,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu break; } case QW_PHASE_PRE_CQUERY: { + if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { + QW_TASK_ELOG("query already end, phase:%d", phase); + code = ctx->rspCode; + goto _return; + } + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(ctx->rspCode);