From d9b22b32991c072fe81e5ffd2ea4e3e8f5c3a087 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 18 Jan 2023 15:13:16 +0800 Subject: [PATCH] fix: add node stopped and get/accquire ctx return node stopped --- source/libs/qworker/inc/qwInt.h | 2 ++ source/libs/qworker/src/qwUtil.c | 21 +++++++++++++++++---- source/libs/qworker/src/qworker.c | 3 +++ 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index aa1ce80903..bde05d4116 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -194,6 +194,8 @@ typedef struct SQWorker { SMsgCb msgCb; SQWStat stat; int32_t *destroyed; + + int8_t nodeStopped; } SQWorker; typedef struct SQWorkerMgmt { diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index fdd2775daa..7ee7c50c96 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -213,9 +213,15 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { QW_SET_QTID(id, qId, tId, eId); *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); + int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped); if (NULL == (*ctx)) { - QW_TASK_DLOG_E("task ctx not exist, may be dropped"); - QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + if (!nodeStopped) { + QW_TASK_DLOG_E("task ctx not exist, may be dropped"); + QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + } else { + QW_TASK_DLOG_E("node stopped"); + QW_ERR_RET(TSDB_CODE_VND_STOPPED); + } } return TSDB_CODE_SUCCESS; @@ -226,9 +232,16 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { QW_SET_QTID(id, qId, tId, eId); *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); + int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped); + if (NULL == (*ctx)) { - QW_TASK_DLOG_E("task ctx not exist, may be dropped"); - QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + if (!nodeStopped) { + QW_TASK_DLOG_E("task ctx not exist, may be dropped"); + QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); + } else { + QW_TASK_DLOG_E("node stopped"); + QW_ERR_RET(TSDB_CODE_VND_STOPPED); + } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index e38361d87f..fedaa96ed9 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1188,6 +1188,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { uint64_t qId, tId, sId; int32_t eId; int64_t rId = 0; + + atomic_store_8(&mgmt->nodeStopped, 1); + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;