From d12614aac8f7e0734d0d2602fd20fbe4d0f78c42 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 17 Jun 2022 09:22:03 +0800 Subject: [PATCH] kill query --- include/libs/scheduler/scheduler.h | 13 ++-- source/client/src/clientImpl.c | 6 +- source/libs/scheduler/inc/schedulerInt.h | 1 + source/libs/scheduler/src/schJob.c | 84 ++++++++++++++---------- 4 files changed, 60 insertions(+), 44 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index dfff17221d..ecb21335b9 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -73,13 +73,14 @@ typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_ typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code); typedef struct SSchedulerReq { - SRequestConnInfo *pConn; - SArray *pNodeList; - SQueryPlan *pDag; - const char *sql; - int64_t startTs; + bool *reqKilled; + SRequestConnInfo *pConn; + SArray *pNodeList; + SQueryPlan *pDag; + const char *sql; + int64_t startTs; schedulerExecCallback fp; - void* cbParam; + void* cbParam; } SSchedulerReq; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a9dd1bb50b..6f0f1c7f41 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -461,7 +461,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .sql = pRequest->sqlstr, .startTs = pRequest->metric.start, .fp = NULL, - .cbParam = NULL}; + .cbParam = NULL, + .reqKilled = &pRequest->killed}; int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res); pRequest->body.resInfo.execRes = res.res; @@ -738,7 +739,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { .sql = pRequest->sqlstr, .startTs = pRequest->metric.start, .fp = schedulerExecCb, - .cbParam = pRequest}; + .cbParam = pRequest, + .reqKilled = &pRequest->killed}; code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob); } else { tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 2a3a069d02..545bb6c45a 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -228,6 +228,7 @@ typedef struct SSchJob { SQueryNodeAddr resNode; tsem_t rspSem; SSchOpStatus opStatus; + bool *reqKilled; SSchTask *fetchTask; int32_t errCode; SRWLatch resLock; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index d723a431c2..f9c7b21cf2 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -54,6 +54,7 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob, SQueryResult* pRes, b pJob->attr.explainMode = pReq->pDag->explainInfo.mode; pJob->conn = *pReq->pConn; pJob->sql = pReq->sql; + pJob->reqKilled = pReq->reqKilled; pJob->userRes.queryRes = pRes; pJob->userRes.execFp = pReq->fp; pJob->userRes.userParam = pReq->cbParam; @@ -154,12 +155,52 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { } } + +void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) { + if (TSDB_CODE_SUCCESS == errCode) { + return; + } + + int32_t origCode = atomic_load_32(&pJob->errCode); + if (TSDB_CODE_SUCCESS == origCode) { + if (origCode == atomic_val_compare_exchange_32(&pJob->errCode, origCode, errCode)) { + goto _return; + } + + origCode = atomic_load_32(&pJob->errCode); + } + + if (NEED_CLIENT_HANDLE_ERROR(origCode)) { + return; + } + + if (NEED_CLIENT_HANDLE_ERROR(errCode)) { + atomic_store_32(&pJob->errCode, errCode); + goto _return; + } + + return; + +_return: + + SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode)); +} + + + FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { int8_t status = SCH_GET_JOB_STATUS(pJob); if (pStatus) { *pStatus = status; } + if (pJob->reqKilled) { + schUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING); + schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED); + + return true; + } + return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_DROPPING || status == JOB_TASK_STATUS_SUCCEED); } @@ -255,7 +296,13 @@ void schEndOperation(SSchJob *pJob) { int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) { int32_t code = 0; + int8_t status = 0; + if (schJobNeedToStop(pJob, &status)) { + SCH_JOB_ELOG("job need to stop cause of status %s", jobTaskStatusStr(status)); + SCH_ERR_JRET(pJob->errCode); + } + if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) { SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op)); SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR); @@ -275,11 +322,7 @@ int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - int8_t status = 0; - if (schJobNeedToStop(pJob, &status)) { - SCH_JOB_ELOG("job need to stop cause of status %s", jobTaskStatusStr(status)); - SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); - } else if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -841,37 +884,6 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) { - if (TSDB_CODE_SUCCESS == errCode) { - return; - } - - int32_t origCode = atomic_load_32(&pJob->errCode); - if (TSDB_CODE_SUCCESS == origCode) { - if (origCode == atomic_val_compare_exchange_32(&pJob->errCode, origCode, errCode)) { - goto _return; - } - - origCode = atomic_load_32(&pJob->errCode); - } - - if (NEED_CLIENT_HANDLE_ERROR(origCode)) { - return; - } - - if (NEED_CLIENT_HANDLE_ERROR(errCode)) { - atomic_store_32(&pJob->errCode, errCode); - goto _return; - } - - return; - -_return: - - SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode)); -} - - int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) { pRes->code = atomic_load_32(&pJob->errCode); pRes->numOfRows = pJob->resNumOfRows;