remove offline sch

This commit is contained in:
dapan1121 2022-06-06 10:23:52 +08:00
parent 64de7ee1ac
commit 38fa01a6ba
3 changed files with 20 additions and 4 deletions

View File

@ -355,7 +355,7 @@ int32_t qwOpenRef(void);
void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwClearExpiredSch(SArray* pExpiredSch);
void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
void qwDbgDumpMgmtInfo(SQWorker *mgmt);

View File

@ -536,8 +536,23 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
}
void qwClearExpiredSch(SArray* pExpiredSch) {
void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch) {
int32_t num = taosArrayGetSize(pExpiredSch);
for (int32_t i = 0; i < num; ++i) {
uint64_t *sId = taosArrayGet(pExpiredSch, i);
SQWSchStatus *pSch = NULL;
if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) {
continue;
}
if (taosHashGetSize(pSch->tasksHash) <= 0) {
qwDestroySchStatus(pSch);
taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
qError("sch %" PRIx64 "destroyed", *sId);
}
qwReleaseScheduler(QW_WRITE, mgmt);
}
}

View File

@ -743,9 +743,10 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
}
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));
sch->hbBrokenTs = 0;
QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (sch->hbConnInfo.handle) {
@ -865,7 +866,7 @@ _return:
}
if (taosArrayGetSize(pExpiredSch) > 0) {
qwClearExpiredSch(pExpiredSch);
qwClearExpiredSch(mgmt, pExpiredSch);
}
taosMemoryFreeClear(rspList);