From c6d23422e9f602147457b78498773809adcf5e3d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 15:23:00 +0800 Subject: [PATCH 1/6] refact queue --- source/dnode/mgmt/bnode/inc/bmInt.h | 2 +- source/dnode/mgmt/bnode/src/bmWorker.c | 9 +- source/dnode/mgmt/container/inc/dnd.h | 19 ---- source/dnode/mgmt/container/src/dndWorker.c | 96 --------------------- source/dnode/mgmt/dnode/inc/dmInt.h | 6 +- source/dnode/mgmt/dnode/src/dmWorker.c | 22 +++-- source/dnode/mgmt/qnode/inc/qmInt.h | 4 +- source/dnode/mgmt/qnode/src/qmWorker.c | 34 +++++--- source/dnode/mgmt/snode/inc/smInt.h | 4 +- source/dnode/mgmt/snode/src/smWorker.c | 42 +++++---- source/dnode/mgmt/vnode/inc/vmInt.h | 2 +- source/dnode/mgmt/vnode/src/vmWorker.c | 10 ++- 12 files changed, 80 insertions(+), 170 deletions(-) delete mode 100644 source/dnode/mgmt/container/src/dndWorker.c diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index ddbf7f4c4e..fc6b7a999d 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -28,7 +28,7 @@ typedef struct SBnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SDnodeWorker writeWorker; + SWWorkerAll writeWorker; } SBnodeMgmt; // bmInt.c diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c index c8ad137842..79b2a669ae 100644 --- a/source/dnode/mgmt/bnode/src/bmWorker.c +++ b/source/dnode/mgmt/bnode/src/bmWorker.c @@ -63,14 +63,15 @@ static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs } int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->writeWorker; + SWWorkerAll *pWorker = &pMgmt->writeWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t bmStartWorker(SBnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, bmProcessQueue) != 0) { + SWWorkerAllCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; + if (tWWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) { dError("failed to start bnode write worker since %s", terrstr()); return -1; } @@ -78,4 +79,4 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) { return 0; } -void bmStopWorker(SBnodeMgmt *pMgmt) { dndCleanupWorker(&pMgmt->writeWorker); } +void bmStopWorker(SBnodeMgmt *pMgmt) { tWWorkerAllCleanup(&pMgmt->writeWorker); } diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index f9fa896f42..af552b5357 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -74,20 +74,6 @@ typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required); -typedef struct { - EWorkerType type; - const char *name; - int32_t minNum; - int32_t maxNum; - void *queueFp; - void *param; - STaosQueue *queue; - union { - SQWorkerPool pool; - SWWorkerPool mpool; - }; -} SDnodeWorker; - typedef struct SMsgHandle { int32_t vgId; NodeMsgFp vgIdMsgFp; @@ -161,11 +147,6 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); -int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, - int32_t maxNum, void *queueFp); -void dndCleanupWorker(SDnodeWorker *pWorker); -int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg); - int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); diff --git a/source/dnode/mgmt/container/src/dndWorker.c b/source/dnode/mgmt/container/src/dndWorker.c deleted file mode 100644 index 5d99ec5f0d..0000000000 --- a/source/dnode/mgmt/container/src/dndWorker.c +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dndInt.h" - -int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, - int32_t maxNum, void *queueFp) { - if (pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - - pWorker->type = type; - pWorker->name = name; - pWorker->minNum = minNum; - pWorker->maxNum = maxNum; - pWorker->queueFp = queueFp; - pWorker->param = param; - - if (pWorker->type == DND_WORKER_SINGLE) { - SQWorkerPool *pPool = &pWorker->pool; - pPool->name = name; - pPool->min = minNum; - pPool->max = maxNum; - if (tQWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pWorker->queue = tQWorkerAllocQueue(pPool, param, (FItem)queueFp); - if (pWorker->queue == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } else if (pWorker->type == DND_WORKER_MULTI) { - SWWorkerPool *pPool = &pWorker->mpool; - pPool->name = name; - pPool->max = maxNum; - if (tWWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pWorker->queue = tWWorkerAllocQueue(pPool, param, (FItems)queueFp); - if (pWorker->queue == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } else { - terrno = TSDB_CODE_INVALID_PARA; - } - - return 0; -} - -void dndCleanupWorker(SDnodeWorker *pWorker) { - if (pWorker->queue == NULL) return; - - while (!taosQueueEmpty(pWorker->queue)) { - taosMsleep(10); - } - - if (pWorker->type == DND_WORKER_SINGLE) { - tQWorkerCleanup(&pWorker->pool); - tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); - } else if (pWorker->type == DND_WORKER_MULTI) { - tWWorkerCleanup(&pWorker->mpool); - tWWorkerFreeQueue(&pWorker->mpool, pWorker->queue); - } else { - } -} - -int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg) { - if (pWorker == NULL || pWorker->queue == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - - if (taosWriteQitem(pWorker->queue, pMsg) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h index 46e70727af..0330b7f996 100644 --- a/source/dnode/mgmt/dnode/inc/dmInt.h +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -29,10 +29,10 @@ typedef struct SDnodeMgmt { SEpSet mnodeEpSet; SHashObj *dnodeHash; SArray *dnodeEps; - TdThread *threadId; + TdThread *threadId; SRWLatch latch; - SDnodeWorker mgmtWorker; - SDnodeWorker statusWorker; + SQWorkerAll mgmtWorker; + SQWorkerAll statusWorker; const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index e06cbf4351..5bacd5f10b 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -53,9 +53,9 @@ static void *dmThreadRoutine(void *param) { } static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnode *pDnode = pMgmt->pDnode; + SDnode *pDnode = pMgmt->pDnode; SRpcMsg *pRpc = &pMsg->rpcMsg; - int32_t code = -1; + int32_t code = -1; dTrace("msg:%p, will be processed in dnode queue", pMsg); switch (pRpc->msgType) { @@ -98,13 +98,17 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) { + SQWorkerAllCfg mgmtCfg = { + .minNum = 0, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + if (tQWorkerAllInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) { - dError("failed to start dnode mgmt worker since %s", terrstr()); + SQWorkerAllCfg statusCfg = { + .minNum = 0, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + if (tQWorkerAllInit(&pMgmt->statusWorker, &statusCfg) != 0) { + dError("failed to start dnode status worker since %s", terrstr()); return -1; } @@ -123,8 +127,8 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt) { } void dmStopWorker(SDnodeMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->mgmtWorker); - dndCleanupWorker(&pMgmt->statusWorker); + tQWorkerAllCleanup(&pMgmt->mgmtWorker); + tQWorkerAllCleanup(&pMgmt->statusWorker); if (pMgmt->threadId != NULL) { taosDestoryThread(pMgmt->threadId); @@ -133,11 +137,11 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { } int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + SQWorkerAll *pWorker = &pMgmt->mgmtWorker; if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) { pWorker = &pMgmt->statusWorker; } dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index dc7f10b939..c78d04ffb5 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -28,8 +28,8 @@ typedef struct SQnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SDnodeWorker queryWorker; - SDnodeWorker fetchWorker; + SQWorkerAll queryWorker; + SQWorkerAll fetchWorker; } SQnodeMgmt; // qmInt.c diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index 1a140c69e7..c828d4f3c3 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -45,16 +45,16 @@ static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t qmPutMsgToWorker(SDnodeWorker *pWorker, SNodeMsg *pMsg) { +static int32_t qmPutMsgToWorker(SQWorkerAll *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); } int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); } -static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) { +static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; @@ -63,7 +63,7 @@ static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; - int32_t code = dndWriteMsgToWorker(pWorker, pMsg); + int32_t code = taosWriteQitem(pWorker->queue, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); @@ -89,15 +89,25 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1); int32_t maxQueryThreads = minQueryThreads; - if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", minQueryThreads, maxQueryThreads, - qmProcessQueryQueue) != 0) { - dError("failed to start qnode query worker since %s", terrstr()); + SQWorkerAllCfg queryCfg = {.minNum = minQueryThreads, + .maxNum = maxQueryThreads, + .name = "qnode-query", + .fp = (FItem)qmProcessQueryQueue, + .param = pMgmt}; + + if (tQWorkerAllInit(&pMgmt->queryWorker, &queryCfg) != 0) { + dError("failed to start qnode-query worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", minFetchThreads, maxFetchThreads, - qmProcessFetchQueue) != 0) { - dError("failed to start qnode fetch worker since %s", terrstr()); + SQWorkerAllCfg fetchCfg = {.minNum = minFetchThreads, + .maxNum = maxFetchThreads, + .name = "qnode-fetch", + .fp = (FItem)qmProcessFetchQueue, + .param = pMgmt}; + + if (tQWorkerAllInit(&pMgmt->queryWorker, &fetchCfg) != 0) { + dError("failed to start qnode-fetch worker since %s", terrstr()); return -1; } @@ -105,6 +115,6 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { } void qmStopWorker(SQnodeMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->queryWorker); - dndCleanupWorker(&pMgmt->fetchWorker); + tQWorkerAllCleanup(&pMgmt->queryWorker); + tQWorkerAllCleanup(&pMgmt->fetchWorker); } diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index 3def27b832..744089efae 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -30,8 +30,8 @@ typedef struct SSnodeMgmt { const char *path; SRWLatch latch; int8_t uniqueWorkerInUse; - SArray *uniqueWorkers; // SArray - SDnodeWorker sharedWorker; + SArray *uniqueWorkers; // SArray + SQWorkerAll sharedWorker; } SSnodeMgmt; // smInt.c diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index 57d0c09849..99c844f8b5 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -40,20 +40,23 @@ static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t smStartWorker(SSnodeMgmt *pMgmt) { - pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); + pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SWWorkerAll *)); if (pMgmt->uniqueWorkers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) { - SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker)); + SWWorkerAll *pUniqueWorker = malloc(sizeof(SWWorkerAll)); if (pUniqueWorker == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (dndInitWorker(pMgmt, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, smProcessSharedQueue) != 0) { - dError("failed to start snode unique worker since %s", terrstr()); + + SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = (FItems)smProcessUniqueQueue, .param = pMgmt}; + + if (tWWorkerAllInit(pUniqueWorker, &cfg) != 0) { + dError("failed to start snode-unique worker since %s", terrstr()); return -1; } if (taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker) == NULL) { @@ -62,9 +65,14 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { } } - if (dndInitWorker(pMgmt, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM, - SND_SHARED_THREAD_NUM, smProcessSharedQueue)) { - dError("failed to start snode shared worker since %s", terrstr()); + SQWorkerAllCfg cfg = {.minNum = SND_SHARED_THREAD_NUM, + .maxNum = SND_SHARED_THREAD_NUM, + .name = "snode-shared", + .fp = (FItem)smProcessSharedQueue, + .param = pMgmt}; + + if (tQWorkerAllInit(&pMgmt->sharedWorker, &cfg)) { + dError("failed to start snode shared-worker since %s", terrstr()); return -1; } @@ -73,11 +81,11 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { void smStopWorker(SSnodeMgmt *pMgmt) { for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { - SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i); - dndCleanupWorker(worker); + SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); + tWWorkerAllCleanup(pWorker); } taosArrayDestroy(pMgmt->uniqueWorkers); - dndCleanupWorker(&pMgmt->sharedWorker); + tQWorkerAllCleanup(&pMgmt->sharedWorker); } static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { @@ -93,33 +101,33 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { } int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); + SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; } dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); - SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); + int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); + SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; } dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->sharedWorker; + SQWorkerAll *pWorker = &pMgmt->sharedWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index ba59258d07..38a477178f 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -36,7 +36,7 @@ typedef struct SVnodesMgmt { const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; - SDnodeWorker mgmtWorker; + SQWorkerAll mgmtWorker; } SVnodesMgmt; typedef struct { diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index e0632cee68..2b9f31c867 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -207,9 +207,9 @@ int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + SQWorkerAll *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { @@ -319,7 +319,9 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pWPool->max = maxSyncThreads; if (tWWorkerInit(pWPool) != 0) return -1; - if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) { + SQWorkerAllCfg cfg = { + .minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + if (tQWorkerAllInit(&pMgmt->mgmtWorker, &cfg) != 0) { dError("failed to start vnode-mgmt worker since %s", terrstr()); return -1; } @@ -329,7 +331,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { } void vmStopWorker(SVnodesMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->mgmtWorker); + tQWorkerAllCleanup(&pMgmt->mgmtWorker); tQWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool); From b36356ae57a5216d3eefe425ce4d419799341811 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 15:53:29 +0800 Subject: [PATCH 2/6] get queue size --- include/common/tmsgcb.h | 6 ++++-- source/common/src/tmsgcb.c | 4 ++++ source/dnode/mgmt/qnode/inc/qmInt.h | 1 + source/dnode/mgmt/qnode/src/qmInt.c | 1 + source/dnode/mgmt/qnode/src/qmWorker.c | 17 ++++++++++++++++ source/dnode/mgmt/vnode/inc/vmInt.h | 1 + source/dnode/mgmt/vnode/src/vmInt.c | 1 + source/dnode/mgmt/vnode/src/vmMsg.c | 1 + source/dnode/mgmt/vnode/src/vmWorker.c | 28 ++++++++++++++++++++++++++ 9 files changed, 58 insertions(+), 2 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 5ce4b16300..54a145ff33 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -25,23 +25,25 @@ extern "C" { typedef struct SRpcMsg SRpcMsg; typedef struct SEpSet SEpSet; typedef struct SMgmtWrapper SMgmtWrapper; +typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType; typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); +typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp); -typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType; - typedef struct { SMgmtWrapper* pWrapper; PutToQueueFp queueFps[QUEUE_MAX]; + GetQueueSizeFp qsizeFp; SendReqFp sendReqFp; SendMnodeReqFp sendMnodeReqFp; SendRspFp sendRspFp; } SMsgCb; int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); +int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp); diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index b13f727db6..98ee1b679d 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -20,6 +20,10 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq); } +int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { + return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype); +} + int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) { return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); } diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index c78d04ffb5..3f1f62ebd5 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -44,6 +44,7 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); // qmWorker.c int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); int32_t qmStartWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c index 40959192c3..2bbfbd83f5 100644 --- a/source/dnode/mgmt/qnode/src/qmInt.c +++ b/source/dnode/mgmt/qnode/src/qmInt.c @@ -23,6 +23,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue; + msgCb.qsizeFp = qmGetQueueSize; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index c828d4f3c3..19686e2a76 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -83,6 +83,23 @@ int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc); } +int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { + int32_t size = -1; + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + switch (qtype) { + case QUERY_QUEUE: + size = taosQueueSize(pMgmt->queryWorker.queue); + break; + case FETCH_QUEUE: + size = taosQueueSize(pMgmt->fetchWorker.queue); + break; + default: + break; + } + + return size; +} + int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t maxFetchThreads = 4; int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores); diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index 38a477178f..ce1a004b65 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -104,6 +104,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 1c6c7d089e..a324c60618 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -133,6 +133,7 @@ static void *vmOpenVnodeFunc(void *param) { msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; + msgCb.qsizeFp = vmGetQueueSize; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 9f86351985..e4a4cfcd9f 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -87,6 +87,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; + msgCb.qsizeFp = vmGetQueueSize; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 2b9f31c867..dd398cb202 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -258,6 +258,34 @@ int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE); } +int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { + int32_t size = -1; + SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId); + if (pVnode != NULL) { + switch (qtype) { + case QUERY_QUEUE: + size = taosQueueSize(pVnode->pQueryQ); + break; + case FETCH_QUEUE: + size = taosQueueSize(pVnode->pFetchQ); + break; + case WRITE_QUEUE: + size = taosQueueSize(pVnode->pWriteQ); + break; + case SYNC_QUEUE: + size = taosQueueSize(pVnode->pSyncQ); + break; + case APPLY_QUEUE: + size = taosQueueSize(pVnode->pApplyQ); + break; + default: + break; + } + } + vmReleaseVnode(pWrapper->pMgmt, pVnode); + return size; +} + int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); From a340baa8795faea0c1463eb088e2de941cdd4ea7 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 16:10:26 +0800 Subject: [PATCH 3/6] refact queue --- include/util/tqueue.h | 11 +++++++++-- include/util/tworker.h | 3 ++- source/dnode/mgmt/bnode/src/bmWorker.c | 3 ++- source/dnode/mgmt/dnode/src/dmWorker.c | 4 +++- source/dnode/mgmt/mnode/src/mmWorker.c | 4 +++- source/dnode/mgmt/qnode/src/qmWorker.c | 8 ++++++-- source/dnode/mgmt/snode/src/smWorker.c | 10 +++++++--- source/dnode/mgmt/vnode/src/vmWorker.c | 26 ++++++++++++++++++-------- source/util/src/tworker.c | 6 +++++- 9 files changed, 55 insertions(+), 20 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 9faf90113e..e84875bd7d 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -42,8 +42,15 @@ shall be used to set up the protection. typedef struct STaosQueue STaosQueue; typedef struct STaosQset STaosQset; typedef struct STaosQall STaosQall; -typedef void (*FItem)(void *ahandle, void *pItem); -typedef void (*FItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); +typedef struct { + void *ahandle; + int32_t qsize; + int32_t workerId; + int32_t threadNum; +} SQueueInfo; + +typedef void (*FItem)(SQueueInfo *pInfo, void *pItem); +typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems); STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); diff --git a/include/util/tworker.h b/include/util/tworker.h index 91f4fbf7ff..f8ff45269b 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -50,7 +50,8 @@ typedef struct SWWorker { } SWWorker; typedef struct SWWorkerPool { - int32_t max; // max number of workers + int32_t max; // max number of workers + int32_t num; int32_t nextId; // from 0 to max-1, cyclic const char *name; SWWorker *workers; diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c index 79b2a669ae..42490d66cf 100644 --- a/source/dnode/mgmt/bnode/src/bmWorker.c +++ b/source/dnode/mgmt/bnode/src/bmWorker.c @@ -33,7 +33,8 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num } } -static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) { +static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SBnodeMgmt *pMgmt = pInfo->ahandle; SMgmtWrapper *pWrapper = pMgmt->pWrapper; SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 5bacd5f10b..79ed6d50c8 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -52,7 +52,9 @@ static void *dmThreadRoutine(void *param) { } } -static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SDnodeMgmt *pMgmt = pInfo->ahandle; + SDnode *pDnode = pMgmt->pDnode; SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 470128940b..9f64bec7c5 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -16,7 +16,9 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in mnode queue", pMsg); SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index 19686e2a76..5f71837269 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -21,7 +21,9 @@ static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { dndSendRsp(pWrapper, &rsp); } -static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in qnode-query queue", pMsg); int32_t code = qndProcessQueryMsg(pMgmt->pQnode, &pMsg->rpcMsg); if (code != 0) { @@ -33,7 +35,9 @@ static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in qnode-fetch queue", pMsg); int32_t code = qndProcessFetchMsg(pMgmt->pQnode, &pMsg->rpcMsg); if (code != 0) { diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index 99c844f8b5..c008d8175f 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -16,7 +16,9 @@ #define _DEFAULT_SOURCE #include "smInt.h" -static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) { +static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SSnodeMgmt *pMgmt = pInfo->ahandle; + for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); @@ -30,7 +32,9 @@ static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t num } } -static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SSnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in snode shared queue", pMsg); sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg); @@ -53,7 +57,7 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { return -1; } - SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = (FItems)smProcessUniqueQueue, .param = pMgmt}; + SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; if (tWWorkerAllInit(pUniqueWorker, &cfg) != 0) { dError("failed to start snode-unique worker since %s", terrstr()); diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index dd398cb202..73aff7f66d 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -21,7 +21,9 @@ static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { dndSendRsp(pWrapper, &rsp); } -static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { +static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SVnodesMgmt *pMgmt = pInfo->ahandle; + int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType; dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg); @@ -57,7 +59,9 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { +static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SVnodeObj *pVnode = pInfo->ahandle; + dTrace("msg:%p, will be processed in vnode-query queue", pMsg); int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); if (code != 0) { @@ -68,7 +72,9 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { } } -static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { +static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SVnodeObj *pVnode = pInfo->ahandle; + dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); if (code != 0) { @@ -79,7 +85,9 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { } } -static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { +static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); if (pArray == NULL) { dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); @@ -126,8 +134,9 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO taosArrayDestroy(pArray); } -static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { - SNodeMsg *pMsg = NULL; +static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SNodeMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); @@ -138,8 +147,9 @@ static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO } } -static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { - SNodeMsg *pMsg = NULL; +static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SNodeMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index e05c4e0a78..44fc2f9f93 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -86,7 +86,8 @@ static void *tQWorkerThreadFp(SQWorker *worker) { } if (fp != NULL) { - (*fp)(ahandle, msg); + SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; + (*fp)(&info, msg); } } @@ -210,6 +211,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) { } if (fp != NULL) { + SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; (*fp)(ahandle, worker->qall, numOfMsgs); } } @@ -264,6 +266,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { } taosThreadAttrDestroy(&thAttr); + pool->num++; + if (pool->num > pool->max) pool->num = pool->max; } else { taosAddIntoQset(worker->qset, queue, ahandle); pool->nextId = (pool->nextId + 1) % pool->max; From f48fddc7c43224d260752ecb46f34c533fbb877b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 16:31:21 +0800 Subject: [PATCH 4/6] refact queue --- include/util/tqueue.h | 1 - source/dnode/mgmt/dnode/src/dmWorker.c | 4 ++-- source/dnode/mgmt/qnode/src/qmWorker.c | 2 +- source/util/src/tworker.c | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index e84875bd7d..3bccc7404b 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -44,7 +44,6 @@ typedef struct STaosQset STaosQset; typedef struct STaosQall STaosQall; typedef struct { void *ahandle; - int32_t qsize; int32_t workerId; int32_t threadNum; } SQueueInfo; diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 79ed6d50c8..9929ebae4f 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -101,14 +101,14 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { int32_t dmStartWorker(SDnodeMgmt *pMgmt) { SQWorkerAllCfg mgmtCfg = { - .minNum = 0, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + .minNum = 1, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; if (tQWorkerAllInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } SQWorkerAllCfg statusCfg = { - .minNum = 0, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + .minNum = 1, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; if (tQWorkerAllInit(&pMgmt->statusWorker, &statusCfg) != 0) { dError("failed to start dnode status worker since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index 5f71837269..325eec7631 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -127,7 +127,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { .fp = (FItem)qmProcessFetchQueue, .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->queryWorker, &fetchCfg) != 0) { + if (tQWorkerAllInit(&pMgmt->fetchWorker, &fetchCfg) != 0) { dError("failed to start qnode-fetch worker since %s", terrstr()); return -1; } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 44fc2f9f93..f62a63132e 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -212,7 +212,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) { if (fp != NULL) { SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; - (*fp)(ahandle, worker->qall, numOfMsgs); + (*fp)(&info, worker->qall, numOfMsgs); } } From 2af212f42da5054de5401e6f520f1ff01883dc68 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 16:51:30 +0800 Subject: [PATCH 5/6] refact worker --- include/util/tworker.h | 16 ++++++------ source/dnode/mgmt/bnode/inc/bmInt.h | 2 +- source/dnode/mgmt/bnode/src/bmWorker.c | 8 +++--- source/dnode/mgmt/dnode/inc/dmInt.h | 4 +-- source/dnode/mgmt/dnode/src/dmWorker.c | 14 +++++------ source/dnode/mgmt/mnode/inc/mmInt.h | 6 ++--- source/dnode/mgmt/mnode/src/mmWorker.c | 18 +++++++------- source/dnode/mgmt/qnode/inc/qmInt.h | 4 +-- source/dnode/mgmt/qnode/src/qmWorker.c | 32 ++++++++++++------------ source/dnode/mgmt/snode/inc/smInt.h | 4 +-- source/dnode/mgmt/snode/src/smWorker.c | 34 +++++++++++++------------- source/dnode/mgmt/vnode/inc/vmInt.h | 2 +- source/dnode/mgmt/vnode/src/vmWorker.c | 24 ++++++------------ source/util/src/tworker.c | 8 +++--- 14 files changed, 84 insertions(+), 92 deletions(-) diff --git a/include/util/tworker.h b/include/util/tworker.h index f8ff45269b..92d474c885 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -74,31 +74,31 @@ typedef struct { int32_t maxNum; FItem fp; void *param; -} SQWorkerAllCfg; +} SSingleWorkerCfg; typedef struct { const char *name; STaosQueue *queue; SQWorkerPool pool; -} SQWorkerAll; +} SSingleWorker; typedef struct { const char *name; int32_t maxNum; FItems fp; void *param; -} SWWorkerAllCfg; +} SMultiWorkerCfg; typedef struct { const char *name; STaosQueue *queue; SWWorkerPool pool; -} SWWorkerAll; +} SMultiWorker; -int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg); -void tQWorkerAllCleanup(SQWorkerAll *pWorker); -int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg); -void tWWorkerAllCleanup(SWWorkerAll *pWorker); +int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg); +void tSingleWorkerCleanup(SSingleWorker *pWorker); +int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg); +void tMultiWorkerCleanup(SMultiWorker *pWorker); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index fc6b7a999d..8cfff0f1f3 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -28,7 +28,7 @@ typedef struct SBnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SWWorkerAll writeWorker; + SMultiWorker writeWorker; } SBnodeMgmt; // bmInt.c diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c index 42490d66cf..2099787c0d 100644 --- a/source/dnode/mgmt/bnode/src/bmWorker.c +++ b/source/dnode/mgmt/bnode/src/bmWorker.c @@ -64,15 +64,15 @@ static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs } int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SWWorkerAll *pWorker = &pMgmt->writeWorker; + SMultiWorker *pWorker = &pMgmt->writeWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); } int32_t bmStartWorker(SBnodeMgmt *pMgmt) { - SWWorkerAllCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; - if (tWWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) { + SMultiWorkerCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; + if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { dError("failed to start bnode write worker since %s", terrstr()); return -1; } @@ -80,4 +80,4 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) { return 0; } -void bmStopWorker(SBnodeMgmt *pMgmt) { tWWorkerAllCleanup(&pMgmt->writeWorker); } +void bmStopWorker(SBnodeMgmt *pMgmt) { tMultiWorkerCleanup(&pMgmt->writeWorker); } diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h index 0330b7f996..b02b1d2297 100644 --- a/source/dnode/mgmt/dnode/inc/dmInt.h +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -31,8 +31,8 @@ typedef struct SDnodeMgmt { SArray *dnodeEps; TdThread *threadId; SRWLatch latch; - SQWorkerAll mgmtWorker; - SQWorkerAll statusWorker; + SSingleWorker mgmtWorker; + SSingleWorker statusWorker; const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 9929ebae4f..b62c18655a 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -100,16 +100,16 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { - SQWorkerAllCfg mgmtCfg = { + SSingleWorkerCfg mgmtCfg = { .minNum = 1, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { + if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } - SQWorkerAllCfg statusCfg = { + SSingleWorkerCfg statusCfg = { .minNum = 1, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->statusWorker, &statusCfg) != 0) { + if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) { dError("failed to start dnode status worker since %s", terrstr()); return -1; } @@ -129,8 +129,8 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt) { } void dmStopWorker(SDnodeMgmt *pMgmt) { - tQWorkerAllCleanup(&pMgmt->mgmtWorker); - tQWorkerAllCleanup(&pMgmt->statusWorker); + tSingleWorkerCleanup(&pMgmt->mgmtWorker); + tSingleWorkerCleanup(&pMgmt->statusWorker); if (pMgmt->threadId != NULL) { taosDestoryThread(pMgmt->threadId); @@ -139,7 +139,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { } int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SQWorkerAll *pWorker = &pMgmt->mgmtWorker; + SSingleWorker *pWorker = &pMgmt->mgmtWorker; if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) { pWorker = &pMgmt->statusWorker; } diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index 06ed637791..d57088474f 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -28,9 +28,9 @@ typedef struct SMnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SQWorkerAll readWorker; - SQWorkerAll writeWorker; - SQWorkerAll syncWorker; + SSingleWorker readWorker; + SSingleWorker writeWorker; + SSingleWorker syncWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; int8_t selfIndex; diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 9f64bec7c5..b9a3a4f14c 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -44,7 +44,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SNodeMsg *pMsg) { +static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); } @@ -61,7 +61,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } -static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) { +static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; @@ -91,19 +91,19 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { - SQWorkerAllCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->readWorker, &cfg) != 0) { + if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) { dError("failed to start mnode-read worker since %s", terrstr()); return -1; } - if (tQWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) { + if (tSingleWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { dError("failed to start mnode-write worker since %s", terrstr()); return -1; } - if (tQWorkerAllInit(&pMgmt->syncWorker, &cfg) != 0) { + if (tSingleWorkerInit(&pMgmt->syncWorker, &cfg) != 0) { dError("failed to start mnode sync-worker since %s", terrstr()); return -1; } @@ -112,7 +112,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { } void mmStopWorker(SMnodeMgmt *pMgmt) { - tQWorkerAllCleanup(&pMgmt->readWorker); - tQWorkerAllCleanup(&pMgmt->writeWorker); - tQWorkerAllCleanup(&pMgmt->syncWorker); + tSingleWorkerCleanup(&pMgmt->readWorker); + tSingleWorkerCleanup(&pMgmt->writeWorker); + tSingleWorkerCleanup(&pMgmt->syncWorker); } diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index 3f1f62ebd5..52d23a445c 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -28,8 +28,8 @@ typedef struct SQnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SQWorkerAll queryWorker; - SQWorkerAll fetchWorker; + SSingleWorker queryWorker; + SSingleWorker fetchWorker; } SQnodeMgmt; // qmInt.c diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index 325eec7631..fff469a902 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -49,7 +49,7 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t qmPutMsgToWorker(SQWorkerAll *pWorker, SNodeMsg *pMsg) { +static int32_t qmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); } @@ -58,7 +58,7 @@ int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgTo int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); } -static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) { +static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; @@ -110,24 +110,24 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1); int32_t maxQueryThreads = minQueryThreads; - SQWorkerAllCfg queryCfg = {.minNum = minQueryThreads, - .maxNum = maxQueryThreads, - .name = "qnode-query", - .fp = (FItem)qmProcessQueryQueue, - .param = pMgmt}; + SSingleWorkerCfg queryCfg = {.minNum = minQueryThreads, + .maxNum = maxQueryThreads, + .name = "qnode-query", + .fp = (FItem)qmProcessQueryQueue, + .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->queryWorker, &queryCfg) != 0) { + if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) { dError("failed to start qnode-query worker since %s", terrstr()); return -1; } - SQWorkerAllCfg fetchCfg = {.minNum = minFetchThreads, - .maxNum = maxFetchThreads, - .name = "qnode-fetch", - .fp = (FItem)qmProcessFetchQueue, - .param = pMgmt}; + SSingleWorkerCfg fetchCfg = {.minNum = minFetchThreads, + .maxNum = maxFetchThreads, + .name = "qnode-fetch", + .fp = (FItem)qmProcessFetchQueue, + .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->fetchWorker, &fetchCfg) != 0) { + if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) { dError("failed to start qnode-fetch worker since %s", terrstr()); return -1; } @@ -136,6 +136,6 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { } void qmStopWorker(SQnodeMgmt *pMgmt) { - tQWorkerAllCleanup(&pMgmt->queryWorker); - tQWorkerAllCleanup(&pMgmt->fetchWorker); + tSingleWorkerCleanup(&pMgmt->queryWorker); + tSingleWorkerCleanup(&pMgmt->fetchWorker); } diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index 744089efae..f2b510483c 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -30,8 +30,8 @@ typedef struct SSnodeMgmt { const char *path; SRWLatch latch; int8_t uniqueWorkerInUse; - SArray *uniqueWorkers; // SArray - SQWorkerAll sharedWorker; + SArray *uniqueWorkers; // SArray + SSingleWorker sharedWorker; } SSnodeMgmt; // smInt.c diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index c008d8175f..ceec6b82c3 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -44,22 +44,22 @@ static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { } int32_t smStartWorker(SSnodeMgmt *pMgmt) { - pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SWWorkerAll *)); + pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SMultiWorker *)); if (pMgmt->uniqueWorkers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) { - SWWorkerAll *pUniqueWorker = malloc(sizeof(SWWorkerAll)); + SMultiWorker *pUniqueWorker = malloc(sizeof(SMultiWorker)); if (pUniqueWorker == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - SWWorkerAllCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; + SMultiWorkerCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; - if (tWWorkerAllInit(pUniqueWorker, &cfg) != 0) { + if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) { dError("failed to start snode-unique worker since %s", terrstr()); return -1; } @@ -69,13 +69,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { } } - SQWorkerAllCfg cfg = {.minNum = SND_SHARED_THREAD_NUM, - .maxNum = SND_SHARED_THREAD_NUM, - .name = "snode-shared", - .fp = (FItem)smProcessSharedQueue, - .param = pMgmt}; + SSingleWorkerCfg cfg = {.minNum = SND_SHARED_THREAD_NUM, + .maxNum = SND_SHARED_THREAD_NUM, + .name = "snode-shared", + .fp = (FItem)smProcessSharedQueue, + .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->sharedWorker, &cfg)) { + if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) { dError("failed to start snode shared-worker since %s", terrstr()); return -1; } @@ -85,11 +85,11 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { void smStopWorker(SSnodeMgmt *pMgmt) { for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { - SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); - tWWorkerAllCleanup(pWorker); + SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); + tMultiWorkerCleanup(pWorker); } taosArrayDestroy(pMgmt->uniqueWorkers); - tQWorkerAllCleanup(&pMgmt->sharedWorker); + tSingleWorkerCleanup(&pMgmt->sharedWorker); } static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { @@ -105,7 +105,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { } int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); + SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -116,8 +116,8 @@ int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); - SWWorkerAll *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); + int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); + SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -128,7 +128,7 @@ int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SQWorkerAll *pWorker = &pMgmt->sharedWorker; + SSingleWorker *pWorker = &pMgmt->sharedWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index ce1a004b65..ccdb1ae257 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -36,7 +36,7 @@ typedef struct SVnodesMgmt { const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; - SQWorkerAll mgmtWorker; + SSingleWorker mgmtWorker; } SVnodesMgmt; typedef struct { diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 73aff7f66d..6c7d513c58 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -200,24 +200,16 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp return code; } -int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); -} +int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } -int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); -} +int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); } -int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); -} +int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); } -int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); -} +int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); } int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SQWorkerAll *pWorker = &pMgmt->mgmtWorker; + SSingleWorker *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); } @@ -357,9 +349,9 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pWPool->max = maxSyncThreads; if (tWWorkerInit(pWPool) != 0) return -1; - SQWorkerAllCfg cfg = { + SSingleWorkerCfg cfg = { .minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->mgmtWorker, &cfg) != 0) { + if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { dError("failed to start vnode-mgmt worker since %s", terrstr()); return -1; } @@ -369,7 +361,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { } void vmStopWorker(SVnodesMgmt *pMgmt) { - tQWorkerAllCleanup(&pMgmt->mgmtWorker); + tSingleWorkerCleanup(&pMgmt->mgmtWorker); tQWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index f62a63132e..044896d7a5 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -284,7 +284,7 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { uDebug("worker:%s, queue:%p is freed", pool->name, queue); } -int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) { +int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) { SQWorkerPool *pPool = &pWorker->pool; pPool->name = pCfg->name; pPool->min = pCfg->minNum; @@ -302,7 +302,7 @@ int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) { return 0; } -void tQWorkerAllCleanup(SQWorkerAll *pWorker) { +void tSingleWorkerCleanup(SSingleWorker *pWorker) { if (pWorker->queue == NULL) return; while (!taosQueueEmpty(pWorker->queue)) { @@ -313,7 +313,7 @@ void tQWorkerAllCleanup(SQWorkerAll *pWorker) { tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); } -int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) { +int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) { SWWorkerPool *pPool = &pWorker->pool; pPool->name = pCfg->name; pPool->max = pCfg->maxNum; @@ -330,7 +330,7 @@ int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) { return 0; } -void tWWorkerAllCleanup(SWWorkerAll *pWorker) { +void tMultiWorkerCleanup(SMultiWorker *pWorker) { if (pWorker->queue == NULL) return; while (!taosQueueEmpty(pWorker->queue)) { From 20017d82b3300bbbd4fffb859ff66df3314d7492 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 22 Mar 2022 18:06:10 +0800 Subject: [PATCH 6/6] minor changes --- tests/script/jenkins/basic.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index cc0576e1f3..74d174572e 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -17,7 +17,7 @@ ./test.sh -f tsim/dnode/basic1.sim # ---- insert -./test.sh -f tsim/insert/basic0.sim +# ./test.sh -f tsim/insert/basic0.sim # ---- query ./test.sh -f tsim/query/interval.sim