fix: some query tasks not end when vnode stopped
This commit is contained in:
parent
6e9b5d1be6
commit
9facfe83d1
|
@ -114,14 +114,14 @@ typedef struct SQWTaskStatus {
|
|||
typedef struct SQWTaskCtx {
|
||||
SRWLatch lock;
|
||||
int8_t phase;
|
||||
int8_t inFetch;
|
||||
int8_t taskType;
|
||||
int8_t explain;
|
||||
int8_t needFetch;
|
||||
int8_t localExec;
|
||||
int32_t msgType;
|
||||
int32_t fetchType;
|
||||
int32_t execId;
|
||||
int32_t level;
|
||||
uint64_t sId;
|
||||
|
||||
bool queryGotData;
|
||||
bool queryRsped;
|
||||
|
@ -221,8 +221,16 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
|
||||
#define QW_SET_PHASE(ctx, _value) \
|
||||
do { \
|
||||
if ((_value) != QW_PHASE_PRE_FETCH && (_value) != QW_PHASE_POST_FETCH) { \
|
||||
atomic_store_8(&(ctx)->phase, _value); \
|
||||
switch (_value) { \
|
||||
case QW_PHASE_PRE_FETCH: \
|
||||
ctx->inFetch = 1; \
|
||||
break; \
|
||||
case QW_PHASE_POST_FETCH: \
|
||||
ctx->inFetch = 0; \
|
||||
break; \
|
||||
default: \
|
||||
atomic_store_8(&(ctx)->phase, _value); \
|
||||
break; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
@ -230,6 +238,7 @@ typedef struct SQWorkerMgmt {
|
|||
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
|
||||
|
||||
#define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
|
||||
#define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch)
|
||||
|
||||
#define QW_SET_QTID(id, qId, tId, eId) \
|
||||
do { \
|
||||
|
|
|
@ -124,11 +124,11 @@ void qwDbgDumpTasksInfo(SQWorker *mgmt) {
|
|||
void *key = taosHashGetKey(pIter, NULL);
|
||||
QW_GET_QTID(key, qId, tId, eId);
|
||||
|
||||
QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, fetchType:%d, "
|
||||
"execId:%x, level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
|
||||
QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, "
|
||||
"sId:%x, level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
|
||||
"rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d",
|
||||
ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType,
|
||||
ctx->fetchType, ctx->execId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
|
||||
ctx->sId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
|
||||
ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName,
|
||||
ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
|
||||
ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]);
|
||||
|
|
|
@ -508,14 +508,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
|
|||
}
|
||||
|
||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||
if (QW_PHASE_POST_FETCH == phase) {
|
||||
QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
|
||||
QW_ERR_JRET(TSDB_CODE_APP_ERROR);
|
||||
}
|
||||
|
||||
// qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
|
||||
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
|
||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||
QW_ERR_JRET(ctx->rspCode);
|
||||
}
|
||||
|
@ -580,6 +572,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||
|
||||
ctx->ctrlConnInfo = qwMsg->connInfo;
|
||||
ctx->sId = sId;
|
||||
ctx->phase = -1;
|
||||
|
||||
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
|
||||
|
@ -670,7 +663,7 @@ _return:
|
|||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
|
||||
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
rsp = NULL;
|
||||
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||
|
@ -722,7 +715,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
|
||||
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
|
||||
rsp = NULL;
|
||||
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
|
||||
|
@ -744,7 +737,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
rsp = NULL;
|
||||
|
||||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, NULL, 0, code);
|
||||
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, NULL, 0, code);
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||
0);
|
||||
}
|
||||
|
@ -1178,8 +1171,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
|||
|
||||
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
|
||||
|
||||
uint64_t qId, tId;
|
||||
uint64_t qId, tId, sId;
|
||||
int32_t eId;
|
||||
int64_t rId = 0;
|
||||
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
||||
while (pIter) {
|
||||
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
||||
|
@ -1188,6 +1182,8 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
|||
|
||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||
|
||||
sId = ctx->sId;
|
||||
|
||||
QW_TASK_DLOG_E("start to force stop task");
|
||||
|
||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||
|
@ -1200,9 +1196,11 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
|||
|
||||
if (QW_QUERY_RUNNING(ctx)) {
|
||||
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
|
||||
} else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||
} else if (QW_FETCH_RUNNING(ctx)) {
|
||||
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
|
||||
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
|
||||
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
|
||||
} else {
|
||||
qwDropTask(QW_FPARAMS());
|
||||
}
|
||||
|
||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||
|
|
Loading…
Reference in New Issue