From eb561f319dbb05bd8a14156b062beb167fa3b5e5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Mar 2022 19:52:36 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/container/inc/dndInt.h | 4 +- source/dnode/mgmt/container/inc/dndWorker.h | 2 +- source/dnode/mgmt/container/src/dndWorker.c | 10 ++--- source/dnode/mgmt/dnode/src/dmWorker.c | 10 ++--- source/dnode/mgmt/mnode/inc/mmInt.h | 29 ++++++------- source/dnode/mgmt/mnode/src/mmInt.c | 3 +- source/dnode/mgmt/mnode/src/mmWorker.c | 48 +++++---------------- 7 files changed, 39 insertions(+), 67 deletions(-) diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 788f615553..a2cd926159 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -83,7 +83,7 @@ typedef struct { int32_t minNum; int32_t maxNum; void *queueFp; - SDnode *pDnode; + void *param; STaosQueue *queue; union { SQWorkerPool pool; @@ -161,7 +161,7 @@ int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg); // dndWorker.h -int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, +int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, int32_t maxNum, void *queueFp); void dndCleanupWorker(SDnodeWorker *pWorker); int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen); diff --git a/source/dnode/mgmt/container/inc/dndWorker.h b/source/dnode/mgmt/container/inc/dndWorker.h index 22edb74c92..918bf67599 100644 --- a/source/dnode/mgmt/container/inc/dndWorker.h +++ b/source/dnode/mgmt/container/inc/dndWorker.h @@ -22,7 +22,7 @@ extern "C" { #endif -int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, +int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, int32_t maxNum, void *queueFp); void dndCleanupWorker(SDnodeWorker *pWorker); int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen); diff --git a/source/dnode/mgmt/container/src/dndWorker.c b/source/dnode/mgmt/container/src/dndWorker.c index 5ccf6640c0..ddef79e659 100644 --- a/source/dnode/mgmt/container/src/dndWorker.c +++ b/source/dnode/mgmt/container/src/dndWorker.c @@ -16,9 +16,9 @@ #define _DEFAULT_SOURCE #include "dndWorker.h" -int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, +int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, int32_t maxNum, void *queueFp) { - if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) { + if (pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) { terrno = TSDB_CODE_INVALID_PARA; return -1; } @@ -28,7 +28,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c pWorker->minNum = minNum; pWorker->maxNum = maxNum; pWorker->queueFp = queueFp; - pWorker->pDnode = pDnode; + pWorker->param = param; if (pWorker->type == DND_WORKER_SINGLE) { SQWorkerPool *pPool = &pWorker->pool; @@ -39,7 +39,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pWorker->queue = tQWorkerAllocQueue(pPool, pDnode, (FItem)queueFp); + pWorker->queue = tQWorkerAllocQueue(pPool, param, (FItem)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -52,7 +52,7 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pWorker->queue = tWWorkerAllocQueue(pPool, pDnode, (FItems)queueFp); + pWorker->queue = tWWorkerAllocQueue(pPool, param, (FItems)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 161b740ac9..e0daf5260a 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -54,7 +54,7 @@ static void *dmThreadRoutine(void *param) { } } -static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) { +static void dmProcessQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) { int32_t code = 0; SRpcMsg *pMsg = &pNodeMsg->rpcMsg; dTrace("msg:%p, will be processed", pNodeMsg); @@ -134,14 +134,14 @@ static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) { } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt->pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessMgmtQueue) != - 0) { + SDnode *pDnode = pMgmt->pDnode; + + if (dndInitWorker(pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt->pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, - dmProcessMgmtQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index ac1b79ee62..70269b79bb 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -23,21 +23,20 @@ extern "C" { #endif typedef struct SMnodeMgmt { - int32_t refCount; - int8_t deployed; - int8_t dropped; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; - SMnode *pMnode; - SDnode *pDnode; - SProcObj *pProcess; - bool singleProc; - SRWLatch latch; - const char *path; - SDnodeWorker readWorker; - SDnodeWorker writeWorker; - SDnodeWorker syncWorker; + int32_t refCount; + int8_t deployed; + int8_t dropped; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; + SMnode *pMnode; + SDnode *pDnode; + SMgmtWrapper *pWrapper; + const char *path; + SRWLatch latch; + SDnodeWorker readWorker; + SDnodeWorker writeWorker; + SDnodeWorker syncWorker; } SMnodeMgmt; // interface diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index b0acd9def1..6cd5f2c76e 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -220,6 +220,7 @@ static int32_t mmInit(SMgmtWrapper *pWrapper) { dInfo("mnode-mgmt is initialized"); pMgmt->path = pWrapper->path; pMgmt->pDnode = pWrapper->pDnode; + pMgmt->pWrapper = pWrapper; taosInitRWLatch(&pMgmt->latch); if (mmReadFile(pMgmt) != 0) { @@ -293,7 +294,7 @@ static bool mmRequire(SMgmtWrapper *pWrapper) { void mmGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = mmInit; - mgmtFp.closeFp = NULL; + mgmtFp.closeFp = mmCleanup; mgmtFp.requiredFp = mmRequire; mmInitMsgHandles(pWrapper); diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index de8809ab17..067ca23e2b 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -20,32 +20,8 @@ #include "dndTransport.h" #include "dndWorker.h" -static void mmSendRpcRsp(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { - if (pRpc->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRpc->code == TSDB_CODE_APP_NOT_READY) { - dmSendRedirectRsp(pMgmt->pDnode, pRpc); - } else { - rpcSendResponse(pRpc); - } -} - -void mmPutRpcRspToWorker(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { - int32_t code = -1; - - if (pMgmt->singleProc) { - mmSendRpcRsp(pMgmt, pRpc); - } else { - do { - code = taosProcPutToParentQueue(pMgmt->pProcess, pRpc, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen); - if (code != 0) { - taosMsleep(10); - } - } while (code != 0); - } -} - -static void mmConsumeMsgQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { +static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { dTrace("msg:%p, will be processed", pMsg); - SMnode *pMnode = mmAcquire(pMgmt); SRpcMsg *pRpc = &pMsg->rpcMsg; bool isReq = (pRpc->msgType & 1U); @@ -53,7 +29,7 @@ static void mmConsumeMsgQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { if (pMnode != NULL) { pMsg->pNode = pMnode; - code = mndProcessMsg((SMndMsg*)pMsg); + code = mndProcessMsg((SMndMsg *)pMsg); mmRelease(pMgmt, pMnode); } @@ -61,11 +37,11 @@ static void mmConsumeMsgQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { if (pMsg->rpcMsg.handle == NULL) return; if (code == 0) { SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp}; - mmPutRpcRspToWorker(pMgmt, &rsp); + dndSendRsp(pMgmt->pWrapper, &rsp); } else { if (terrno != TSDB_CODE_MND_ACTION_IN_PROGRESS) { SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp, .code = terrno}; - mmPutRpcRspToWorker(pMgmt, &rsp); + dndSendRsp(pMgmt->pWrapper, &rsp); } } } @@ -76,18 +52,17 @@ static void mmConsumeMsgQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { - SDnode *pDnode = pMgmt->pDnode; - if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeMsgQueue) != 0) { + if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) { dError("failed to start mnode read worker since %s", terrstr()); return -1; } - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmConsumeMsgQueue) != 0) { + if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmProcessQueue) != 0) { dError("failed to start mnode write worker since %s", terrstr()); return -1; } - if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmConsumeMsgQueue) != 0) { + if (dndInitWorker(pMgmt, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmProcessQueue) != 0) { dError("failed to start mnode sync worker since %s", terrstr()); return -1; } @@ -119,18 +94,15 @@ static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeM return code; } -int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg); } -int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); } -int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; +int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); }