From 38fa01a6babc9b1770b11ce001010dfaec035c80 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 6 Jun 2022 10:23:52 +0800 Subject: [PATCH] remove offline sch --- source/libs/qworker/inc/qwInt.h | 2 +- source/libs/qworker/src/qwUtil.c | 17 ++++++++++++++++- source/libs/qworker/src/qworker.c | 5 +++-- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 082db6428f..9ff49bef4f 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -355,7 +355,7 @@ int32_t qwOpenRef(void); void qwSetHbParam(int64_t refId, SQWHbParam **pParam); int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); -void qwClearExpiredSch(SArray* pExpiredSch); +void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch); int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); void qwDbgDumpMgmtInfo(SQWorker *mgmt); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 8bfb80f061..30337ade66 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -536,8 +536,23 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { } -void qwClearExpiredSch(SArray* pExpiredSch) { +void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch) { + int32_t num = taosArrayGetSize(pExpiredSch); + for (int32_t i = 0; i < num; ++i) { + uint64_t *sId = taosArrayGet(pExpiredSch, i); + SQWSchStatus *pSch = NULL; + if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) { + continue; + } + if (taosHashGetSize(pSch->tasksHash) <= 0) { + qwDestroySchStatus(pSch); + taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); + qError("sch %" PRIx64 "destroyed", *sId); + } + + qwReleaseScheduler(QW_WRITE, mgmt); + } } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 44a8fdf7f4..8e0d14996d 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -743,9 +743,10 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { } QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); - QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo)); + sch->hbBrokenTs = 0; + QW_LOCK(QW_WRITE, &sch->hbConnLock); if (sch->hbConnInfo.handle) { @@ -865,7 +866,7 @@ _return: } if (taosArrayGetSize(pExpiredSch) > 0) { - qwClearExpiredSch(pExpiredSch); + qwClearExpiredSch(mgmt, pExpiredSch); } taosMemoryFreeClear(rspList);