From c6d23422e9f602147457b78498773809adcf5e3d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 15:23:00 +0800 Subject: [PATCH] refact queue --- source/dnode/mgmt/bnode/inc/bmInt.h | 2 +- source/dnode/mgmt/bnode/src/bmWorker.c | 9 +- source/dnode/mgmt/container/inc/dnd.h | 19 ---- source/dnode/mgmt/container/src/dndWorker.c | 96 --------------------- source/dnode/mgmt/dnode/inc/dmInt.h | 6 +- source/dnode/mgmt/dnode/src/dmWorker.c | 22 +++-- source/dnode/mgmt/qnode/inc/qmInt.h | 4 +- source/dnode/mgmt/qnode/src/qmWorker.c | 34 +++++--- source/dnode/mgmt/snode/inc/smInt.h | 4 +- source/dnode/mgmt/snode/src/smWorker.c | 42 +++++---- source/dnode/mgmt/vnode/inc/vmInt.h | 2 +- source/dnode/mgmt/vnode/src/vmWorker.c | 10 ++- 12 files changed, 80 insertions(+), 170 deletions(-) delete mode 100644 source/dnode/mgmt/container/src/dndWorker.c diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index ddbf7f4c4e..fc6b7a999d 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -28,7 +28,7 @@ typedef struct SBnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SDnodeWorker writeWorker; + SWWorkerAll writeWorker; } SBnodeMgmt; // bmInt.c diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c index c8ad137842..79b2a669ae 100644 --- a/source/dnode/mgmt/bnode/src/bmWorker.c +++ b/source/dnode/mgmt/bnode/src/bmWorker.c @@ -63,14 +63,15 @@ static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs } int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->writeWorker; + SWWorkerAll *pWorker = &pMgmt->writeWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t bmStartWorker(SBnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, bmProcessQueue) != 0) { + SWWorkerAllCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; + if (tWWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) { dError("failed to start bnode write worker since %s", terrstr()); return -1; } @@ -78,4 +79,4 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) { return 0; } -void bmStopWorker(SBnodeMgmt *pMgmt) { dndCleanupWorker(&pMgmt->writeWorker); } +void bmStopWorker(SBnodeMgmt *pMgmt) { tWWorkerAllCleanup(&pMgmt->writeWorker); } diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index f9fa896f42..af552b5357 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -74,20 +74,6 @@ typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required); -typedef struct { - EWorkerType type; - const char *name; - int32_t minNum; - int32_t maxNum; - void *queueFp; - void *param; - STaosQueue *queue; - union { - SQWorkerPool pool; - SWWorkerPool mpool; - }; -} SDnodeWorker; - typedef struct SMsgHandle { int32_t vgId; NodeMsgFp vgIdMsgFp; @@ -161,11 +147,6 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); -int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, - int32_t maxNum, void *queueFp); -void dndCleanupWorker(SDnodeWorker *pWorker); -int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg); - int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); diff --git a/source/dnode/mgmt/container/src/dndWorker.c b/source/dnode/mgmt/container/src/dndWorker.c deleted file mode 100644 index 5d99ec5f0d..0000000000 --- a/source/dnode/mgmt/container/src/dndWorker.c +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dndInt.h" - -int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, - int32_t maxNum, void *queueFp) { - if (pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - - pWorker->type = type; - pWorker->name = name; - pWorker->minNum = minNum; - pWorker->maxNum = maxNum; - pWorker->queueFp = queueFp; - pWorker->param = param; - - if (pWorker->type == DND_WORKER_SINGLE) { - SQWorkerPool *pPool = &pWorker->pool; - pPool->name = name; - pPool->min = minNum; - pPool->max = maxNum; - if (tQWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pWorker->queue = tQWorkerAllocQueue(pPool, param, (FItem)queueFp); - if (pWorker->queue == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } else if (pWorker->type == DND_WORKER_MULTI) { - SWWorkerPool *pPool = &pWorker->mpool; - pPool->name = name; - pPool->max = maxNum; - if (tWWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pWorker->queue = tWWorkerAllocQueue(pPool, param, (FItems)queueFp); - if (pWorker->queue == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } else { - terrno = TSDB_CODE_INVALID_PARA; - } - - return 0; -} - -void dndCleanupWorker(SDnodeWorker *pWorker) { - if (pWorker->queue == NULL) return; - - while (!taosQueueEmpty(pWorker->queue)) { - taosMsleep(10); - } - - if (pWorker->type == DND_WORKER_SINGLE) { - tQWorkerCleanup(&pWorker->pool); - tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); - } else if (pWorker->type == DND_WORKER_MULTI) { - tWWorkerCleanup(&pWorker->mpool); - tWWorkerFreeQueue(&pWorker->mpool, pWorker->queue); - } else { - } -} - -int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg) { - if (pWorker == NULL || pWorker->queue == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - - if (taosWriteQitem(pWorker->queue, pMsg) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h index 46e70727af..0330b7f996 100644 --- a/source/dnode/mgmt/dnode/inc/dmInt.h +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -29,10 +29,10 @@ typedef struct SDnodeMgmt { SEpSet mnodeEpSet; SHashObj *dnodeHash; SArray *dnodeEps; - TdThread *threadId; + TdThread *threadId; SRWLatch latch; - SDnodeWorker mgmtWorker; - SDnodeWorker statusWorker; + SQWorkerAll mgmtWorker; + SQWorkerAll statusWorker; const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index e06cbf4351..5bacd5f10b 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -53,9 +53,9 @@ static void *dmThreadRoutine(void *param) { } static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnode *pDnode = pMgmt->pDnode; + SDnode *pDnode = pMgmt->pDnode; SRpcMsg *pRpc = &pMsg->rpcMsg; - int32_t code = -1; + int32_t code = -1; dTrace("msg:%p, will be processed in dnode queue", pMsg); switch (pRpc->msgType) { @@ -98,13 +98,17 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) { + SQWorkerAllCfg mgmtCfg = { + .minNum = 0, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + if (tQWorkerAllInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) { - dError("failed to start dnode mgmt worker since %s", terrstr()); + SQWorkerAllCfg statusCfg = { + .minNum = 0, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + if (tQWorkerAllInit(&pMgmt->statusWorker, &statusCfg) != 0) { + dError("failed to start dnode status worker since %s", terrstr()); return -1; } @@ -123,8 +127,8 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt) { } void dmStopWorker(SDnodeMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->mgmtWorker); - dndCleanupWorker(&pMgmt->statusWorker); + tQWorkerAllCleanup(&pMgmt->mgmtWorker); + tQWorkerAllCleanup(&pMgmt->statusWorker); if (pMgmt->threadId != NULL) { taosDestoryThread(pMgmt->threadId); @@ -133,11 +137,11 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { } int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + SQWorkerAll *pWorker = &pMgmt->mgmtWorker; if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) { pWorker = &pMgmt->statusWorker; } dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index dc7f10b939..c78d04ffb5 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -28,8 +28,8 @@ typedef struct SQnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SDnodeWorker queryWorker; - SDnodeWorker fetchWorker; + SQWorkerAll queryWorker; + SQWorkerAll fetchWorker; } SQnodeMgmt; // qmInt.c diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index 1a140c69e7..c828d4f3c3 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -45,16 +45,16 @@ static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t qmPutMsgToWorker(SDnodeWorker *pWorker, SNodeMsg *pMsg) { +static int32_t qmPutMsgToWorker(SQWorkerAll *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); } int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); } -static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) { +static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; @@ -63,7 +63,7 @@ static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; - int32_t code = dndWriteMsgToWorker(pWorker, pMsg); + int32_t code = taosWriteQitem(pWorker->queue, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); @@ -89,15 +89,25 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1); int32_t maxQueryThreads = minQueryThreads; - if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", minQueryThreads, maxQueryThreads, - qmProcessQueryQueue) != 0) { - dError("failed to start qnode query worker since %s", terrstr()); + SQWorkerAllCfg queryCfg = {.minNum = minQueryThreads, + .maxNum = maxQueryThreads, + .name = "qnode-query", + .fp = (FItem)qmProcessQueryQueue, + .param = pMgmt}; + + if (tQWorkerAllInit(&pMgmt->queryWorker, &queryCfg) != 0) { + dError("failed to start qnode-query worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", minFetchThreads, maxFetchThreads, - qmProcessFetchQueue) != 0) { - dError("failed to start qnode fetch worker since %s", terrstr()); + SQWorkerAllCfg fetchCfg = {.minNum = minFetchThreads, + .maxNum = maxFetchThreads, + .name = "qnode-fetch", + .fp = (FItem)qmProcessFetchQueue, + .param = pMgmt}; + + if (tQWorkerAllInit(&pMgmt->queryWorker, &fetchCfg) != 0) { + dError("failed to start qnode-fetch worker since %s", terrstr()); return -1; } @@ -105,6 +115,6 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { } void qmStopWorker(SQnodeMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->queryWorker); - dndCleanupWorker(&pMgmt->fetchWorker); + tQWorkerAllCleanup(&pMgmt->queryWorker); + tQWorkerAllCleanup(&pMgmt->fetchWorker); } diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index 3def27b832..744089efae 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -30,8 +30,8 @@ typedef struct SSnodeMgmt { const char *path; SRWLatch latch; int8_t uniqueWorkerInUse; - SArray *uniqueWorkers; // SArray - SDnodeWorker sharedWorker; + SArray *uniqueWorkers; // SArray + SQWorkerAll sharedWorker; } SSnodeMgmt; // smInt.c diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index 57d0c09849..99c844f8b5 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -40,20 +40,23 @@ static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t smStartWorker(SSnodeMgmt *pMgmt) { - pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); + pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SWWorkerAll *)); if (pMgmt->uniqueWorkers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) { - SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker)); + SWWorkerAll *pUniqueWorker = malloc(sizeof(SWWorkerAll)); if (pUniqueWorker == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (dndInitWorker(pMgmt, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, smProcessSharedQueue) != 0) { - dError("failed to start snode unique worker since %s", terrstr()); + + SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = (FItems)smProcessUniqueQueue, .param = pMgmt}; + + if (tWWorkerAllInit(pUniqueWorker, &cfg) != 0) { + dError("failed to start snode-unique worker since %s", terrstr()); return -1; } if (taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker) == NULL) { @@ -62,9 +65,14 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { } } - if (dndInitWorker(pMgmt, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM, - SND_SHARED_THREAD_NUM, smProcessSharedQueue)) { - dError("failed to start snode shared worker since %s", terrstr()); + SQWorkerAllCfg cfg = {.minNum = SND_SHARED_THREAD_NUM, + .maxNum = SND_SHARED_THREAD_NUM, + .name = "snode-shared", + .fp = (FItem)smProcessSharedQueue, + .param = pMgmt}; + + if (tQWorkerAllInit(&pMgmt->sharedWorker, &cfg)) { + dError("failed to start snode shared-worker since %s", terrstr()); return -1; } @@ -73,11 +81,11 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { void smStopWorker(SSnodeMgmt *pMgmt) { for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { - SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i); - dndCleanupWorker(worker); + SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); + tWWorkerAllCleanup(pWorker); } taosArrayDestroy(pMgmt->uniqueWorkers); - dndCleanupWorker(&pMgmt->sharedWorker); + tQWorkerAllCleanup(&pMgmt->sharedWorker); } static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { @@ -93,33 +101,33 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { } int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); + SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; } dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); - SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); + int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); + SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; } dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->sharedWorker; + SQWorkerAll *pWorker = &pMgmt->sharedWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index ba59258d07..38a477178f 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -36,7 +36,7 @@ typedef struct SVnodesMgmt { const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; - SDnodeWorker mgmtWorker; + SQWorkerAll mgmtWorker; } SVnodesMgmt; typedef struct { diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index e0632cee68..2b9f31c867 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -207,9 +207,9 @@ int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + SQWorkerAll *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { @@ -319,7 +319,9 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pWPool->max = maxSyncThreads; if (tWWorkerInit(pWPool) != 0) return -1; - if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) { + SQWorkerAllCfg cfg = { + .minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + if (tQWorkerAllInit(&pMgmt->mgmtWorker, &cfg) != 0) { dError("failed to start vnode-mgmt worker since %s", terrstr()); return -1; } @@ -329,7 +331,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { } void vmStopWorker(SVnodesMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->mgmtWorker); + tQWorkerAllCleanup(&pMgmt->mgmtWorker); tQWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool);