Merge pull request #19629 from taosdata/szhou/fix-td-21911
fix: add node stopped and get/accquire ctx return node stopped
This commit is contained in:
commit
c273c42dfb
|
@ -194,6 +194,8 @@ typedef struct SQWorker {
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
SQWStat stat;
|
SQWStat stat;
|
||||||
int32_t *destroyed;
|
int32_t *destroyed;
|
||||||
|
|
||||||
|
int8_t nodeStopped;
|
||||||
} SQWorker;
|
} SQWorker;
|
||||||
|
|
||||||
typedef struct SQWorkerMgmt {
|
typedef struct SQWorkerMgmt {
|
||||||
|
|
|
@ -213,9 +213,15 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||||
QW_SET_QTID(id, qId, tId, eId);
|
QW_SET_QTID(id, qId, tId, eId);
|
||||||
|
|
||||||
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
|
*ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id));
|
||||||
|
int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped);
|
||||||
if (NULL == (*ctx)) {
|
if (NULL == (*ctx)) {
|
||||||
|
if (!nodeStopped) {
|
||||||
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
|
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||||
|
} else {
|
||||||
|
QW_TASK_DLOG_E("node stopped");
|
||||||
|
QW_ERR_RET(TSDB_CODE_VND_STOPPED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -226,9 +232,16 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) {
|
||||||
QW_SET_QTID(id, qId, tId, eId);
|
QW_SET_QTID(id, qId, tId, eId);
|
||||||
|
|
||||||
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
|
||||||
|
int8_t nodeStopped = atomic_load_8(&mgmt->nodeStopped);
|
||||||
|
|
||||||
if (NULL == (*ctx)) {
|
if (NULL == (*ctx)) {
|
||||||
|
if (!nodeStopped) {
|
||||||
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
|
QW_TASK_DLOG_E("task ctx not exist, may be dropped");
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||||
|
} else {
|
||||||
|
QW_TASK_DLOG_E("node stopped");
|
||||||
|
QW_ERR_RET(TSDB_CODE_VND_STOPPED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -1188,6 +1188,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
||||||
uint64_t qId, tId, sId;
|
uint64_t qId, tId, sId;
|
||||||
int32_t eId;
|
int32_t eId;
|
||||||
int64_t rId = 0;
|
int64_t rId = 0;
|
||||||
|
|
||||||
|
atomic_store_8(&mgmt->nodeStopped, 1);
|
||||||
|
|
||||||
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
||||||
|
|
Loading…
Reference in New Issue