feature/qnode

This commit is contained in:
dapan1121 2022-01-13 11:36:58 +08:00
parent 5cf4edacbb
commit 4c5a6ccf94
2 changed files with 160 additions and 141 deletions

View File

@ -114,9 +114,13 @@ typedef struct SQWorkerMgmt {
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__) #define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__) #define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
@ -155,8 +159,9 @@ typedef struct SQWorkerMgmt {
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch); int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
static int32_t qwAddAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch); int32_t qwAcquireAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
int32_t qwAcquireTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -108,7 +108,7 @@ int32_t qwAddTaskHandlesToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId,
QW_LOCK(QW_WRITE, &mgmt->ctxLock); QW_LOCK(QW_WRITE, &mgmt->ctxLock);
if (0 != taosHashPut(mgmt->ctxHash, id, sizeof(id), &resCache, sizeof(SQWTaskCtx))) { if (0 != taosHashPut(mgmt->ctxHash, id, sizeof(id), &resCache, sizeof(SQWTaskCtx))) {
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", qId, tId); QW_TASK_ELOG("taosHashPut task ctx to ctxHash failed, taskHandle:%p, sinkHandle:%p", taskHandle, sinkHandle);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
@ -117,7 +117,7 @@ int32_t qwAddTaskHandlesToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) {
SQWSchStatus newSch = {0}; SQWSchStatus newSch = {0};
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == newSch.tasksHash) { if (NULL == newSch.tasksHash) {
@ -150,7 +150,7 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t qwAcquireSchedulerImpl(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) { int32_t qwAcquireSchedulerImpl(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) {
QW_LOCK(rwType, &mgmt->schLock); QW_LOCK(rwType, &mgmt->schLock);
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
if (NULL == (*sch)) { if (NULL == (*sch)) {
@ -168,42 +168,19 @@ static int32_t qwAcquireSchedulerImpl(int32_t rwType, SQWorkerMgmt *mgmt, uint64
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t qwAddAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { int32_t qwAcquireAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD); return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD);
} }
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_RET_ERR); return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_RET_ERR);
} }
static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->schLock); QW_UNLOCK(rwType, &mgmt->schLock);
} }
static int32_t qwAcquireTaskImpl(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) { int32_t qwAddTaskImpl(SQWorkerMgmt *mgmt, SQWSchStatus *sch, int32_t rwType, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWTaskStatus **task) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(rwType, &sch->tasksLock);
*task = taosHashGet(sch->tasksHash, id, sizeof(id));
if (NULL == (*task)) {
QW_UNLOCK(rwType, &sch->tasksLock);
return TSDB_CODE_QRY_TASK_NOT_EXIST;
}
return TSDB_CODE_SUCCESS;
}
static int32_t qwAcquireTask(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) {
return qwAcquireTaskImpl(rwType, sch, qId, tId, task);
}
static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWSchStatus *sch) {
QW_UNLOCK(rwType, &sch->tasksLock);
}
int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int8_t status, int32_t eOpt, SQWTaskStatus **task) {
int32_t code = 0; int32_t code = 0;
char id[sizeof(qId) + sizeof(tId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
@ -212,23 +189,18 @@ int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t
SQWTaskStatus ntask = {0}; SQWTaskStatus ntask = {0};
ntask.status = status; ntask.status = status;
while (true) {
QW_LOCK(QW_WRITE, &sch->tasksLock); QW_LOCK(QW_WRITE, &sch->tasksLock);
int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask)); code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
if (0 != code) { if (0 != code) {
QW_UNLOCK(QW_WRITE, &sch->tasksLock); QW_UNLOCK(QW_WRITE, &sch->tasksLock);
if (HASH_NODE_EXIST(code)) { if (HASH_NODE_EXIST(code)) {
if (QW_EXIST_ACQUIRE == eOpt && rwType && task) { if (QW_EXIST_ACQUIRE == eOpt && rwType && task) {
if (qwAcquireTask(rwType, sch, qId, tId, task)) { QW_ERR_RET(qwAcquireTask(mgmt, rwType, sch, qId, tId, task));
continue;
}
} else if (QW_EXIST_RET_ERR == eOpt) { } else if (QW_EXIST_RET_ERR == eOpt) {
return TSDB_CODE_QRY_TASK_ALREADY_EXIST; return TSDB_CODE_QRY_TASK_ALREADY_EXIST;
} else { } else {
assert(0); assert(0);
} }
break;
} else { } else {
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", qId, tId); qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", qId, tId);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
@ -237,37 +209,63 @@ int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t
QW_UNLOCK(QW_WRITE, &sch->tasksLock); QW_UNLOCK(QW_WRITE, &sch->tasksLock);
if (rwType && task) { if (QW_EXIST_ACQUIRE == eOpt && rwType && task) {
if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, qId, tId, task)) { QW_ERR_RET(qwAcquireTask(mgmt, rwType, sch, qId, tId, task));
return TSDB_CODE_SUCCESS;
}
} else {
break;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWSchStatus **sch, SQWTaskStatus **task) { int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status) {
SQWSchStatus *tsch = NULL; SQWSchStatus *tsch = NULL;
QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &tsch)); int32_t code = 0;
QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &tsch));
int32_t code = qwAddTaskToSch(QW_READ, tsch, qId, tId, status, eOpt, task); QW_ERR_JRET(qwAddTaskImpl(mgmt, tsch, 0, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_RET_ERR, NULL));
if (code) {
qwReleaseScheduler(QW_WRITE, mgmt); _return:
}
if (NULL == task) {
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
} else if (sch) { QW_ERR_RET(code);
*sch = tsch;
}
QW_RET(code);
} }
static FORCE_INLINE int32_t qwAcquireTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWTaskCtx **handles) {
/**
 * Look up the task status entry keyed by (qId, tId) in sch->tasksHash.
 *
 * sch->tasksLock is taken in rwType mode for the lookup. When the entry is
 * found it is returned through *task and the lock is left held; the caller
 * releases it with qwReleaseTask(rwType, sch).
 *
 * When the entry is missing the lock is released and nOpt selects the outcome:
 *   - QW_NOT_EXIST_ADD:     hand over to qwAddTaskImpl() with QW_EXIST_ACQUIRE,
 *                           using status for the entry to be added; its error
 *                           code (if any) is returned via QW_ERR_RET.
 *   - QW_NOT_EXIST_RET_ERR: return TSDB_CODE_QRY_TASK_NOT_EXIST.
 *   - any other value:      programming error; asserts, and in builds where
 *                           assert is compiled out returns
 *                           TSDB_CODE_QRY_APP_ERROR instead of reporting
 *                           success with *task == NULL and no lock held.
 *
 * NOTE(review): the lock state after the QW_NOT_EXIST_ADD path depends on
 * qwAddTaskImpl(); confirm against its implementation.
 */
int32_t qwAcquireTaskImpl(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int32_t status, int32_t nOpt, SQWTaskStatus **task) {
  char id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

  QW_LOCK(rwType, &sch->tasksLock);
  *task = taosHashGet(sch->tasksHash, id, sizeof(id));
  if (NULL != (*task)) {
    // Found: return with tasksLock still held.
    return TSDB_CODE_SUCCESS;
  }

  QW_UNLOCK(rwType, &sch->tasksLock);

  if (QW_NOT_EXIST_ADD == nOpt) {
    QW_ERR_RET(qwAddTaskImpl(mgmt, sch, rwType, qId, tId, status, QW_EXIST_ACQUIRE, task));
    return TSDB_CODE_SUCCESS;
  }

  if (QW_NOT_EXIST_RET_ERR == nOpt) {
    return TSDB_CODE_QRY_TASK_NOT_EXIST;
  }

  // Unknown option: never claim success here, *task is NULL and the lock is gone.
  assert(0);
  return TSDB_CODE_QRY_APP_ERROR;
}
/**
 * Acquire an existing task status entry for (qId, tId) from sch.
 *
 * Forwards to qwAcquireTaskImpl() with QW_NOT_EXIST_RET_ERR, so a missing
 * entry is reported as an error rather than being created. The status
 * argument passed down is 0; qwAcquireTaskImpl() only forwards status on its
 * QW_NOT_EXIST_ADD path, which this wrapper does not select.
 */
int32_t qwAcquireTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) {
  const int32_t unusedStatus = 0;
  int32_t       code = qwAcquireTaskImpl(mgmt, rwType, sch, qId, tId, unusedStatus, QW_NOT_EXIST_RET_ERR, task);

  return code;
}
/**
 * Acquire the task status entry for (qId, tId) from sch, asking
 * qwAcquireTaskImpl() to add it (QW_NOT_EXIST_ADD) when it is not present.
 *
 * status is passed through for use when a new entry has to be added.
 */
int32_t qwAcquireAddTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int32_t status, SQWTaskStatus **task) {
  int32_t code = qwAcquireTaskImpl(mgmt, rwType, sch, qId, tId, status, QW_NOT_EXIST_ADD, task);

  return code;
}
/**
 * Release sch->tasksLock in the given rwType mode.
 *
 * Counterpart of a successful qwAcquireTask()/qwAcquireTaskImpl() lookup,
 * which returns with sch->tasksLock held; rwType should be the same mode
 * that was used to acquire.
 */
void qwReleaseTask(int32_t rwType, SQWSchStatus *sch) {
QW_UNLOCK(rwType, &sch->tasksLock);
}
int32_t qwAcquireTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWTaskCtx **handles) {
char id[sizeof(queryId) + sizeof(taskId)] = {0}; char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId); QW_SET_QTID(id, queryId, taskId);
@ -281,7 +279,7 @@ static FORCE_INLINE int32_t qwAcquireTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt) { void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->ctxLock); QW_UNLOCK(rwType, &mgmt->ctxLock);
} }
@ -352,7 +350,7 @@ int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, qId, tId, &task)); QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
@ -377,7 +375,7 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) {
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
*taskStatus = JOB_TASK_STATUS_NULL; *taskStatus = JOB_TASK_STATUS_NULL;
@ -398,17 +396,10 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));
if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) { QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));
qwReleaseScheduler(QW_READ, mgmt);
code = qwAddTask(mgmt, sId, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
}
}
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
@ -458,6 +449,42 @@ _return:
QW_RET(code); QW_RET(code);
} }
// Caller should make sure the task is not running.
//
// Tears down the cached context for task (qId, tId): under the write side of
// mgmt->ctxLock it looks the context up in mgmt->ctxHash, destroys its
// taskHandle (qDestroyTask) and sinkHandle (dsDestroyDataSinker) when set, and
// removes the entry from the hash.
//
// Returns TSDB_CODE_SUCCESS on success, or TSDB_CODE_QRY_RES_CACHE_NOT_EXIST
// when the context is not in the hash or the removal fails.
int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
  int32_t code = TSDB_CODE_SUCCESS;
  char    id[sizeof(qId) + sizeof(tId)] = {0};
  QW_SET_QTID(id, qId, tId);

  QW_LOCK(QW_WRITE, &mgmt->ctxLock);

  SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
  if (NULL == ctx) {
    code = TSDB_CODE_QRY_RES_CACHE_NOT_EXIST;
    goto _return;
  }

  if (ctx->taskHandle) {
    qDestroyTask(ctx->taskHandle);
    ctx->taskHandle = NULL;
  }

  if (ctx->sinkHandle) {
    dsDestroyDataSinker(ctx->sinkHandle);
    ctx->sinkHandle = NULL;
  }

  if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
    // id is a fixed-size key buffer with no room for a NUL terminator, so it
    // must not be printed with %s; the QID/TID are already part of the log prefix.
    QW_TASK_ELOG("taosHashRemove from ctx hash failed, ctx:%p", ctx);
    code = TSDB_CODE_QRY_RES_CACHE_NOT_EXIST;
  }

_return:

  QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);

  return code;
}
int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
@ -466,20 +493,14 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId)
char id[sizeof(qId) + sizeof(tId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId); QW_SET_QTID(id, qId, tId);
QW_LOCK(QW_WRITE, &mgmt->ctxLock); qwDropTaskCtx(mgmt, sId, qId, tId);
if (mgmt->ctxHash) {
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_WLOG("taosHashRemove from ctx hash failed, id:%s", id);
}
}
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
if (qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch)) { if (qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch)) {
QW_TASK_WLOG("scheduler does not exist, sch:%p", sch); QW_TASK_WLOG("scheduler does not exist, sch:%p", sch);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (qwAcquireTask(QW_WRITE, sch, qId, tId, &task)) { if (qwAcquireTask(mgmt, QW_WRITE, sch, qId, tId, &task)) {
qwReleaseScheduler(QW_WRITE, mgmt); qwReleaseScheduler(QW_WRITE, mgmt);
QW_TASK_WLOG("task does not exist, task:%p", task); QW_TASK_WLOG("task does not exist, task:%p", task);
@ -506,17 +527,9 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));
if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) { QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));
qwReleaseScheduler(QW_READ, mgmt);
code = qwAddTask(mgmt, sId, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
}
}
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
@ -772,7 +785,7 @@ int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
@ -814,7 +827,7 @@ int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId,
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
if (QW_TASK_READY_RESP(task->status)) { if (QW_TASK_READY_RESP(task->status)) {
@ -843,7 +856,7 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwCheckTaskCancelDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool *needStop) { int32_t qwCheckAndProcessTaskDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool *needStop) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
@ -855,7 +868,7 @@ int32_t qwCheckTaskCancelDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, ui
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) { if (qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task)) {
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -867,10 +880,7 @@ int32_t qwCheckTaskCancelDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, ui
QW_UNLOCK(QW_READ, &task->lock); QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(TSDB_CODE_QRY_APP_ERROR);
} }
QW_UNLOCK(QW_READ, &task->lock); QW_UNLOCK(QW_READ, &task->lock);
@ -907,24 +917,18 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
int32_t code = 0; int32_t code = 0;
int8_t newStatus = JOB_TASK_STATUS_CANCELLED; int8_t newStatus = JOB_TASK_STATUS_CANCELLED;
code = qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch); code = qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch);
if (code) { if (code) {
qError("sId:%"PRIx64" not in cache", sId); QW_TASK_ELOG("sId:%"PRIx64" not in cache", sId);
QW_ERR_RET(code); QW_ERR_RET(code);
} }
code = qwAcquireTask(QW_READ, sch, qId, tId, &task); code = qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task);
if (code) { if (code) {
qwReleaseScheduler(QW_READ, mgmt); QW_TASK_ELOG("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId);
if (JOB_TASK_STATUS_PARTIAL_SUCCEED == status || JOB_TASK_STATUS_SUCCEED == status) {
qError("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId);
QW_ERR_RET(code); QW_ERR_RET(code);
} }
QW_ERR_RET(qwAddTask(mgmt, sId, qId, tId, status, QW_EXIST_ACQUIRE, &sch, &task));
}
if (task->cancel) { if (task->cancel) {
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()); qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS());
@ -940,7 +944,7 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (!(task->cancel || task->drop)) { if ((!(task->cancel || task->drop)) && status > 0) {
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
task->code = errCode; task->code = errCode;
@ -1053,7 +1057,7 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
} }
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_READ, &task->lock); QW_LOCK(QW_READ, &task->lock);
@ -1208,6 +1212,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false; bool queryRsped = false;
bool needStop = false; bool needStop = false;
bool taskAdded = false;
struct SSubplan *plan = NULL; struct SSubplan *plan = NULL;
SSubQueryMsg *msg = pMsg->pCont; SSubQueryMsg *msg = pMsg->pCont;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
@ -1226,28 +1231,31 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t qId = msg->queryId; uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId; uint64_t tId = msg->taskId;
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop)); QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop));
if (needStop) { if (needStop) {
qWarn("task need stop"); QW_TASK_DLOG("task need stop, msgLen:%d", msg->contentLen);
qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED); qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED);
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
} }
code = qStringToSubplan(msg->msg, &plan); code = qStringToSubplan(msg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->sId, msg->queryId, msg->taskId, code); QW_TASK_ELOG("string to subplan failed, code:%d", code);
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo); code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo);
if (code) { if (code) {
qError("qCreateExecTask failed, code:%x", code); QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_FAILED));
QW_ERR_JRET(code); QW_ERR_JRET(code);
} else {
QW_ERR_JRET(qwAddTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING, QW_EXIST_RET_ERR, NULL, NULL));
} }
QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING));
taskAdded = true;
QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS));
queryRsped = true; queryRsped = true;
@ -1256,12 +1264,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
code = qExecTask(pTaskInfo, &sinkHandle); code = qExecTask(pTaskInfo, &sinkHandle);
if (code) { if (code) {
qError("qExecTask failed, code:%x", code); QW_TASK_ELOG("qExecTask failed, code:%x", code);
QW_ERR_JRET(code); QW_ERR_JRET(code);
} else { }
QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle)); QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle));
QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED));
}
_return: _return:
@ -1278,6 +1286,12 @@ _return:
status = JOB_TASK_STATUS_PARTIAL_SUCCEED; status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
} }
if (!taskAdded) {
qwAddTask(qWorkerMgmt, sId, qId, tId, status);
status = -1;
}
qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, code); qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, code);
QW_RET(code); QW_RET(code);
@ -1299,7 +1313,7 @@ int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *p
qwReleaseTaskResCache(QW_READ, qWorkerMgmt); qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop)); QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop));
if (needStop) { if (needStop) {
qWarn("task need stop"); qWarn("task need stop");
if (needRsp) { if (needRsp) {