fix: task dropped error code

This commit is contained in:
dapan1121 2023-02-16 10:06:01 +08:00
parent 957ed637b0
commit 892d6c61c9
2 changed files with 9 additions and 19 deletions

View File

@ -206,6 +206,8 @@ typedef struct SQWorkerMgmt {
int32_t paramIdx; int32_t paramIdx;
} SQWorkerMgmt; } SQWorkerMgmt;
#define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST)
#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId #define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId
#define QW_IDS() sId, qId, tId, rId, eId #define QW_IDS() sId, qId, tId, rId, eId
#define QW_FPARAMS() mgmt, QW_IDS() #define QW_FPARAMS() mgmt, QW_IDS()

View File

@ -213,15 +213,9 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
QW_SET_QTID(id, qId, tId, eId); QW_SET_QTID(id, qId, tId, eId);
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped);
if (NULL == (*ctx)) { if (NULL == (*ctx)) {
if (!nodeStopped) { QW_TASK_DLOG_E("acquired task ctx not exist, may be dropped");
QW_TASK_DLOG_E("task ctx not exist, may be dropped"); QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
} else {
QW_TASK_DLOG_E("node stopped");
QW_ERR_RET(TSDB_CODE_VND_STOPPED);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -232,16 +226,9 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
QW_SET_QTID(id, qId, tId, eId); QW_SET_QTID(id, qId, tId, eId);
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped);
if (NULL == (*ctx)) { if (NULL == (*ctx)) {
if (!nodeStopped) { QW_TASK_DLOG_E("get task ctx not exist, may be dropped");
QW_TASK_DLOG_E("task ctx not exist, may be dropped"); QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
} else {
QW_TASK_DLOG_E("node stopped");
QW_ERR_RET(TSDB_CODE_VND_STOPPED);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -334,7 +321,8 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == ctx) { if (NULL == ctx) {
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); QW_TASK_DLOG_E("drop task ctx not exist, may be dropped");
QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
} }
octx = *ctx; octx = *ctx;
@ -346,7 +334,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) { if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_ELOG_E("taosHashRemove from ctx hash failed"); QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
} }
qwFreeTaskCtx(&octx); qwFreeTaskCtx(&octx);