diff --git a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h index 3a297368ce..7e8d01836e 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h +++ b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h @@ -24,32 +24,37 @@ extern "C" { // interface int32_t mmInit(SDnode *pDnode); void mmCleanup(SDnode *pDnode); - -// internal -void mmInitMsgFp(SMnodeMgmt *pMgmt); - -SMnode *mmAcquire(SDnode *pDnode); -void mmRelease(SDnode *pDnode, SMnode *pMnode); +int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t mmProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); // mmFile int32_t mmReadFile(SDnode *pDnode); int32_t mmWriteFile(SDnode *pDnode); -// mmMsg -void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); - -// mmQueue -int32_t mmWriteToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnodeMsg); -//////////// - +// mmHandle int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); -int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); - int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonGrantInfo *pGrantInfo); +// mmMgmt +SMnode *mmAcquire(SDnode *pDnode); +void mmRelease(SDnode *pDnode, SMnode *pMnode); +int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption); +int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption); +int32_t mmDrop(SDnode *pDnode); +int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate); + +// mmWorker +int32_t mmStartWorker(SDnode *pDnode); +void mmStopWorker(SDnode *pDnode); +void mmInitMsgFp(SMnodeMgmt *pMgmt); +void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); +void mmConsumeChildQueue(SDnode *pDnode, SBlockItem *pBlock); +void mmConsumeParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pBlock); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c new file mode 100644 index 0000000000..34fe71eef7 --- /dev/null +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "mm.h" + +#include "dndMgmt.h" + +int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SDCreateMnodeReq createReq = {0}; + if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + if (createReq.replica <= 1 || createReq.dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; + dError("failed to create mnode since %s", terrstr()); + return -1; + } + + SMnodeOpt option = {0}; + if (mmBuildOptionFromReq(pDnode, &option, &createReq) != 0) { + terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; + dError("failed to create mnode since %s", terrstr()); + return -1; + } + + SMnode *pMnode = mmAcquire(pDnode); + if (pMnode != NULL) { + mmRelease(pDnode, pMnode); + terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; + dError("failed to create mnode since %s", terrstr()); + return -1; + } + + dDebug("start to create mnode"); + return mmOpen(pDnode, &option); +} + +int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SDAlterMnodeReq alterReq = {0}; + if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + if (alterReq.dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; + dError("failed to alter mnode since %s", terrstr()); + return -1; + } + + SMnodeOpt option = {0}; + if (mmBuildOptionFromReq(pDnode, &option, &alterReq) != 0) { + terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; + dError("failed to alter mnode since %s", terrstr()); + return -1; + } + + SMnode *pMnode = mmAcquire(pDnode); + if (pMnode == NULL) { + terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; + dError("failed to alter mnode since %s", terrstr()); + return -1; + } + + dDebug("start to alter mnode"); + int32_t code = mmAlter(pDnode, &option); + mmRelease(pDnode, pMnode); + + return code; +} + +int32_t mmProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SDDropMnodeReq dropReq = {0}; + if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + if (dropReq.dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; + dError("failed to drop mnode since %s", terrstr()); + return -1; + } + + SMnode *pMnode = mmAcquire(pDnode); + if (pMnode == NULL) { + terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; + dError("failed to drop mnode since %s", terrstr()); + return -1; + } + + dDebug("start to drop mnode"); + int32_t code = mmDrop(pDnode); + mmRelease(pDnode, pMnode); + + return code; +} + +int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, + SMonGrantInfo *pGrantInfo) { + SMnode *pMnode = mmAcquire(pDnode); + if (pMnode == NULL) return -1; + + int32_t code = mndGetMonitorInfo(pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); + mmRelease(pDnode, pMnode); + return code; +} + +int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + + SMnode *pMnode = mmAcquire(pDnode); + if (pMnode == NULL) { + terrno = TSDB_CODE_APP_NOT_READY; + dTrace("failed to get user auth since %s", terrstr()); + return -1; + } + + int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey); + mmRelease(pDnode, pMnode); + + dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt); + return code; +} diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c index 9669d7d132..2ee12e66ca 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c @@ -18,11 +18,72 @@ #include "dndMgmt.h" #include "dndTransport.h" -#include "dndWorker.h" +static void mmInitOption(SDnode *pDnode, SMnodeOpt *pOption); +static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption); +static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption); +static bool mmDeployRequired(SDnode *pDnode); +static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption); +int32_t mmInit(SDnode *pDnode) { + dInfo("mnode mgmt start to init"); + int32_t code = -1; -static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg); + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + taosInitRWLatch(&pMgmt->latch); + mmInitMsgFp(pMgmt); + + if (mmReadFile(pDnode) != 0) { + goto _OVER; + } + + if (pMgmt->dropped) { + dInfo("mnode has been dropped and needs to be deleted"); + mndDestroy(pDnode->dir.mnode); + code = 0; + goto _OVER; + } + + if (!pMgmt->deployed) { + bool required = mmDeployRequired(pDnode); + if (!required) { + dInfo("mnode does not need to be deployed"); + code = 0; + goto _OVER; + } + + dInfo("mnode start to deploy"); + SMnodeOpt option = {0}; + mmBuildOptionForDeploy(pDnode, &option); + code = mmOpen(pDnode, &option); + } else { + dInfo("mnode start to open"); + SMnodeOpt option = {0}; + mmBuildOptionForOpen(pDnode, &option); + code = mmOpen(pDnode, &option); + } + +_OVER: + if (code == 0) { + dInfo("mnode mgmt init success"); + } else { + dError("failed to init mnode mgmt since %s", terrstr()); + mmCleanup(pDnode); + } + + return code; +} + +void mmCleanup(SDnode *pDnode) { + dInfo("mnode mgmt start to clean up"); + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + if (pMgmt->pMnode) { + mmStopWorker(pDnode); + mndClose(pMgmt->pMnode); + pMgmt->pMnode = NULL; + } + dInfo("mnode mgmt is cleaned up"); +} SMnode *mmAcquire(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -54,40 +115,84 @@ void mmRelease(SDnode *pDnode, SMnode *pMnode) { dTrace("release mnode, refCount:%d", refCount); } -static int32_t mmStartWorker(SDnode *pDnode) { +int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, dndProcessMnodeQueue) != 0) { - dError("failed to start mnode read worker since %s", terrstr()); + pMgmt->singleProc = true; + pMgmt->isChild = false; + + int32_t code = mmOpenImp(pDnode, pOption); + + if (code == 0 && !pMgmt->singleProc) { + SProcCfg cfg = {0}; + cfg.childFp = (ProcFp)mmConsumeChildQueue; + cfg.parentFp = (ProcFp)mmConsumeParentQueue; + cfg.childQueueSize = 1024 * 1024; + cfg.parentQueueSize = 1024 * 1024; + + pMgmt->pProcess = taosProcInit(&cfg); + if (pMgmt->pProcess == NULL) { + return -1; + } + + pMgmt->pProcess->pParent = pDnode; + pMgmt->pProcess->testFlag = true; + return taosProcStart(pMgmt->pProcess); + } + + return code; +} + +int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) { + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + + SMnode *pMnode = mmAcquire(pDnode); + if (pMnode == NULL) { + dError("failed to alter mnode since %s", terrstr()); return -1; } - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, dndProcessMnodeQueue) != 0) { - dError("failed to start mnode write worker since %s", terrstr()); - return -1; - } - - if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, dndProcessMnodeQueue) != 0) { - dError("failed to start mnode sync worker since %s", terrstr()); + if (mndAlter(pMnode, pOption) != 0) { + dError("failed to alter mnode since %s", terrstr()); + mmRelease(pDnode, pMnode); return -1; } + mmRelease(pDnode, pMnode); return 0; } -static void mmStopWorker(SDnode *pDnode) { +int32_t mmDrop(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - taosWLockLatch(&pMgmt->latch); - pMgmt->deployed = 0; - taosWUnLockLatch(&pMgmt->latch); - - while (pMgmt->refCount > 1) { - taosMsleep(10); + SMnode *pMnode = mmAcquire(pDnode); + if (pMnode == NULL) { + dError("failed to drop mnode since %s", terrstr()); + return -1; } - dndCleanupWorker(&pMgmt->readWorker); - dndCleanupWorker(&pMgmt->writeWorker); - dndCleanupWorker(&pMgmt->syncWorker); + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 1; + taosRUnLockLatch(&pMgmt->latch); + + if (mmWriteFile(pDnode) != 0) { + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 0; + taosRUnLockLatch(&pMgmt->latch); + + mmRelease(pDnode, pMnode); + dError("failed to drop mnode since %s", terrstr()); + return -1; + } + + mmRelease(pDnode, pMnode); + mmStopWorker(pDnode); + pMgmt->deployed = 0; + mmWriteFile(pDnode); + mndClose(pMnode); + pMgmt->pMnode = NULL; + mndDestroy(pDnode->dir.mnode); + + return 0; } static bool mmDeployRequired(SDnode *pDnode) { @@ -106,43 +211,7 @@ static bool mmDeployRequired(SDnode *pDnode) { return true; } -static int32_t mmPutMsgToQueue(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) { - int32_t contLen = sizeof(SMnodeMsg) + pRpcMsg->contLen; - SMnodeMsg *pMnodeMsg = taosAllocateQitem(contLen); - if (pMnodeMsg == NULL) { - return -1; - } - - pMnodeMsg->contLen = pRpcMsg->contLen; - pMnodeMsg->pCont = (char *)pMnodeMsg + sizeof(SMnodeMsg); - memcpy(pMnodeMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen); - rpcFreeCont(pRpcMsg->pCont); - - int32_t code = mmWriteToWorker(pDnode, pWorker, pMnodeMsg); - if (code != 0) { - taosFreeQitem(pMnodeMsg); - } - - return code; -} - -static int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg) { - return mmPutMsgToQueue(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg); -} - -static int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg) { - return mmPutMsgToQueue(pDnode, &pDnode->mmgmt.readWorker, pRpcMsg); -} - -static void mmProcessChildQueue(SDnode *pDnode, SBlockItem *pBlock) { - SMnodeMsg *pMsg = (SMnodeMsg *)pBlock->pCont; - - if (mmWriteToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg) != 0) { - //todo - } -} - -static void mmInitOptionImp(SDnode *pDnode, SMnodeOpt *pOption) { +static void mmInitOption(SDnode *pDnode, SMnodeOpt *pOption) { pOption->pDnode = pDnode; pOption->sendReqToDnodeFp = dndSendReqToDnode; pOption->sendReqToMnodeFp = dndSendReqToMnode; @@ -153,8 +222,8 @@ static void mmInitOptionImp(SDnode *pDnode, SMnodeOpt *pOption) { pOption->clusterId = dndGetClusterId(pDnode); } -static void mmInitDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { - mmInitOptionImp(pDnode, pOption); +static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) { + mmInitOption(pDnode, pOption); pOption->replica = 1; pOption->selfIndex = 0; SReplica *pReplica = &pOption->replicas[0]; @@ -168,16 +237,16 @@ static void mmInitDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); } -static void mmInitOpenOption(SDnode *pDnode, SMnodeOpt *pOption) { - mmInitOptionImp(pDnode, pOption); +static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption) { + mmInitOption(pDnode, pOption); SMnodeMgmt *pMgmt = &pDnode->mmgmt; pOption->selfIndex = pMgmt->selfIndex; pOption->replica = pMgmt->replica; memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); } -static int32_t mmInitOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { - mmInitOptionImp(pDnode, pOption); +int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { + mmInitOption(pDnode, pOption); pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); @@ -239,278 +308,3 @@ static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) { dInfo("mnode open successfully"); return 0; } - - -static void dndMnodeProcessParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pItem) {} - -static int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->singleProc = false; - - int32_t code = mmOpenImp(pDnode, pOption); - - if (code == 0 && !pMgmt->singleProc) { - SProcCfg cfg = {0}; - cfg.childFp = (ProcFp)mmProcessChildQueue; - cfg.parentFp = (ProcFp)dndMnodeProcessParentQueue; - cfg.childQueueSize = 1024 * 1024; - cfg.parentQueueSize = 1024 * 1024; - - pMgmt->pProcess = taosProcInit(&cfg); - if (pMgmt->pProcess == NULL) { - return -1; - } - pMgmt->pProcess->pParent = pDnode; - pMgmt->pProcess->testFlag = true; - return taosProcStart(pMgmt->pProcess); - } - - return code; -} - -static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode == NULL) { - dError("failed to alter mnode since %s", terrstr()); - return -1; - } - - if (mndAlter(pMnode, pOption) != 0) { - dError("failed to alter mnode since %s", terrstr()); - mmRelease(pDnode, pMnode); - return -1; - } - - mmRelease(pDnode, pMnode); - return 0; -} - -static int32_t dndDropMnode(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode == NULL) { - dError("failed to drop mnode since %s", terrstr()); - return -1; - } - - taosRLockLatch(&pMgmt->latch); - pMgmt->dropped = 1; - taosRUnLockLatch(&pMgmt->latch); - - if (mmWriteFile(pDnode) != 0) { - taosRLockLatch(&pMgmt->latch); - pMgmt->dropped = 0; - taosRUnLockLatch(&pMgmt->latch); - - mmRelease(pDnode, pMnode); - dError("failed to drop mnode since %s", terrstr()); - return -1; - } - - mmRelease(pDnode, pMnode); - mmStopWorker(pDnode); - pMgmt->deployed = 0; - mmWriteFile(pDnode); - mndClose(pMnode); - pMgmt->pMnode = NULL; - mndDestroy(pDnode->dir.mnode); - - return 0; -} - -int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { - SDCreateMnodeReq createReq = {0}; - if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - if (createReq.replica <= 1 || createReq.dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; - dError("failed to create mnode since %s", terrstr()); - return -1; - } - - SMnodeOpt option = {0}; - if (mmInitOptionFromReq(pDnode, &option, &createReq) != 0) { - terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; - dError("failed to create mnode since %s", terrstr()); - return -1; - } - - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode != NULL) { - mmRelease(pDnode, pMnode); - terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; - dError("failed to create mnode since %s", terrstr()); - return -1; - } - - dDebug("start to create mnode"); - return mmOpenImp(pDnode, &option); -} - -int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { - SDAlterMnodeReq alterReq = {0}; - if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - if (alterReq.dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; - dError("failed to alter mnode since %s", terrstr()); - return -1; - } - - SMnodeOpt option = {0}; - if (mmInitOptionFromReq(pDnode, &option, &alterReq) != 0) { - terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; - dError("failed to alter mnode since %s", terrstr()); - return -1; - } - - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode == NULL) { - terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; - dError("failed to alter mnode since %s", terrstr()); - return -1; - } - - dDebug("start to alter mnode"); - int32_t code = dndAlterMnode(pDnode, &option); - mmRelease(pDnode, pMnode); - - return code; -} - -int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { - SDDropMnodeReq dropReq = {0}; - if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - if (dropReq.dnodeId != dndGetDnodeId(pDnode)) { - terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; - dError("failed to drop mnode since %s", terrstr()); - return -1; - } - - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode == NULL) { - terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; - dError("failed to drop mnode since %s", terrstr()); - return -1; - } - - dDebug("start to drop mnode"); - int32_t code = dndDropMnode(pDnode); - mmRelease(pDnode, pMnode); - - return code; -} - -static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode != NULL) { - mndProcessMsg(pMsg); - mmRelease(pDnode, pMnode); - } else { - mndSendRsp(pMsg, terrno); - } - - // mndCleanupMsg(pMsg); -} - -int32_t mmInit(SDnode *pDnode) { - dInfo("mnode mgmt start to init"); - int32_t code = -1; - - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - taosInitRWLatch(&pMgmt->latch); - mmInitMsgFp(pMgmt); - - if (mmReadFile(pDnode) != 0) { - goto _OVER; - } - - if (pMgmt->dropped) { - dInfo("mnode has been dropped and needs to be deleted"); - mndDestroy(pDnode->dir.mnode); - code = 0; - goto _OVER; - } - - if (!pMgmt->deployed) { - bool required = mmDeployRequired(pDnode); - if (!required) { - dInfo("mnode does not need to be deployed"); - code = 0; - goto _OVER; - } - - dInfo("mnode start to deploy"); - SMnodeOpt option = {0}; - mmInitDeployOption(pDnode, &option); - code = mmOpen(pDnode, &option); - } else { - dInfo("mnode start to open"); - SMnodeOpt option = {0}; - mmInitOpenOption(pDnode, &option); - code = mmOpen(pDnode, &option); - } - -_OVER: - if (code == 0) { - dInfo("mnode mgmt init success"); - } else { - dError("failed to init mnode mgmt since %s", terrstr()); - mmCleanup(pDnode); - } - - return code; -} - -void mmCleanup(SDnode *pDnode) { - dInfo("mnode mgmt start to clean up"); - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - if (pMgmt->pMnode) { - mmStopWorker(pDnode); - mndClose(pMgmt->pMnode); - pMgmt->pMnode = NULL; - } - dInfo("mnode mgmt is cleaned up"); -} - -int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode == NULL) { - terrno = TSDB_CODE_APP_NOT_READY; - dTrace("failed to get user auth since %s", terrstr()); - return -1; - } - - int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey); - mmRelease(pDnode, pMnode); - - dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt); - return code; -} - -int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo) { - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode == NULL) return -1; - - int32_t code = mndGetMonitorInfo(pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); - mmRelease(pDnode, pMnode); - return code; -} diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmProc.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmProc.c deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmQueue.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmQueue.c deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMsg.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c similarity index 71% rename from source/dnode/mgmt/impl/mnodeMgmt/src/mmMsg.c rename to source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c index 0e8934fe4e..1990dab822 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMsg.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c @@ -23,6 +23,45 @@ static int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg); static int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg); static int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg); +static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnodeMsg); +static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg); +static void mmConsumeQueue(SDnode *pDnode, SMnodeMsg *pMsg); + +int32_t mmStartWorker(SDnode *pDnode) { + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeQueue) != 0) { + dError("failed to start mnode read worker since %s", terrstr()); + return -1; + } + + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmConsumeQueue) != 0) { + dError("failed to start mnode write worker since %s", terrstr()); + return -1; + } + + if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmConsumeQueue) != 0) { + dError("failed to start mnode sync worker since %s", terrstr()); + return -1; + } + + return 0; +} + +void mmStopWorker(SDnode *pDnode) { + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + + taosWLockLatch(&pMgmt->latch); + pMgmt->deployed = 0; + taosWUnLockLatch(&pMgmt->latch); + + while (pMgmt->refCount > 1) { + taosMsleep(10); + } + + dndCleanupWorker(&pMgmt->readWorker); + dndCleanupWorker(&pMgmt->writeWorker); + dndCleanupWorker(&pMgmt->syncWorker); +} void mmInitMsgFp(SMnodeMgmt *pMgmt) { // Requests handled by DNODE @@ -151,18 +190,26 @@ _OVER: } int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) { - return mmWriteToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMnodeMsg); + return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMnodeMsg); } int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) { - return mmWriteToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMnodeMsg); + return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMnodeMsg); } int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) { - return mmWriteToWorker(pDnode, &pDnode->mmgmt.readWorker, pMnodeMsg); + return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMnodeMsg); } -int32_t mmWriteToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnodeMsg) { +int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg) { + return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpcMsg); +} + +int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg) { + return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpcMsg); +} + +static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnodeMsg) { SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) return -1; @@ -172,3 +219,47 @@ int32_t mmWriteToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnode mmRelease(pDnode, pMnode); return code; } + +static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) { + int32_t contLen = sizeof(SMnodeMsg) + pRpcMsg->contLen; + SMnodeMsg *pMnodeMsg = taosAllocateQitem(contLen); + if (pMnodeMsg == NULL) { + return -1; + } + + pMnodeMsg->contLen = pRpcMsg->contLen; + pMnodeMsg->pCont = (char *)pMnodeMsg + sizeof(SMnodeMsg); + memcpy(pMnodeMsg->pCont, pRpcMsg->pCont, pRpcMsg->contLen); + rpcFreeCont(pRpcMsg->pCont); + + int32_t code = mmPutMsgToWorker(pDnode, pWorker, pMnodeMsg); + if (code != 0) { + taosFreeQitem(pMnodeMsg); + } + + return code; +} + +void mmConsumeChildQueue(SDnode *pDnode, SBlockItem *pBlock) { + SMnodeMsg *pMsg = (SMnodeMsg *)pBlock->pCont; + + if (mmPutMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg) != 0) { + // todo + } +} + +void mmConsumeParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pBlock) {} + +static void mmConsumeQueue(SDnode *pDnode, SMnodeMsg *pMsg) { + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + + SMnode *pMnode = mmAcquire(pDnode); + if (pMnode != NULL) { + mndProcessMsg(pMsg); + mmRelease(pDnode, pMnode); + } else { + mndSendRsp(pMsg, terrno); + } + + // mndCleanupMsg(pMsg); +} diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 68d52df142..19e269063b 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -691,13 +691,13 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_DND_CREATE_MNODE: - code = dndProcessCreateMnodeReq(pDnode, pMsg); + code = mmProcessCreateMnodeReq(pDnode, pMsg); break; case TDMT_DND_ALTER_MNODE: - code = dndProcessAlterMnodeReq(pDnode, pMsg); + code = mmProcessAlterMnodeReq(pDnode, pMsg); break; case TDMT_DND_DROP_MNODE: - code = dndProcessDropMnodeReq(pDnode, pMsg); + code = mmProcessDropMnodeReq(pDnode, pMsg); break; case TDMT_DND_CREATE_QNODE: code = dndProcessCreateQnodeReq(pDnode, pMsg);