Merge pull request #19239 from taosdata/fix/TD-21574
fix: some query tasks not end when vnode stopped
This commit is contained in:
commit
4da5957d07
|
@ -114,14 +114,14 @@ typedef struct SQWTaskStatus {
|
||||||
typedef struct SQWTaskCtx {
|
typedef struct SQWTaskCtx {
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
int8_t phase;
|
int8_t phase;
|
||||||
|
int8_t inFetch;
|
||||||
int8_t taskType;
|
int8_t taskType;
|
||||||
int8_t explain;
|
int8_t explain;
|
||||||
int8_t needFetch;
|
int8_t needFetch;
|
||||||
int8_t localExec;
|
int8_t localExec;
|
||||||
int32_t msgType;
|
int32_t msgType;
|
||||||
int32_t fetchType;
|
|
||||||
int32_t execId;
|
|
||||||
int32_t level;
|
int32_t level;
|
||||||
|
uint64_t sId;
|
||||||
|
|
||||||
bool queryGotData;
|
bool queryGotData;
|
||||||
bool queryRsped;
|
bool queryRsped;
|
||||||
|
@ -221,8 +221,16 @@ typedef struct SQWorkerMgmt {
|
||||||
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
|
#define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase)
|
||||||
#define QW_SET_PHASE(ctx, _value) \
|
#define QW_SET_PHASE(ctx, _value) \
|
||||||
do { \
|
do { \
|
||||||
if ((_value) != QW_PHASE_PRE_FETCH && (_value) != QW_PHASE_POST_FETCH) { \
|
switch (_value) { \
|
||||||
atomic_store_8(&(ctx)->phase, _value); \
|
case QW_PHASE_PRE_FETCH: \
|
||||||
|
ctx->inFetch = 1; \
|
||||||
|
break; \
|
||||||
|
case QW_PHASE_POST_FETCH: \
|
||||||
|
ctx->inFetch = 0; \
|
||||||
|
break; \
|
||||||
|
default: \
|
||||||
|
atomic_store_8(&(ctx)->phase, _value); \
|
||||||
|
break; \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
@ -230,6 +238,7 @@ typedef struct SQWorkerMgmt {
|
||||||
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
|
#define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code)
|
||||||
|
|
||||||
#define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
|
#define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY)
|
||||||
|
#define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch)
|
||||||
|
|
||||||
#define QW_SET_QTID(id, qId, tId, eId) \
|
#define QW_SET_QTID(id, qId, tId, eId) \
|
||||||
do { \
|
do { \
|
||||||
|
|
|
@ -124,11 +124,11 @@ void qwDbgDumpTasksInfo(SQWorker *mgmt) {
|
||||||
void *key = taosHashGetKey(pIter, NULL);
|
void *key = taosHashGetKey(pIter, NULL);
|
||||||
QW_GET_QTID(key, qId, tId, eId);
|
QW_GET_QTID(key, qId, tId, eId);
|
||||||
|
|
||||||
QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, fetchType:%d, "
|
QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, "
|
||||||
"execId:%x, level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
|
"sId:%" PRId64 ", level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
|
||||||
"rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d",
|
"rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d",
|
||||||
ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType,
|
ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType,
|
||||||
ctx->fetchType, ctx->execId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
|
ctx->sId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
|
||||||
ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName,
|
ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName,
|
||||||
ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
|
ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
|
||||||
ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]);
|
ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]);
|
||||||
|
|
|
@ -508,14 +508,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||||
if (QW_PHASE_POST_FETCH == phase) {
|
|
||||||
QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase));
|
|
||||||
QW_ERR_JRET(TSDB_CODE_APP_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
// qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
|
|
||||||
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
|
||||||
|
|
||||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||||
QW_ERR_JRET(ctx->rspCode);
|
QW_ERR_JRET(ctx->rspCode);
|
||||||
}
|
}
|
||||||
|
@ -580,6 +572,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx));
|
||||||
|
|
||||||
ctx->ctrlConnInfo = qwMsg->connInfo;
|
ctx->ctrlConnInfo = qwMsg->connInfo;
|
||||||
|
ctx->sId = sId;
|
||||||
ctx->phase = -1;
|
ctx->phase = -1;
|
||||||
|
|
||||||
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
|
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
|
||||||
|
@ -670,7 +663,7 @@ _return:
|
||||||
qwMsg->connInfo = ctx->dataConnInfo;
|
qwMsg->connInfo = ctx->dataConnInfo;
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||||
|
|
||||||
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
|
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
|
||||||
rsp = NULL;
|
rsp = NULL;
|
||||||
|
|
||||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||||
|
@ -722,7 +715,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
qwMsg->connInfo = ctx->dataConnInfo;
|
qwMsg->connInfo = ctx->dataConnInfo;
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||||
|
|
||||||
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
|
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
|
||||||
rsp = NULL;
|
rsp = NULL;
|
||||||
|
|
||||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
|
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
|
||||||
|
@ -744,7 +737,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
rsp = NULL;
|
rsp = NULL;
|
||||||
|
|
||||||
qwMsg->connInfo = ctx->dataConnInfo;
|
qwMsg->connInfo = ctx->dataConnInfo;
|
||||||
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, NULL, 0, code);
|
qwBuildAndSendFetchRsp(ctx->msgType + 1, &qwMsg->connInfo, NULL, 0, code);
|
||||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||||
0);
|
0);
|
||||||
}
|
}
|
||||||
|
@ -1178,8 +1171,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
||||||
|
|
||||||
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
|
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
|
||||||
|
|
||||||
uint64_t qId, tId;
|
uint64_t qId, tId, sId;
|
||||||
int32_t eId;
|
int32_t eId;
|
||||||
|
int64_t rId = 0;
|
||||||
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
||||||
|
@ -1188,6 +1182,8 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
||||||
|
|
||||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||||
|
|
||||||
|
sId = ctx->sId;
|
||||||
|
|
||||||
QW_TASK_DLOG_E("start to force stop task");
|
QW_TASK_DLOG_E("start to force stop task");
|
||||||
|
|
||||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||||
|
@ -1200,9 +1196,11 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
||||||
|
|
||||||
if (QW_QUERY_RUNNING(ctx)) {
|
if (QW_QUERY_RUNNING(ctx)) {
|
||||||
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
|
qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
|
||||||
} else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
} else if (QW_FETCH_RUNNING(ctx)) {
|
||||||
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
|
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
|
||||||
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
|
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
|
||||||
|
} else {
|
||||||
|
qwDropTask(QW_FPARAMS());
|
||||||
}
|
}
|
||||||
|
|
||||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||||
|
|
Loading…
Reference in New Issue