From 4c5a6ccf946df9efaf661223babdc45cafc2e847 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 13 Jan 2022 11:36:58 +0800 Subject: [PATCH] feature/qnode --- source/libs/qworker/inc/qworkerInt.h | 15 +- source/libs/qworker/src/qworker.c | 286 ++++++++++++++------------- 2 files changed, 160 insertions(+), 141 deletions(-) diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index b5ec50cc04..2f47c79065 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -114,9 +114,13 @@ typedef struct SQWorkerMgmt { #define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__) #define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__) -#define QW_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) -#define QW_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) -#define QW_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__) + +#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 @@ -155,8 +159,9 @@ typedef struct SQWorkerMgmt { -static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch); -static int32_t qwAddAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch); +int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch); +int32_t qwAcquireAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch); +int32_t qwAcquireTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task); #ifdef __cplusplus diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 23d74ae91d..921298995f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -108,7 +108,7 @@ int32_t qwAddTaskHandlesToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, QW_LOCK(QW_WRITE, &mgmt->ctxLock); if (0 != taosHashPut(mgmt->ctxHash, id, sizeof(id), &resCache, sizeof(SQWTaskCtx))) { QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); - qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", qId, tId); + QW_TASK_ELOG("taosHashPut task ctx to ctxHash failed, taskHandle:%p, sinkHandle:%p", taskHandle, sinkHandle); return TSDB_CODE_QRY_APP_ERROR; } @@ -117,7 +117,7 @@ int32_t qwAddTaskHandlesToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, return TSDB_CODE_SUCCESS; } -static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { +int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { SQWSchStatus newSch = {0}; newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == newSch.tasksHash) { @@ -150,7 +150,7 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, return TSDB_CODE_SUCCESS; } -static int32_t qwAcquireSchedulerImpl(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) { +int32_t qwAcquireSchedulerImpl(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) { QW_LOCK(rwType, &mgmt->schLock); *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); if (NULL == (*sch)) { @@ -168,42 +168,19 @@ static int32_t qwAcquireSchedulerImpl(int32_t rwType, SQWorkerMgmt *mgmt, uint64 return TSDB_CODE_SUCCESS; } -static int32_t qwAddAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { +int32_t qwAcquireAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD); } -static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { +int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_RET_ERR); } -static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { +void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } -static int32_t qwAcquireTaskImpl(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) { - char id[sizeof(qId) + sizeof(tId)] = {0}; - QW_SET_QTID(id, qId, tId); - - QW_LOCK(rwType, &sch->tasksLock); - *task = taosHashGet(sch->tasksHash, id, sizeof(id)); - if (NULL == (*task)) { - QW_UNLOCK(rwType, &sch->tasksLock); - - return TSDB_CODE_QRY_TASK_NOT_EXIST; - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t qwAcquireTask(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) { - return qwAcquireTaskImpl(rwType, sch, qId, tId, task); -} - -static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWSchStatus *sch) { - QW_UNLOCK(rwType, &sch->tasksLock); -} - -int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int8_t status, int32_t eOpt, SQWTaskStatus **task) { +int32_t qwAddTaskImpl(SQWorkerMgmt *mgmt, SQWSchStatus *sch, int32_t rwType, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWTaskStatus **task) { int32_t code = 0; char id[sizeof(qId) + sizeof(tId)] = {0}; @@ -212,62 +189,83 @@ int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t SQWTaskStatus ntask = {0}; ntask.status = status; - while (true) { - QW_LOCK(QW_WRITE, &sch->tasksLock); - int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask)); - if (0 != code) { - QW_UNLOCK(QW_WRITE, &sch->tasksLock); - if (HASH_NODE_EXIST(code)) { - if (QW_EXIST_ACQUIRE == eOpt && rwType && task) { - if (qwAcquireTask(rwType, sch, qId, tId, task)) { - continue; - } - } else if (QW_EXIST_RET_ERR == eOpt) { - return TSDB_CODE_QRY_TASK_ALREADY_EXIST; - } else { - assert(0); - } - - break; - } else { - qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", qId, tId); - return TSDB_CODE_QRY_APP_ERROR; - } - } - + QW_LOCK(QW_WRITE, &sch->tasksLock); + code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask)); + if (0 != code) { QW_UNLOCK(QW_WRITE, &sch->tasksLock); - - if (rwType && task) { - if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, qId, tId, task)) { - return TSDB_CODE_SUCCESS; + if (HASH_NODE_EXIST(code)) { + if (QW_EXIST_ACQUIRE == eOpt && rwType && task) { + QW_ERR_RET(qwAcquireTask(mgmt, rwType, sch, qId, tId, task)); + } else if (QW_EXIST_RET_ERR == eOpt) { + return TSDB_CODE_QRY_TASK_ALREADY_EXIST; + } else { + assert(0); } } else { - break; + qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", qId, tId); + return TSDB_CODE_QRY_APP_ERROR; } - } + } + + QW_UNLOCK(QW_WRITE, &sch->tasksLock); + + if (QW_EXIST_ACQUIRE == eOpt && rwType && task) { + QW_ERR_RET(qwAcquireTask(mgmt, rwType, sch, qId, tId, task)); + } return TSDB_CODE_SUCCESS; } -static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWSchStatus **sch, SQWTaskStatus **task) { +int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status) { SQWSchStatus *tsch = NULL; - QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &tsch)); + int32_t code = 0; + QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &tsch)); - int32_t code = qwAddTaskToSch(QW_READ, tsch, qId, tId, status, eOpt, task); - if (code) { - qwReleaseScheduler(QW_WRITE, mgmt); - } + QW_ERR_JRET(qwAddTaskImpl(mgmt, tsch, 0, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_RET_ERR, NULL)); - if (NULL == task) { - qwReleaseScheduler(QW_READ, mgmt); - } else if (sch) { - *sch = tsch; - } +_return: - QW_RET(code); + qwReleaseScheduler(QW_READ, mgmt); + QW_ERR_RET(code); } -static FORCE_INLINE int32_t qwAcquireTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWTaskCtx **handles) { + +int32_t qwAcquireTaskImpl(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int32_t status, int32_t nOpt, SQWTaskStatus **task) { + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); + + QW_LOCK(rwType, &sch->tasksLock); + *task = taosHashGet(sch->tasksHash, id, sizeof(id)); + if (NULL == (*task)) { + QW_UNLOCK(rwType, &sch->tasksLock); + + if (QW_NOT_EXIST_ADD == nOpt) { + QW_ERR_RET(qwAddTaskImpl(mgmt, sch, rwType, qId, tId, status, QW_EXIST_ACQUIRE, task)); + } else if (QW_NOT_EXIST_RET_ERR == nOpt) { + return TSDB_CODE_QRY_TASK_NOT_EXIST; + } else { + assert(0); + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t qwAcquireTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) { + return qwAcquireTaskImpl(mgmt, rwType, sch, qId, tId, 0, QW_NOT_EXIST_RET_ERR, task); +} + +int32_t qwAcquireAddTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int32_t status, SQWTaskStatus **task) { + return qwAcquireTaskImpl(mgmt, rwType, sch, qId, tId, status, QW_NOT_EXIST_ADD, task); +} + + +void qwReleaseTask(int32_t rwType, SQWSchStatus *sch) { + QW_UNLOCK(rwType, &sch->tasksLock); +} + + +int32_t qwAcquireTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWTaskCtx **handles) { char id[sizeof(queryId) + sizeof(taskId)] = {0}; QW_SET_QTID(id, queryId, taskId); @@ -281,7 +279,7 @@ static FORCE_INLINE int32_t qwAcquireTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt, return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt) { +void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt) { QW_UNLOCK(rwType, &mgmt->ctxLock); } @@ -352,7 +350,7 @@ int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); - QW_ERR_JRET(qwAcquireTask(QW_READ, sch, qId, tId, &task)); + QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task)); QW_LOCK(QW_WRITE, &task->lock); qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); @@ -377,7 +375,7 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint return TSDB_CODE_SUCCESS; } - if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { + if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) { qwReleaseScheduler(QW_READ, mgmt); *taskStatus = JOB_TASK_STATUS_NULL; @@ -398,17 +396,10 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch)); + QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch)); + + QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task)); - if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) { - qwReleaseScheduler(QW_READ, mgmt); - - code = qwAddTask(mgmt, sId, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task); - if (code) { - qwReleaseScheduler(QW_READ, mgmt); - QW_ERR_RET(code); - } - } QW_LOCK(QW_WRITE, &task->lock); @@ -458,6 +449,42 @@ _return: QW_RET(code); } + +// caller should make sure task is not running +int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); + + QW_LOCK(QW_WRITE, &mgmt->ctxLock); + SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); + if (NULL == ctx) { + QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); + return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST; + } + + if (ctx->taskHandle) { + qDestroyTask(ctx->taskHandle); + ctx->taskHandle = NULL; + } + + if (ctx->sinkHandle) { + dsDestroyDataSinker(ctx->sinkHandle); + ctx->sinkHandle = NULL; + } + + if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) { + QW_TASK_ELOG("taosHashRemove from ctx hash failed, id:%s", id); + + QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); + return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST; + } + + QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); + + return TSDB_CODE_SUCCESS; +} + + int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; @@ -466,20 +493,14 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); - QW_LOCK(QW_WRITE, &mgmt->ctxLock); - if (mgmt->ctxHash) { - if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) { - QW_TASK_WLOG("taosHashRemove from ctx hash failed, id:%s", id); - } - } - QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); + qwDropTaskCtx(mgmt, sId, qId, tId); if (qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch)) { QW_TASK_WLOG("scheduler does not exist, sch:%p", sch); return TSDB_CODE_SUCCESS; } - if (qwAcquireTask(QW_WRITE, sch, qId, tId, &task)) { + if (qwAcquireTask(mgmt, QW_WRITE, sch, qId, tId, &task)) { qwReleaseScheduler(QW_WRITE, mgmt); QW_TASK_WLOG("task does not exist, task:%p", task); @@ -506,17 +527,9 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch)); + QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch)); - if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) { - qwReleaseScheduler(QW_READ, mgmt); - - code = qwAddTask(mgmt, sId, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task); - if (code) { - qwReleaseScheduler(QW_READ, mgmt); - QW_ERR_RET(code); - } - } + QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task)); QW_LOCK(QW_WRITE, &task->lock); @@ -772,7 +785,7 @@ int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); - QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); + QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)); QW_LOCK(QW_WRITE, &task->lock); @@ -814,7 +827,7 @@ int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); - QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); + QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)); QW_LOCK(QW_WRITE, &task->lock); if (QW_TASK_READY_RESP(task->status)) { @@ -843,7 +856,7 @@ _return: QW_RET(code); } -int32_t qwCheckTaskCancelDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool *needStop) { +int32_t qwCheckAndProcessTaskDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool *needStop) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -855,7 +868,7 @@ int32_t qwCheckTaskCancelDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, ui return TSDB_CODE_SUCCESS; } - if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) { + if (qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task)) { qwReleaseScheduler(QW_READ, mgmt); return TSDB_CODE_SUCCESS; } @@ -867,10 +880,7 @@ int32_t qwCheckTaskCancelDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, ui QW_UNLOCK(QW_READ, &task->lock); - qwReleaseTask(QW_READ, sch); - qwReleaseScheduler(QW_READ, mgmt); - - QW_RET(TSDB_CODE_QRY_APP_ERROR); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } QW_UNLOCK(QW_READ, &task->lock); @@ -907,22 +917,16 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 int32_t code = 0; int8_t newStatus = JOB_TASK_STATUS_CANCELLED; - code = qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch); + code = qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch); if (code) { - qError("sId:%"PRIx64" not in cache", sId); + QW_TASK_ELOG("sId:%"PRIx64" not in cache", sId); QW_ERR_RET(code); } - code = qwAcquireTask(QW_READ, sch, qId, tId, &task); + code = qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task); if (code) { - qwReleaseScheduler(QW_READ, mgmt); - - if (JOB_TASK_STATUS_PARTIAL_SUCCEED == status || JOB_TASK_STATUS_SUCCEED == status) { - qError("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId); - QW_ERR_RET(code); - } - - QW_ERR_RET(qwAddTask(mgmt, sId, qId, tId, status, QW_EXIST_ACQUIRE, &sch, &task)); + QW_TASK_ELOG("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId); + QW_ERR_RET(code); } if (task->cancel) { @@ -940,7 +944,7 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 return TSDB_CODE_SUCCESS; } - if (!(task->cancel || task->drop)) { + if ((!(task->cancel || task->drop)) && status > 0) { QW_LOCK(QW_WRITE, &task->lock); qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); task->code = errCode; @@ -1053,7 +1057,7 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 } QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); - QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); + QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)); QW_LOCK(QW_READ, &task->lock); @@ -1208,6 +1212,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; bool queryRsped = false; bool needStop = false; + bool taskAdded = false; struct SSubplan *plan = NULL; SSubQueryMsg *msg = pMsg->pCont; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; @@ -1226,27 +1231,30 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; - QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop)); + QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop)); if (needStop) { - qWarn("task need stop"); + QW_TASK_DLOG("task need stop, msgLen:%d", msg->contentLen); qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED); QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); } code = qStringToSubplan(msg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { - qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->sId, msg->queryId, msg->taskId, code); + QW_TASK_ELOG("string to subplan failed, code:%d", code); QW_ERR_JRET(code); } qTaskInfo_t pTaskInfo = NULL; code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo); if (code) { - qError("qCreateExecTask failed, code:%x", code); + QW_TASK_ELOG("qCreateExecTask failed, code:%x", code); + QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_FAILED)); QW_ERR_JRET(code); - } else { - QW_ERR_JRET(qwAddTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING, QW_EXIST_RET_ERR, NULL, NULL)); } + + QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING)); + + taskAdded = true; QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); @@ -1256,12 +1264,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { code = qExecTask(pTaskInfo, &sinkHandle); if (code) { - qError("qExecTask failed, code:%x", code); + QW_TASK_ELOG("qExecTask failed, code:%x", code); QW_ERR_JRET(code); - } else { - QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle)); - QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); - } + } + + QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle)); + QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); _return: @@ -1278,6 +1286,12 @@ _return: status = JOB_TASK_STATUS_PARTIAL_SUCCEED; } + if (!taskAdded) { + qwAddTask(qWorkerMgmt, sId, qId, tId, status); + + status = -1; + } + qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, code); QW_RET(code); @@ -1299,7 +1313,7 @@ int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *p qwReleaseTaskResCache(QW_READ, qWorkerMgmt); - QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop)); + QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop)); if (needStop) { qWarn("task need stop"); if (needRsp) {