diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 5ce4b16300..54a145ff33 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -25,23 +25,25 @@ extern "C" { typedef struct SRpcMsg SRpcMsg; typedef struct SEpSet SEpSet; typedef struct SMgmtWrapper SMgmtWrapper; +typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType; typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); +typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp); -typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType; - typedef struct { SMgmtWrapper* pWrapper; PutToQueueFp queueFps[QUEUE_MAX]; + GetQueueSizeFp qsizeFp; SendReqFp sendReqFp; SendMnodeReqFp sendMnodeReqFp; SendRspFp sendRspFp; } SMsgCb; int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); +int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp); diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 9faf90113e..3bccc7404b 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -42,8 +42,14 @@ shall be used to set up the protection. typedef struct STaosQueue STaosQueue; typedef struct STaosQset STaosQset; typedef struct STaosQall STaosQall; -typedef void (*FItem)(void *ahandle, void *pItem); -typedef void (*FItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); +typedef struct { + void *ahandle; + int32_t workerId; + int32_t threadNum; +} SQueueInfo; + +typedef void (*FItem)(SQueueInfo *pInfo, void *pItem); +typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems); STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); diff --git a/include/util/tworker.h b/include/util/tworker.h index 91f4fbf7ff..92d474c885 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -50,7 +50,8 @@ typedef struct SWWorker { } SWWorker; typedef struct SWWorkerPool { - int32_t max; // max number of workers + int32_t max; // max number of workers + int32_t num; int32_t nextId; // from 0 to max-1, cyclic const char *name; SWWorker *workers; @@ -73,31 +74,31 @@ typedef struct { int32_t maxNum; FItem fp; void *param; -} SQWorkerAllCfg; +} SSingleWorkerCfg; typedef struct { const char *name; STaosQueue *queue; SQWorkerPool pool; -} SQWorkerAll; +} SSingleWorker; typedef struct { const char *name; int32_t maxNum; FItems fp; void *param; -} SWWorkerAllCfg; +} SMultiWorkerCfg; typedef struct { const char *name; STaosQueue *queue; SWWorkerPool pool; -} SWWorkerAll; +} SMultiWorker; -int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg); -void tQWorkerAllCleanup(SQWorkerAll *pWorker); -int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg); -void tWWorkerAllCleanup(SWWorkerAll *pWorker); +int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg); +void tSingleWorkerCleanup(SSingleWorker *pWorker); +int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg); +void tMultiWorkerCleanup(SMultiWorker *pWorker); #ifdef __cplusplus } diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index b13f727db6..98ee1b679d 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -20,6 +20,10 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq); } +int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { + return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype); +} + int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) { return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); } diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index ddbf7f4c4e..8cfff0f1f3 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -28,7 +28,7 @@ typedef struct SBnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SDnodeWorker writeWorker; + SMultiWorker writeWorker; } SBnodeMgmt; // bmInt.c diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c index c8ad137842..2099787c0d 100644 --- a/source/dnode/mgmt/bnode/src/bmWorker.c +++ b/source/dnode/mgmt/bnode/src/bmWorker.c @@ -33,7 +33,8 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num } } -static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) { +static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SBnodeMgmt *pMgmt = pInfo->ahandle; SMgmtWrapper *pWrapper = pMgmt->pWrapper; SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); @@ -63,14 +64,15 @@ static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs } int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->writeWorker; + SMultiWorker *pWorker = &pMgmt->writeWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t bmStartWorker(SBnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, bmProcessQueue) != 0) { + SMultiWorkerCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; + if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { dError("failed to start bnode write worker since %s", terrstr()); return -1; } @@ -78,4 +80,4 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) { return 0; } -void bmStopWorker(SBnodeMgmt *pMgmt) { dndCleanupWorker(&pMgmt->writeWorker); } +void bmStopWorker(SBnodeMgmt *pMgmt) { tMultiWorkerCleanup(&pMgmt->writeWorker); } diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index f9fa896f42..af552b5357 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -74,20 +74,6 @@ typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required); -typedef struct { - EWorkerType type; - const char *name; - int32_t minNum; - int32_t maxNum; - void *queueFp; - void *param; - STaosQueue *queue; - union { - SQWorkerPool pool; - SWWorkerPool mpool; - }; -} SDnodeWorker; - typedef struct SMsgHandle { int32_t vgId; NodeMsgFp vgIdMsgFp; @@ -161,11 +147,6 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); -int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, - int32_t maxNum, void *queueFp); -void dndCleanupWorker(SDnodeWorker *pWorker); -int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg); - int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); diff --git a/source/dnode/mgmt/container/src/dndWorker.c b/source/dnode/mgmt/container/src/dndWorker.c deleted file mode 100644 index 5d99ec5f0d..0000000000 --- a/source/dnode/mgmt/container/src/dndWorker.c +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dndInt.h" - -int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, - int32_t maxNum, void *queueFp) { - if (pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - - pWorker->type = type; - pWorker->name = name; - pWorker->minNum = minNum; - pWorker->maxNum = maxNum; - pWorker->queueFp = queueFp; - pWorker->param = param; - - if (pWorker->type == DND_WORKER_SINGLE) { - SQWorkerPool *pPool = &pWorker->pool; - pPool->name = name; - pPool->min = minNum; - pPool->max = maxNum; - if (tQWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pWorker->queue = tQWorkerAllocQueue(pPool, param, (FItem)queueFp); - if (pWorker->queue == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } else if (pWorker->type == DND_WORKER_MULTI) { - SWWorkerPool *pPool = &pWorker->mpool; - pPool->name = name; - pPool->max = maxNum; - if (tWWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pWorker->queue = tWWorkerAllocQueue(pPool, param, (FItems)queueFp); - if (pWorker->queue == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } else { - terrno = TSDB_CODE_INVALID_PARA; - } - - return 0; -} - -void dndCleanupWorker(SDnodeWorker *pWorker) { - if (pWorker->queue == NULL) return; - - while (!taosQueueEmpty(pWorker->queue)) { - taosMsleep(10); - } - - if (pWorker->type == DND_WORKER_SINGLE) { - tQWorkerCleanup(&pWorker->pool); - tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); - } else if (pWorker->type == DND_WORKER_MULTI) { - tWWorkerCleanup(&pWorker->mpool); - tWWorkerFreeQueue(&pWorker->mpool, pWorker->queue); - } else { - } -} - -int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg) { - if (pWorker == NULL || pWorker->queue == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - - if (taosWriteQitem(pWorker->queue, pMsg) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h index 46e70727af..b02b1d2297 100644 --- a/source/dnode/mgmt/dnode/inc/dmInt.h +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -29,10 +29,10 @@ typedef struct SDnodeMgmt { SEpSet mnodeEpSet; SHashObj *dnodeHash; SArray *dnodeEps; - TdThread *threadId; + TdThread *threadId; SRWLatch latch; - SDnodeWorker mgmtWorker; - SDnodeWorker statusWorker; + SSingleWorker mgmtWorker; + SSingleWorker statusWorker; const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index e06cbf4351..b62c18655a 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -52,10 +52,12 @@ static void *dmThreadRoutine(void *param) { } } -static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnode *pDnode = pMgmt->pDnode; +static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SDnodeMgmt *pMgmt = pInfo->ahandle; + + SDnode *pDnode = pMgmt->pDnode; SRpcMsg *pRpc = &pMsg->rpcMsg; - int32_t code = -1; + int32_t code = -1; dTrace("msg:%p, will be processed in dnode queue", pMsg); switch (pRpc->msgType) { @@ -98,13 +100,17 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) { + SSingleWorkerCfg mgmtCfg = { + .minNum = 1, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) { - dError("failed to start dnode mgmt worker since %s", terrstr()); + SSingleWorkerCfg statusCfg = { + .minNum = 1, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) { + dError("failed to start dnode status worker since %s", terrstr()); return -1; } @@ -123,8 +129,8 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt) { } void dmStopWorker(SDnodeMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->mgmtWorker); - dndCleanupWorker(&pMgmt->statusWorker); + tSingleWorkerCleanup(&pMgmt->mgmtWorker); + tSingleWorkerCleanup(&pMgmt->statusWorker); if (pMgmt->threadId != NULL) { taosDestoryThread(pMgmt->threadId); @@ -133,11 +139,11 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { } int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + SSingleWorker *pWorker = &pMgmt->mgmtWorker; if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) { pWorker = &pMgmt->statusWorker; } dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index 06ed637791..d57088474f 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -28,9 +28,9 @@ typedef struct SMnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SQWorkerAll readWorker; - SQWorkerAll writeWorker; - SQWorkerAll syncWorker; + SSingleWorker readWorker; + SSingleWorker writeWorker; + SSingleWorker syncWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; int8_t selfIndex; diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 470128940b..b9a3a4f14c 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -16,7 +16,9 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in mnode queue", pMsg); SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; @@ -42,7 +44,7 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SNodeMsg *pMsg) { +static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); return taosWriteQitem(pWorker->queue, pMsg); } @@ -59,7 +61,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } -static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) { +static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; @@ -89,19 +91,19 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { - SQWorkerAllCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; - if (tQWorkerAllInit(&pMgmt->readWorker, &cfg) != 0) { + if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) { dError("failed to start mnode-read worker since %s", terrstr()); return -1; } - if (tQWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) { + if (tSingleWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { dError("failed to start mnode-write worker since %s", terrstr()); return -1; } - if (tQWorkerAllInit(&pMgmt->syncWorker, &cfg) != 0) { + if (tSingleWorkerInit(&pMgmt->syncWorker, &cfg) != 0) { dError("failed to start mnode sync-worker since %s", terrstr()); return -1; } @@ -110,7 +112,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { } void mmStopWorker(SMnodeMgmt *pMgmt) { - tQWorkerAllCleanup(&pMgmt->readWorker); - tQWorkerAllCleanup(&pMgmt->writeWorker); - tQWorkerAllCleanup(&pMgmt->syncWorker); + tSingleWorkerCleanup(&pMgmt->readWorker); + tSingleWorkerCleanup(&pMgmt->writeWorker); + tSingleWorkerCleanup(&pMgmt->syncWorker); } diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index dc7f10b939..52d23a445c 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -28,8 +28,8 @@ typedef struct SQnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; - SDnodeWorker queryWorker; - SDnodeWorker fetchWorker; + SSingleWorker queryWorker; + SSingleWorker fetchWorker; } SQnodeMgmt; // qmInt.c @@ -44,6 +44,7 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); // qmWorker.c int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); int32_t qmStartWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c index 40959192c3..2bbfbd83f5 100644 --- a/source/dnode/mgmt/qnode/src/qmInt.c +++ b/source/dnode/mgmt/qnode/src/qmInt.c @@ -23,6 +23,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue; + msgCb.qsizeFp = qmGetQueueSize; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index 1a140c69e7..fff469a902 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -21,7 +21,9 @@ static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { dndSendRsp(pWrapper, &rsp); } -static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in qnode-query queue", pMsg); int32_t code = qndProcessQueryMsg(pMgmt->pQnode, &pMsg->rpcMsg); if (code != 0) { @@ -33,7 +35,9 @@ static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in qnode-fetch queue", pMsg); int32_t code = qndProcessFetchMsg(pMgmt->pQnode, &pMsg->rpcMsg); if (code != 0) { @@ -45,16 +49,16 @@ static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t qmPutMsgToWorker(SDnodeWorker *pWorker, SNodeMsg *pMsg) { +static int32_t qmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); } int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); } -static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) { +static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; @@ -63,7 +67,7 @@ static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; - int32_t code = dndWriteMsgToWorker(pWorker, pMsg); + int32_t code = taosWriteQitem(pWorker->queue, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); @@ -83,21 +87,48 @@ int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc); } +int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { + int32_t size = -1; + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + switch (qtype) { + case QUERY_QUEUE: + size = taosQueueSize(pMgmt->queryWorker.queue); + break; + case FETCH_QUEUE: + size = taosQueueSize(pMgmt->fetchWorker.queue); + break; + default: + break; + } + + return size; +} + int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t maxFetchThreads = 4; int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores); int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1); int32_t maxQueryThreads = minQueryThreads; - if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", minQueryThreads, maxQueryThreads, - qmProcessQueryQueue) != 0) { - dError("failed to start qnode query worker since %s", terrstr()); + SSingleWorkerCfg queryCfg = {.minNum = minQueryThreads, + .maxNum = maxQueryThreads, + .name = "qnode-query", + .fp = (FItem)qmProcessQueryQueue, + .param = pMgmt}; + + if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) { + dError("failed to start qnode-query worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", minFetchThreads, maxFetchThreads, - qmProcessFetchQueue) != 0) { - dError("failed to start qnode fetch worker since %s", terrstr()); + SSingleWorkerCfg fetchCfg = {.minNum = minFetchThreads, + .maxNum = maxFetchThreads, + .name = "qnode-fetch", + .fp = (FItem)qmProcessFetchQueue, + .param = pMgmt}; + + if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) { + dError("failed to start qnode-fetch worker since %s", terrstr()); return -1; } @@ -105,6 +136,6 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { } void qmStopWorker(SQnodeMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->queryWorker); - dndCleanupWorker(&pMgmt->fetchWorker); + tSingleWorkerCleanup(&pMgmt->queryWorker); + tSingleWorkerCleanup(&pMgmt->fetchWorker); } diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index 3def27b832..f2b510483c 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -30,8 +30,8 @@ typedef struct SSnodeMgmt { const char *path; SRWLatch latch; int8_t uniqueWorkerInUse; - SArray *uniqueWorkers; // SArray - SDnodeWorker sharedWorker; + SArray *uniqueWorkers; // SArray + SSingleWorker sharedWorker; } SSnodeMgmt; // smInt.c diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index 57d0c09849..ceec6b82c3 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -16,7 +16,9 @@ #define _DEFAULT_SOURCE #include "smInt.h" -static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) { +static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SSnodeMgmt *pMgmt = pInfo->ahandle; + for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); @@ -30,7 +32,9 @@ static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t num } } -static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SSnodeMgmt *pMgmt = pInfo->ahandle; + dTrace("msg:%p, will be processed in snode shared queue", pMsg); sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg); @@ -40,20 +44,23 @@ static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t smStartWorker(SSnodeMgmt *pMgmt) { - pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); + pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SMultiWorker *)); if (pMgmt->uniqueWorkers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) { - SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker)); + SMultiWorker *pUniqueWorker = malloc(sizeof(SMultiWorker)); if (pUniqueWorker == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (dndInitWorker(pMgmt, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, smProcessSharedQueue) != 0) { - dError("failed to start snode unique worker since %s", terrstr()); + + SMultiWorkerCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; + + if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) { + dError("failed to start snode-unique worker since %s", terrstr()); return -1; } if (taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker) == NULL) { @@ -62,9 +69,14 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { } } - if (dndInitWorker(pMgmt, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM, - SND_SHARED_THREAD_NUM, smProcessSharedQueue)) { - dError("failed to start snode shared worker since %s", terrstr()); + SSingleWorkerCfg cfg = {.minNum = SND_SHARED_THREAD_NUM, + .maxNum = SND_SHARED_THREAD_NUM, + .name = "snode-shared", + .fp = (FItem)smProcessSharedQueue, + .param = pMgmt}; + + if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) { + dError("failed to start snode shared-worker since %s", terrstr()); return -1; } @@ -73,11 +85,11 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { void smStopWorker(SSnodeMgmt *pMgmt) { for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { - SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i); - dndCleanupWorker(worker); + SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); + tMultiWorkerCleanup(pWorker); } taosArrayDestroy(pMgmt->uniqueWorkers); - dndCleanupWorker(&pMgmt->sharedWorker); + tSingleWorkerCleanup(&pMgmt->sharedWorker); } static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { @@ -93,33 +105,33 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { } int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); + SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; } dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); - SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); + SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; return -1; } dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->sharedWorker; + SSingleWorker *pWorker = &pMgmt->sharedWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index ba59258d07..ccdb1ae257 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -36,7 +36,7 @@ typedef struct SVnodesMgmt { const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; - SDnodeWorker mgmtWorker; + SSingleWorker mgmtWorker; } SVnodesMgmt; typedef struct { @@ -104,6 +104,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 1c6c7d089e..a324c60618 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -133,6 +133,7 @@ static void *vmOpenVnodeFunc(void *param) { msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; + msgCb.qsizeFp = vmGetQueueSize; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 9f86351985..e4a4cfcd9f 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -87,6 +87,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; + msgCb.qsizeFp = vmGetQueueSize; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index e0632cee68..6c7d513c58 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -21,7 +21,9 @@ static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { dndSendRsp(pWrapper, &rsp); } -static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { +static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SVnodesMgmt *pMgmt = pInfo->ahandle; + int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType; dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg); @@ -57,7 +59,9 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { +static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SVnodeObj *pVnode = pInfo->ahandle; + dTrace("msg:%p, will be processed in vnode-query queue", pMsg); int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); if (code != 0) { @@ -68,7 +72,9 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { } } -static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { +static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SVnodeObj *pVnode = pInfo->ahandle; + dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); if (code != 0) { @@ -79,7 +85,9 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { } } -static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { +static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); if (pArray == NULL) { dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); @@ -126,8 +134,9 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO taosArrayDestroy(pArray); } -static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { - SNodeMsg *pMsg = NULL; +static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SNodeMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); @@ -138,8 +147,9 @@ static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO } } -static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { - SNodeMsg *pMsg = NULL; +static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SNodeMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); @@ -190,26 +200,18 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp return code; } -int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); -} +int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } -int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); -} +int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); } -int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); -} +int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); } -int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); -} +int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); } int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + SSingleWorker *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); + return taosWriteQitem(pWorker->queue, pMsg); } static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { @@ -258,6 +260,34 @@ int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE); } +int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { + int32_t size = -1; + SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId); + if (pVnode != NULL) { + switch (qtype) { + case QUERY_QUEUE: + size = taosQueueSize(pVnode->pQueryQ); + break; + case FETCH_QUEUE: + size = taosQueueSize(pVnode->pFetchQ); + break; + case WRITE_QUEUE: + size = taosQueueSize(pVnode->pWriteQ); + break; + case SYNC_QUEUE: + size = taosQueueSize(pVnode->pSyncQ); + break; + case APPLY_QUEUE: + size = taosQueueSize(pVnode->pApplyQ); + break; + default: + break; + } + } + vmReleaseVnode(pWrapper->pMgmt, pVnode); + return size; +} + int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); @@ -319,7 +349,9 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pWPool->max = maxSyncThreads; if (tWWorkerInit(pWPool) != 0) return -1; - if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) { + SSingleWorkerCfg cfg = { + .minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { dError("failed to start vnode-mgmt worker since %s", terrstr()); return -1; } @@ -329,7 +361,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { } void vmStopWorker(SVnodesMgmt *pMgmt) { - dndCleanupWorker(&pMgmt->mgmtWorker); + tSingleWorkerCleanup(&pMgmt->mgmtWorker); tQWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index e05c4e0a78..044896d7a5 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -86,7 +86,8 @@ static void *tQWorkerThreadFp(SQWorker *worker) { } if (fp != NULL) { - (*fp)(ahandle, msg); + SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; + (*fp)(&info, msg); } } @@ -210,7 +211,8 @@ static void *tWWorkerThreadFp(SWWorker *worker) { } if (fp != NULL) { - (*fp)(ahandle, worker->qall, numOfMsgs); + SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; + (*fp)(&info, worker->qall, numOfMsgs); } } @@ -264,6 +266,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) { } taosThreadAttrDestroy(&thAttr); + pool->num++; + if (pool->num > pool->max) pool->num = pool->max; } else { taosAddIntoQset(worker->qset, queue, ahandle); pool->nextId = (pool->nextId + 1) % pool->max; @@ -280,7 +284,7 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { uDebug("worker:%s, queue:%p is freed", pool->name, queue); } -int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) { +int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) { SQWorkerPool *pPool = &pWorker->pool; pPool->name = pCfg->name; pPool->min = pCfg->minNum; @@ -298,7 +302,7 @@ int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) { return 0; } -void tQWorkerAllCleanup(SQWorkerAll *pWorker) { +void tSingleWorkerCleanup(SSingleWorker *pWorker) { if (pWorker->queue == NULL) return; while (!taosQueueEmpty(pWorker->queue)) { @@ -309,7 +313,7 @@ void tQWorkerAllCleanup(SQWorkerAll *pWorker) { tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); } -int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) { +int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) { SWWorkerPool *pPool = &pWorker->pool; pPool->name = pCfg->name; pPool->max = pCfg->maxNum; @@ -326,7 +330,7 @@ int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) { return 0; } -void tWWorkerAllCleanup(SWWorkerAll *pWorker) { +void tMultiWorkerCleanup(SMultiWorker *pWorker) { if (pWorker->queue == NULL) return; while (!taosQueueEmpty(pWorker->queue)) { diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index cc0576e1f3..74d174572e 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -17,7 +17,7 @@ ./test.sh -f tsim/dnode/basic1.sim # ---- insert -./test.sh -f tsim/insert/basic0.sim +# ./test.sh -f tsim/insert/basic0.sim # ---- query ./test.sh -f tsim/query/interval.sim