From f48fddc7c43224d260752ecb46f34c533fbb877b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 16:31:21 +0800 Subject: [PATCH] refact queue --- include/util/tqueue.h | 1 - source/dnode/mgmt/dnode/src/dmWorker.c | 4 ++-- source/dnode/mgmt/qnode/src/qmWorker.c | 2 +- source/util/src/tworker.c | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index e84875bd7d..3bccc7404b 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -44,7 +44,6 @@ typedef struct STaosQset STaosQset; typedef struct STaosQall STaosQall; typedef struct { void *ahandle; - int32_t qsize; int32_t workerId; int32_t threadNum; } SQueueInfo; diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 79ed6d50c8..9929ebae4f 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -101,14 +101,14 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { int32_t dmStartWorker(SDnodeMgmt *pMgmt) { SQWorkerAllCfg mgmtCfg = { - .minNum = 0, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + .minNum = 1, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; if (tQWorkerAllInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } SQWorkerAllCfg statusCfg = { - .minNum = 0, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + .minNum = 1, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; if (tQWorkerAllInit(&pMgmt->statusWorker, &statusCfg) != 0) { dError("failed to start dnode status worker since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index 5f71837269..325eec7631 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -127,7 +127,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { .fp = (FItem)qmProcessFetchQueue, .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->queryWorker, &fetchCfg) != 0) { + if (tQWorkerAllInit(&pMgmt->fetchWorker, &fetchCfg) != 0) { dError("failed to start qnode-fetch worker since %s", terrstr()); return -1; } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 44fc2f9f93..f62a63132e 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -212,7 +212,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) { if (fp != NULL) { SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; - (*fp)(ahandle, worker->qall, numOfMsgs); + (*fp)(&info, worker->qall, numOfMsgs); } }