diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 7f02eafe3f..8341b2f0bb 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -51,6 +51,7 @@ typedef struct { SRpcMsg rpcMsg; int32_t rspLen; void *pRsp; + void *pNode; } SNodeMsg; typedef struct SRpcInit { diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 90f9e86c25..788f615553 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -56,7 +56,7 @@ extern "C" { #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} -typedef enum { MNODE, VNODES, QNODE, SNODE, BNODE, DNODE, NODE_MAX } ENodeType; +typedef enum { VNODES, QNODE, SNODE, MNODE, BNODE, DNODE, NODE_MAX } ENodeType; typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType; typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; @@ -77,29 +77,6 @@ typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper); typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper); typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper); -typedef struct SMsgHandle { - NodeMsgFp msgFp; - SMgmtWrapper *pWrapper; -} SMsgHandle; - -typedef struct SMgmtFp { - OpenNodeFp openFp; - CloseNodeFp closeFp; - RequireNodeFp requiredFp; -} SMgmtFp; - -typedef struct SMgmtWrapper { - const char *name; - char *path; - bool required; - EProcType procType; - SProcObj *pProc; - void *pMgmt; - SDnode *pDnode; - NodeMsgFp msgFps[TDMT_MAX]; - SMgmtFp fp; -} SMgmtWrapper; - typedef struct { EWorkerType type; const char *name; @@ -114,6 +91,29 @@ typedef struct { }; } SDnodeWorker; +typedef struct SMsgHandle { + NodeMsgFp msgFp; + SMgmtWrapper *pWrapper; +} SMsgHandle; + +typedef struct SMgmtFp { + OpenNodeFp openFp; + CloseNodeFp closeFp; + RequireNodeFp requiredFp; +} SMgmtFp; + +typedef struct SMgmtWrapper { + const char *name; + char *path; + bool required; + EProcType procType; + SProcObj *pProc; + void *pMgmt; + SDnode *pDnode; + NodeMsgFp msgFps[TDMT_MAX]; + SMgmtFp fp; +} SMgmtWrapper; + typedef struct { void *serverRpc; void *clientRpc; @@ -143,6 +143,7 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); TdFilePtr dndCheckRunning(char *dataDir); SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType); void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp); +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); // dndMonitor.h void dndSendMonitorReport(SDnode *pDnode); @@ -153,6 +154,7 @@ void dndClose(SDnode *pDnode); int32_t dndRun(SDnode *pDnode); void dndeHandleEvent(SDnode *pDnode, EDndEvent event); void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); // dndTransport.h int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/container/inc/dndNode.h b/source/dnode/mgmt/container/inc/dndNode.h index 7a92248021..0fbae69d89 100644 --- a/source/dnode/mgmt/container/inc/dndNode.h +++ b/source/dnode/mgmt/container/inc/dndNode.h @@ -27,6 +27,7 @@ void dndClose(SDnode *pDnode); int32_t dndRun(SDnode *pDnode); void dndeHandleEvent(SDnode *pDnode, EDndEvent event); void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/container/src/dndInt.c b/source/dnode/mgmt/container/src/dndInt.c index 1306c862bc..7f4b000ed5 100644 --- a/source/dnode/mgmt/container/src/dndInt.c +++ b/source/dnode/mgmt/container/src/dndInt.c @@ -126,3 +126,15 @@ TdFilePtr dndCheckRunning(char *dataDir) { dDebug("file:%s is locked", filepath); return pFile; } + +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { + dDebug("startup req is received"); + + SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); + dndGetStartup(pDnode, pStartup); + + dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); + + SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)}; + rpcSendResponse(&rpcRsp); +} diff --git a/source/dnode/mgmt/container/src/dndMonitor.c b/source/dnode/mgmt/container/src/dndMonitor.c index 4362049a5a..8b7ff389f6 100644 --- a/source/dnode/mgmt/container/src/dndMonitor.c +++ b/source/dnode/mgmt/container/src/dndMonitor.c @@ -66,7 +66,7 @@ void dndSendMonitorReport(SDnode *pDnode) { SMonClusterInfo clusterInfo = {0}; SMonVgroupInfo vgroupInfo = {0}; SMonGrantInfo grantInfo = {0}; - if (mmGetMonitorInfo(pDnode, &clusterInfo, &vgroupInfo, &grantInfo) == 0) { + if (mmGetMonitorInfo(dndGetWrapper(pDnode, MNODE), &clusterInfo, &vgroupInfo, &grantInfo) == 0) { monSetClusterInfo(pMonitor, &clusterInfo); monSetVgroupInfo(pMonitor, &vgroupInfo); monSetGrantInfo(pMonitor, &grantInfo); diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index 82130660ab..a64cb4f2b8 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -49,6 +49,10 @@ static void dndCloseNode(SMgmtWrapper *pWrapper) { if (pWrapper->required) { (*pWrapper->fp.closeFp)(pWrapper); } + if (pWrapper->pProc) { + taosProcCleanup(pWrapper->pProc); + pWrapper->pProc = NULL; + } } static void dndClearMemory(SDnode *pDnode) { @@ -171,6 +175,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { if (!pWrapper->required) continue; dInfo("node:%s, will start in single process", pWrapper->name); + pWrapper->procType = PROC_SINGLE; if (dndOpenNode(pWrapper) != 0) { dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); return -1; @@ -189,14 +194,67 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { } } +static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { + if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) { + dmSendRedirectRsp(pWrapper->pDnode, pRsp); + } else { + rpcSendResponse(pRsp); + } +} + +void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { + int32_t code = -1; + + if (pWrapper->procType != PROC_CHILD) { + dndSendRpcRsp(pWrapper, pRsp); + } else { + do { + code = taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen); + if (code != 0) { + taosMsleep(10); + } + } while (code != 0); + } +} + +static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { + dTrace("msg:%p, get from child queue", pMsg); + SRpcMsg *pRpc = &pMsg->rpcMsg; + pRpc->pCont = pCont; + + NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; + int32_t code = (*msgFp)(pWrapper, pMsg); + + if (code != 0) { + bool isReq = (pRpc->msgType & 1U); + if (isReq) { + SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; + dndSendRsp(pWrapper, &rsp); + } + + dTrace("msg:%p, is freed", pMsg); + taosFreeQitem(pMsg); + rpcFreeCont(pCont); + } +} + +static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t msgLen, void *pCont, int32_t contLen) { + dTrace("msg:%p, get from parent queue", pRsp); + pRsp->pCont = pCont; + dndSendRpcRsp(pWrapper, pRsp); + free(pRsp); +} + static int32_t dndRunInMultiProcess(SDnode *pDnode) { + dInfo("dnode run in multi process mode"); + for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; if (n == DNODE) { dInfo("node:%s, will start in parent process", pWrapper->name); - pWrapper->procType = PROC_PARENT; + pWrapper->procType = PROC_SINGLE; if (dndOpenNode(pWrapper) != 0) { dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); return -1; @@ -204,7 +262,21 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { continue; } - SProcCfg cfg = {0}; + SProcCfg cfg = {.childQueueSize = 1024 * 1024 * 2, // size will be a configuration item + .childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentQueueSize = 1024 * 1024 * 2, // size will be a configuration item + .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, + .parentdMallocHeadFp = (ProcMallocFp)malloc, + .parentFreeHeadFp = (ProcFreeFp)free, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .testFlag = 0, + .pParent = pWrapper, + .name = pWrapper->name}; SProcObj *pProc = taosProcInit(&cfg); if (pProc == NULL) { dError("node:%s, failed to fork since %s", pWrapper->name, terrstr()); @@ -227,6 +299,11 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { dInfo("node:%s, will not start in parent process", pWrapper->name); pWrapper->procType = PROC_PARENT; } + + if (taosProcRun(pProc) != 0) { + dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); + return -1; + } } return 0; @@ -275,14 +352,6 @@ static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) { return 0; } -static void dndSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) { - if (pRpc->code == TSDB_CODE_APP_NOT_READY) { - dmSendRedirectRsp(pDnode, pRpc); - } else { - rpcSendResponse(pRpc); - } -} - void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) { dmUpdateMnodeEpSet(pWrapper->pDnode, pEpSet); @@ -307,19 +376,33 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { } dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user); - code = (*msgFp)(pWrapper, pMsg); + + if (pWrapper->procType == PROC_SINGLE) { + code = (*msgFp)(pWrapper, pMsg); + } else if (pWrapper->procType == PROC_PARENT) { + code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen); + } else { + terrno = TSDB_CODE_MEMORY_CORRUPTED; + dError("msg:%p, won't be processed for it is child process", pMsg); + } _OVER: - if (code != 0) { + if (code == 0) { + if (pWrapper->procType == PROC_PARENT) { + dTrace("msg:%p, is freed", pMsg); + taosFreeQitem(pMsg); + rpcFreeCont(pRpc->pCont); + } + } else { dError("msg:%p, failed to process since %s", pMsg, terrstr()); bool isReq = (pRpc->msgType & 1U); if (isReq) { SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; - dndSendRpcRsp(pWrapper->pDnode, &rsp); + dndSendRsp(pWrapper, &rsp); } dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); } -} \ No newline at end of file +} diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index 196bdb165b..002ce73460 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -25,8 +25,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { SDnode *pDnode = parent; STransMgmt *pMgmt = &pDnode->trans; - - tmsg_t msgType = pRsp->msgType; + tmsg_t msgType = pRsp->msgType; if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { if (pRsp == NULL || pRsp->pCont == NULL) return; @@ -88,11 +87,11 @@ void dndCleanupClient(SDnode *pDnode) { static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { SDnode *pDnode = param; STransMgmt *pMgmt = &pDnode->trans; + tmsg_t msgType = pReq->msgType; - tmsg_t msgType = pReq->msgType; if (msgType == TDMT_DND_NETWORK_TEST) { dTrace("RPC %p, network test req will be processed, app:%p", pReq->handle, pReq->ahandle); - dmProcessStartupReq(pDnode, pReq); + dndProcessStartupReq(pDnode, pReq); return; } @@ -249,8 +248,8 @@ void dndCleanupServer(SDnode *pDnode) { int32_t dndInitMsgHandle(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->trans; - for (ENodeType nodeType = 0; nodeType < NODE_MAX; ++nodeType) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType]; + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { NodeMsgFp msgFp = pWrapper->msgFps[msgIndex]; diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h index 6a3e205f7c..466c124866 100644 --- a/source/dnode/mgmt/dnode/inc/dmInt.h +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -51,9 +51,6 @@ void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); -// dmHandle.h -void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/dnode/inc/dmMsg.h b/source/dnode/mgmt/dnode/inc/dmMsg.h index 3877d03c96..036471f17f 100644 --- a/source/dnode/mgmt/dnode/inc/dmMsg.h +++ b/source/dnode/mgmt/dnode/inc/dmMsg.h @@ -24,7 +24,6 @@ extern "C" { void dmInitMsgHandles(SMgmtWrapper *pWrapper); void dmSendStatusReq(SDnodeMgmt *pMgmt); -void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq); void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp); void dmProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp); diff --git a/source/dnode/mgmt/dnode/inc/dmWorker.h b/source/dnode/mgmt/dnode/inc/dmWorker.h index 992dca1ea7..611a5e7c67 100644 --- a/source/dnode/mgmt/dnode/inc/dmWorker.h +++ b/source/dnode/mgmt/dnode/inc/dmWorker.h @@ -22,8 +22,8 @@ extern "C" { #endif -int32_t dmStartWorker(); -void dmStopWorker(); +int32_t dmStartWorker(SDnodeMgmt *pMgmt); +void dmStopWorker(SDnodeMgmt *pMgmt); int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus diff --git a/source/dnode/mgmt/dnode/src/dmFile.c b/source/dnode/mgmt/dnode/src/dmFile.c index 0794824ac6..03fc47ae27 100644 --- a/source/dnode/mgmt/dnode/src/dmFile.c +++ b/source/dnode/mgmt/dnode/src/dmFile.c @@ -151,8 +151,8 @@ PRASE_DNODE_OVER: dmResetDnodes(pMgmt, pMgmt->pDnodeEps); - terrno = 0; - return 0; + terrno = code; + return code; } int32_t dmWriteFile(SDnodeMgmt *pMgmt) { diff --git a/source/dnode/mgmt/dnode/src/dmInt.c b/source/dnode/mgmt/dnode/src/dmInt.c index 27d4c5a3f9..23251a2ff2 100644 --- a/source/dnode/mgmt/dnode/src/dmInt.c +++ b/source/dnode/mgmt/dnode/src/dmInt.c @@ -108,6 +108,7 @@ void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) { int32_t dmInit(SMgmtWrapper *pWrapper) { SDnode *pDnode = pWrapper->pDnode; SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt)); + dInfo("dnode-mgmt is initialized"); pMgmt->dnodeId = 0; pMgmt->dropped = 0; @@ -120,23 +121,23 @@ int32_t dmInit(SMgmtWrapper *pWrapper) { pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); if (pMgmt->dnodeHash == NULL) { - dError("node:%s, failed to init dnode hash", pWrapper->name); + dError("failed to init dnode hash"); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } if (dmReadFile(pMgmt) != 0) { - dError("node:%s, failed to read file since %s", pWrapper->name, terrstr()); + dError("failed to read file since %s", terrstr()); return -1; } if (pMgmt->dropped) { - dError("node:%s, will not start since its already dropped", pWrapper->name); + dError("dnode will not start since its already dropped"); return -1; } if (dmStartWorker(pMgmt) != 0) { - dError("node:%s, failed to start worker since %s", pWrapper->name, terrstr()); + dError("failed to start dnode worker since %s", terrstr()); return -1; } @@ -153,6 +154,7 @@ void dmCleanup(SMgmtWrapper *pWrapper) { SDnodeMgmt *pMgmt = pWrapper->pMgmt; if (pMgmt == NULL) return; + dInfo("dnode-mgmt start to clean up"); dmStopWorker(pMgmt); taosWLockLatch(&pMgmt->latch); @@ -168,6 +170,9 @@ void dmCleanup(SMgmtWrapper *pWrapper) { } taosWUnLockLatch(&pMgmt->latch); + + free(pMgmt); + pWrapper->pMgmt = NULL; dInfo("dnode-mgmt is cleaned up"); } diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index 5879e29b56..28d7eb0d29 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -102,18 +102,6 @@ int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq) { return TSDB_CODE_OPS_NOT_SUPPORT; } -void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { - dDebug("startup req is received"); - - SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); - dndGetStartup(pDnode, pStartup); - - dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); - - SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)}; - rpcSendResponse(&rpcRsp); -} - void dmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg); diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 80fa34839e..161b740ac9 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -57,7 +57,7 @@ static void *dmThreadRoutine(void *param) { static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) { int32_t code = 0; SRpcMsg *pMsg = &pNodeMsg->rpcMsg; - dTrace("msg:%p, will be processed in mgmt queue", pNodeMsg); + dTrace("msg:%p, will be processed", pNodeMsg); switch (pMsg->msgType) { case TDMT_DND_CREATE_MNODE: diff --git a/source/dnode/mgmt/mnode/inc/mmFile.h b/source/dnode/mgmt/mnode/inc/mmFile.h index 0aae15f077..8beceea305 100644 --- a/source/dnode/mgmt/mnode/inc/mmFile.h +++ b/source/dnode/mgmt/mnode/inc/mmFile.h @@ -22,8 +22,8 @@ extern "C" { #endif -int32_t mmReadFile(SDnode *pDnode); -int32_t mmWriteFile(SDnode *pDnode); +int32_t mmReadFile(SMnodeMgmt *pMgmt); +int32_t mmWriteFile(SMnodeMgmt *pMgmt); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index 1f7d94aa27..ac1b79ee62 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -30,31 +30,35 @@ typedef struct SMnodeMgmt { int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; SMnode *pMnode; + SDnode *pDnode; SProcObj *pProcess; bool singleProc; SRWLatch latch; + const char *path; SDnodeWorker readWorker; SDnodeWorker writeWorker; SDnodeWorker syncWorker; } SMnodeMgmt; -// mmInt.h -void mmGetMgmtFp(SMgmtWrapper *pMgmt); +// interface +void mmGetMgmtFp(SMgmtWrapper *pMgmt); +int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey); +int32_t mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, + SMonGrantInfo *pGrantInfo); -// mmMgmt.h -int32_t mmInit(SDnode *pDnode); -void mmCleanup(SDnode *pDnode); +// mmInt.h +SMnode *mmAcquire(SMnodeMgmt *pMgmt); +void mmRelease(SMnodeMgmt *pMgmt, SMnode *pMnode); +int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption); +int32_t mmAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption); +int32_t mmDrop(SMnodeMgmt *pMgmt); +int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate); // mmHandle.h int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey); -int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo); - - #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mnode/inc/mmMgmt.h b/source/dnode/mgmt/mnode/inc/mmMgmt.h deleted file mode 100644 index d2075e40b6..0000000000 --- a/source/dnode/mgmt/mnode/inc/mmMgmt.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DND_MNODE_MGMT_H_ -#define _TD_DND_MNODE_MGMT_H_ - -#include "mmInt.h" - -#ifdef __cplusplus -extern "C" { -#endif - -SMnode *mmAcquire(SDnode *pDnode); -void mmRelease(SDnode *pDnode, SMnode *pMnode); -int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption); -int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption); -int32_t mmDrop(SDnode *pDnode); -int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DND_MNODE_MGMT_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/inc/mmMsg.h b/source/dnode/mgmt/mnode/inc/mmMsg.h index 620d2903e4..fdafff901f 100644 --- a/source/dnode/mgmt/mnode/inc/mmMsg.h +++ b/source/dnode/mgmt/mnode/inc/mmMsg.h @@ -27,11 +27,6 @@ int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey); -int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo); - - #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mnode/inc/mmWorker.h b/source/dnode/mgmt/mnode/inc/mmWorker.h index a1dbd5c267..f86b484699 100644 --- a/source/dnode/mgmt/mnode/inc/mmWorker.h +++ b/source/dnode/mgmt/mnode/inc/mmWorker.h @@ -22,19 +22,15 @@ extern "C" { #endif -int32_t mmStartWorker(SDnode *pDnode); -void mmStopWorker(SDnode *pDnode); -void mmInitMsgFp(SMnodeMgmt *pMgmt); -void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); - +int32_t mmStartWorker(SMnodeMgmt *pMgmt); +void mmStopWorker(SMnodeMgmt *pMgmt); int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/mnode/src/mmFile.c b/source/dnode/mgmt/mnode/src/mmFile.c index e6edfe1dce..a232d9a3ad 100644 --- a/source/dnode/mgmt/mnode/src/mmFile.c +++ b/source/dnode/mgmt/mnode/src/mmFile.c @@ -14,22 +14,19 @@ */ #define _DEFAULT_SOURCE -#include "mmInt.h" +#include "mmFile.h" -#if 0 -int32_t mmReadFile(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; +int32_t mmReadFile(SMnodeMgmt *pMgmt) { + int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 4096; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + char file[PATH_MAX]; + TdFilePtr pFile = NULL; - int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; - int32_t len = 0; - int32_t maxLen = 4096; - char *content = calloc(1, maxLen + 1); - cJSON *root = NULL; - char file[PATH_MAX + 20]; - - snprintf(file, sizeof(file), "%s%smnode.json", pDnode->dir.dnode, TD_DIRSEP); - - TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); + snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP); + pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { dDebug("file %s not exist", file); code = 0; @@ -115,11 +112,9 @@ PRASE_MNODE_OVER: return code; } -int32_t mmWriteFile(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - +int32_t mmWriteFile(SMnodeMgmt *pMgmt) { char file[PATH_MAX]; - snprintf(file, sizeof(file), "%s%smnode.json.bak", pDnode->dir.dnode, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -156,7 +151,7 @@ int32_t mmWriteFile(SDnode *pDnode) { free(content); char realfile[PATH_MAX + 20]; - snprintf(realfile, sizeof(realfile), "%s%smnode.json", pDnode->dir.dnode, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); if (taosRenameFile(file, realfile) != 0) { terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; @@ -167,5 +162,3 @@ int32_t mmWriteFile(SDnode *pDnode) { dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); return 0; } - -#endif \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index 77d94ffef5..b0acd9def1 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -15,15 +15,286 @@ #define _DEFAULT_SOURCE #include "mmInt.h" +#include "dmInt.h" +#include "mmFile.h" #include "mmMsg.h" +#include "mmWorker.h" -bool mmRequireNode(SMgmtWrapper *pWrapper) { return false; } +SMnode *mmAcquire(SMnodeMgmt *pMgmt) { + SMnode *pMnode = NULL; + int32_t refCount = 0; + + taosRLockLatch(&pMgmt->latch); + if (pMgmt->deployed && !pMgmt->dropped) { + refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); + pMnode = pMgmt->pMnode; + } else { + terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; + } + taosRUnLockLatch(&pMgmt->latch); + + if (pMnode != NULL) { + dTrace("acquire mnode, refCount:%d", refCount); + } + return pMnode; +} + +void mmRelease(SMnodeMgmt *pMgmt, SMnode *pMnode) { + if (pMnode == NULL) return; + + taosRLockLatch(&pMgmt->latch); + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); + taosRUnLockLatch(&pMgmt->latch); + dTrace("release mnode, refCount:%d", refCount); +} + +int32_t mmOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { + SMnode *pMnode = mndOpen(pMgmt->path, pOption); + if (pMnode == NULL) { + dError("failed to open mnode since %s", terrstr()); + return -1; + } + + if (mmStartWorker(pMgmt) != 0) { + dError("failed to start mnode worker since %s", terrstr()); + mndClose(pMnode); + mndDestroy(pMgmt->path); + return -1; + } + + pMgmt->deployed = 1; + if (mmWriteFile(pMgmt) != 0) { + dError("failed to write mnode file since %s", terrstr()); + pMgmt->deployed = 0; + mmStopWorker(pMgmt); + mndClose(pMnode); + mndDestroy(pMgmt->path); + return -1; + } + + taosWLockLatch(&pMgmt->latch); + pMgmt->pMnode = pMnode; + pMgmt->deployed = 1; + taosWUnLockLatch(&pMgmt->latch); + + dInfo("mnode open successfully"); + return 0; +} + +int32_t mmAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { + SMnode *pMnode = mmAcquire(pMgmt); + if (pMnode == NULL) { + dError("failed to alter mnode since %s", terrstr()); + return -1; + } + + if (mndAlter(pMnode, pOption) != 0) { + dError("failed to alter mnode since %s", terrstr()); + mmRelease(pMgmt, pMnode); + return -1; + } + + mmRelease(pMgmt, pMnode); + return 0; +} + +int32_t mmDrop(SMnodeMgmt *pMgmt) { + SMnode *pMnode = mmAcquire(pMgmt); + if (pMnode == NULL) { + dError("failed to drop mnode since %s", terrstr()); + return -1; + } + + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 1; + taosRUnLockLatch(&pMgmt->latch); + + if (mmWriteFile(pMgmt) != 0) { + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 0; + taosRUnLockLatch(&pMgmt->latch); + + mmRelease(pMgmt, pMnode); + dError("failed to drop mnode since %s", terrstr()); + return -1; + } + + mmRelease(pMgmt, pMnode); + mmStopWorker(pMgmt); + pMgmt->deployed = 0; + mmWriteFile(pMgmt); + mndClose(pMnode); + pMgmt->pMnode = NULL; + mndDestroy(pMgmt->path); + + return 0; +} + +static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { + SDnode *pDnode = pMgmt->pDnode; + + pOption->pDnode = pDnode; + pOption->sendReqToDnodeFp = dndSendReqToDnode; + pOption->sendReqToMnodeFp = dndSendReqToMnode; + pOption->sendRedirectRspFp = dmSendRedirectRsp; + pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue; + pOption->putReqToMReadQFp = mmPutMsgToReadQueue; + pOption->dnodeId = dmGetDnodeId(pDnode); + pOption->clusterId = dmGetClusterId(pDnode); +} + +static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { + SDnode *pDnode = pMgmt->pDnode; + + mmInitOption(pMgmt, pOption); + pOption->replica = 1; + pOption->selfIndex = 0; + SReplica *pReplica = &pOption->replicas[0]; + pReplica->id = 1; + pReplica->port = pDnode->cfg.serverPort; + memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN); + + pMgmt->selfIndex = pOption->selfIndex; + pMgmt->replica = pOption->replica; + memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); +} + +static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { + mmInitOption(pMgmt, pOption); + pOption->selfIndex = pMgmt->selfIndex; + pOption->replica = pMgmt->replica; + memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); +} + +int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { + SDnode *pDnode = pMgmt->pDnode; + + mmInitOption(pMgmt, pOption); + pOption->dnodeId = dmGetDnodeId(pDnode); + pOption->clusterId = dmGetClusterId(pDnode); + + pOption->replica = pCreate->replica; + pOption->selfIndex = -1; + for (int32_t i = 0; i < pCreate->replica; ++i) { + SReplica *pReplica = &pOption->replicas[i]; + pReplica->id = pCreate->replicas[i].id; + pReplica->port = pCreate->replicas[i].port; + memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); + if (pReplica->id == pOption->dnodeId) { + pOption->selfIndex = i; + } + } + + if (pOption->selfIndex == -1) { + dError("failed to build mnode options since %s", terrstr()); + return -1; + } + + pMgmt->selfIndex = pOption->selfIndex; + pMgmt->replica = pOption->replica; + memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); + return 0; +} + +static void mmCleanup(SMgmtWrapper *pWrapper) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + if (pMgmt == NULL) return; + + dInfo("mnode-mgmt start to clean up"); + if (pMgmt->pMnode) { + mmStopWorker(pMgmt); + mndClose(pMgmt->pMnode); + pMgmt->pMnode = NULL; + } + free(pMgmt); + pWrapper->pMgmt = NULL; + dInfo("mnode-mgmt is cleaned up"); +} + +static int32_t mmInit(SMgmtWrapper *pWrapper) { + SDnode *pDnode = pWrapper->pDnode; + SMnodeMgmt *pMgmt = calloc(1, sizeof(SMnodeMgmt)); + int32_t code = -1; + SMnodeOpt option = {0}; + + dInfo("mnode-mgmt is initialized"); + pMgmt->path = pWrapper->path; + pMgmt->pDnode = pWrapper->pDnode; + taosInitRWLatch(&pMgmt->latch); + + if (mmReadFile(pMgmt) != 0) { + dError("failed to read file since %s", terrstr()); + goto _OVER; + } + + if (!pMgmt->deployed) { + dInfo("mnode start to deploy"); + mmBuildOptionForDeploy(pMgmt, &option); + code = mmOpen(pMgmt, &option); + } else { + dInfo("mnode start to open"); + mmBuildOptionForOpen(pMgmt, &option); + code = mmOpen(pMgmt, &option); + } + +_OVER: + if (code == 0) { + dInfo("mnode-mgmt is initialized"); + } else { + dError("failed to init mnode-mgmtsince %s", terrstr()); + mmCleanup(pWrapper); + } + + return code; +} + +static bool mmDeployRequired(SDnode *pDnode) { + if (dmGetDnodeId(pDnode) > 0) { + return false; + } + + if (dmGetClusterId(pDnode) > 0) { + return false; + } + + if (strcmp(pDnode->cfg.localEp, pDnode->cfg.firstEp) != 0) { + return false; + } + + return true; +} + +static bool mmRequire(SMgmtWrapper *pWrapper) { + SMnodeMgmt mgmt = {0}; + mgmt.path = pWrapper->path; + if (mmReadFile(&mgmt) != 0) { + return false; + } + + if (mgmt.dropped) { + dInfo("mnode has been dropped and needs to be deleted"); + mndDestroy(mgmt.path); + return false; + } + + if (mgmt.deployed) { + dInfo("mnode has been deployed"); + return true; + } + + bool required = mmDeployRequired(pWrapper->pDnode); + if (required) { + dInfo("mnode need to be deployed"); + } + + return required; +} void mmGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; - mgmtFp.openFp = NULL; + mgmtFp.openFp = mmInit; mgmtFp.closeFp = NULL; - mgmtFp.requiredFp = mmRequireNode; + mgmtFp.requiredFp = mmRequire; mmInitMsgHandles(pWrapper); pWrapper->name = "mnode"; @@ -31,10 +302,29 @@ void mmGetMgmtFp(SMgmtWrapper *pWrapper) { } int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) { - return 0; + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + + SMnode *pMnode = mmAcquire(pMgmt); + if (pMnode == NULL) { + terrno = TSDB_CODE_APP_NOT_READY; + dTrace("failed to get user auth since %s", terrstr()); + return -1; + } + + int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey); + mmRelease(pMgmt, pMnode); + + dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt); + return code; } -int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, +int32_t mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonGrantInfo *pGrantInfo) { - return 0; + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + SMnode *pMnode = mmAcquire(pMgmt); + if (pMnode == NULL) return -1; + + int32_t code = mndGetMonitorInfo(pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); + mmRelease(pMgmt, pMnode); + return code; } \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/src/mmMgmt.c b/source/dnode/mgmt/mnode/src/mmMgmt.c deleted file mode 100644 index c7e3f39814..0000000000 --- a/source/dnode/mgmt/mnode/src/mmMgmt.c +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "mmInt.h" - -#include "dmInt.h" -#include "dndTransport.h" - -#if 0 -static void mmInitOption(SDnode *pDnode, SMnodeOpt *pOption); -static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption); -static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption); -static bool mmDeployRequired(SDnode *pDnode); -static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption); - -int32_t mmInit(SDnode *pDnode) { - dInfo("mnode mgmt start to init"); - int32_t code = -1; - - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - taosInitRWLatch(&pMgmt->latch); - mmInitMsgFp(pMgmt); - - if (mmReadFile(pDnode) != 0) { - goto _OVER; - } - - if (pMgmt->dropped) { - dInfo("mnode has been dropped and needs to be deleted"); - mndDestroy(pDnode->dir.mnode); - code = 0; - goto _OVER; - } - - if (!pMgmt->deployed) { - bool required = mmDeployRequired(pDnode); - if (!required) { - dInfo("mnode does not need to be deployed"); - code = 0; - goto _OVER; - } - - dInfo("mnode start to deploy"); - SMnodeOpt option = {0}; - mmBuildOptionForDeploy(pDnode, &option); - code = mmOpen(pDnode, &option); - } else { - dInfo("mnode start to open"); - SMnodeOpt option = {0}; - mmBuildOptionForOpen(pDnode, &option); - code = mmOpen(pDnode, &option); - } - -_OVER: - if (code == 0) { - dInfo("mnode mgmt init success"); - } else { - dError("failed to init mnode mgmt since %s", terrstr()); - mmCleanup(pDnode); - } - - return code; -} - -void mmCleanup(SDnode *pDnode) { - dInfo("mnode mgmt start to clean up"); - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - if (pMgmt->pMnode) { - mmStopWorker(pDnode); - mndClose(pMgmt->pMnode); - pMgmt->pMnode = NULL; - } - dInfo("mnode mgmt is cleaned up"); -} - -SMnode *mmAcquire(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = NULL; - int32_t refCount = 0; - - taosRLockLatch(&pMgmt->latch); - if (pMgmt->deployed && !pMgmt->dropped) { - refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); - pMnode = pMgmt->pMnode; - } else { - terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; - } - taosRUnLockLatch(&pMgmt->latch); - - if (pMnode != NULL) { - dTrace("acquire mnode, refCount:%d", refCount); - } - return pMnode; -} - -void mmRelease(SDnode *pDnode, SMnode *pMnode) { - if (pMnode == NULL) return; - - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - taosRLockLatch(&pMgmt->latch); - int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - taosRUnLockLatch(&pMgmt->latch); - dTrace("release mnode, refCount:%d", refCount); -} - -int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->singleProc = true; - - int32_t code = mmOpenImp(pDnode, pOption); - - if (code == 0 && !pMgmt->singleProc) { - SProcCfg cfg = {.childQueueSize = 1024 * 1024, - .childConsumeFp = (ProcConsumeFp)mmConsumeChildQueue, - .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, - .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .parentQueueSize = 1024 * 1024, - .parentConsumeFp = (ProcConsumeFp)mmConsumeParentQueue, - .parentdMallocHeadFp = (ProcMallocFp)malloc, - .parentFreeHeadFp = (ProcFreeFp)free, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .testFlag = 0, - .pParent = pDnode, - .name = "mnode"}; - - pMgmt->pProcess = taosProcInit(&cfg); - if (pMgmt->pProcess == NULL) { - return -1; - } - - return taosProcRun(pMgmt->pProcess); - } - - return code; -} - -int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode == NULL) { - dError("failed to alter mnode since %s", terrstr()); - return -1; - } - - if (mndAlter(pMnode, pOption) != 0) { - dError("failed to alter mnode since %s", terrstr()); - mmRelease(pDnode, pMnode); - return -1; - } - - mmRelease(pDnode, pMnode); - return 0; -} - -int32_t mmDrop(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode == NULL) { - dError("failed to drop mnode since %s", terrstr()); - return -1; - } - - taosRLockLatch(&pMgmt->latch); - pMgmt->dropped = 1; - taosRUnLockLatch(&pMgmt->latch); - - if (mmWriteFile(pDnode) != 0) { - taosRLockLatch(&pMgmt->latch); - pMgmt->dropped = 0; - taosRUnLockLatch(&pMgmt->latch); - - mmRelease(pDnode, pMnode); - dError("failed to drop mnode since %s", terrstr()); - return -1; - } - - mmRelease(pDnode, pMnode); - mmStopWorker(pDnode); - pMgmt->deployed = 0; - mmWriteFile(pDnode); - mndClose(pMnode); - pMgmt->pMnode = NULL; - mndDestroy(pDnode->dir.mnode); - - return 0; -} - -static bool mmDeployRequired(SDnode *pDnode) { - if (dmGetDnodeId(pDnode) > 0) { - return false; - } - - if (dmGetClusterId(pDnode) > 0) { - return false; - } - - if (strcmp(pDnode->cfg.localEp, pDnode->cfg.firstEp) != 0) { - return false; - } - - return true; -} - -static void mmInitOption(SDnode *pDnode, SMnodeOpt *pOption) { - pOption->pDnode = pDnode; - pOption->sendReqToDnodeFp = dndSendReqToDnode; - pOption->sendReqToMnodeFp = dndSendReqToMnode; - pOption->sendRedirectRspFp = dmSendRedirectRsp; - pOption->putReqToMWriteQFp = mmPutMsgToWriteQueue; - pOption->putReqToMReadQFp = mmPutMsgToReadQueue; - pOption->dnodeId = dmGetDnodeId(pDnode); - pOption->clusterId = dmGetClusterId(pDnode); -} - -static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) { - mmInitOption(pDnode, pOption); - pOption->replica = 1; - pOption->selfIndex = 0; - SReplica *pReplica = &pOption->replicas[0]; - pReplica->id = 1; - pReplica->port = pDnode->cfg.serverPort; - memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN); - - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->selfIndex = pOption->selfIndex; - pMgmt->replica = pOption->replica; - memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); -} - -static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption) { - mmInitOption(pDnode, pOption); - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pOption->selfIndex = pMgmt->selfIndex; - pOption->replica = pMgmt->replica; - memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); -} - -int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { - mmInitOption(pDnode, pOption); - pOption->dnodeId = dmGetDnodeId(pDnode); - pOption->clusterId = dmGetClusterId(pDnode); - - pOption->replica = pCreate->replica; - pOption->selfIndex = -1; - for (int32_t i = 0; i < pCreate->replica; ++i) { - SReplica *pReplica = &pOption->replicas[i]; - pReplica->id = pCreate->replicas[i].id; - pReplica->port = pCreate->replicas[i].port; - memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); - if (pReplica->id == pOption->dnodeId) { - pOption->selfIndex = i; - } - } - - if (pOption->selfIndex == -1) { - dError("failed to build mnode options since %s", terrstr()); - return -1; - } - - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->selfIndex = pOption->selfIndex; - pMgmt->replica = pOption->replica; - memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); - return 0; -} - -static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption); - if (pMnode == NULL) { - dError("failed to open mnode since %s", terrstr()); - return -1; - } - - if (mmStartWorker(pDnode) != 0) { - dError("failed to start mnode worker since %s", terrstr()); - mndClose(pMnode); - mndDestroy(pDnode->dir.mnode); - return -1; - } - - pMgmt->deployed = 1; - if (mmWriteFile(pDnode) != 0) { - dError("failed to write mnode file since %s", terrstr()); - pMgmt->deployed = 0; - mmStopWorker(pDnode); - mndClose(pMnode); - mndDestroy(pDnode->dir.mnode); - return -1; - } - - taosWLockLatch(&pMgmt->latch); - pMgmt->pMnode = pMnode; - pMgmt->deployed = 1; - taosWUnLockLatch(&pMgmt->latch); - - dInfo("mnode open successfully"); - return 0; -} - -#endif - diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index 389ebfe853..bbb49848cc 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -15,12 +15,13 @@ #define _DEFAULT_SOURCE #include "mmMsg.h" +#include "dmInt.h" #include "mmWorker.h" -#if 0 -#include "dmInt.h" - int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { + SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + SDCreateMnodeReq createReq = {0}; if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -34,25 +35,28 @@ int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { } SMnodeOpt option = {0}; - if (mmBuildOptionFromReq(pDnode, &option, &createReq) != 0) { + if (mmBuildOptionFromReq(pMgmt, &option, &createReq) != 0) { terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; dError("failed to create mnode since %s", terrstr()); return -1; } - SMnode *pMnode = mmAcquire(pDnode); + SMnode *pMnode = mmAcquire(pMgmt); if (pMnode != NULL) { - mmRelease(pDnode, pMnode); + mmRelease(pMgmt, pMnode); terrno = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; dError("failed to create mnode since %s", terrstr()); return -1; } dDebug("start to create mnode"); - return mmOpen(pDnode, &option); + return mmOpen(pMgmt, &option); } int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) { + SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + SDAlterMnodeReq alterReq = {0}; if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -66,13 +70,13 @@ int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) { } SMnodeOpt option = {0}; - if (mmBuildOptionFromReq(pDnode, &option, &alterReq) != 0) { + if (mmBuildOptionFromReq(pMgmt, &option, &alterReq) != 0) { terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; dError("failed to alter mnode since %s", terrstr()); return -1; } - SMnode *pMnode = mmAcquire(pDnode); + SMnode *pMnode = mmAcquire(pMgmt); if (pMnode == NULL) { terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; dError("failed to alter mnode since %s", terrstr()); @@ -80,13 +84,16 @@ int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) { } dDebug("start to alter mnode"); - int32_t code = mmAlter(pDnode, &option); - mmRelease(pDnode, pMnode); + int32_t code = mmAlter(pMgmt, &option); + mmRelease(pMgmt, pMnode); return code; } int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { + SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + SDDropMnodeReq dropReq = {0}; if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -99,7 +106,7 @@ int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - SMnode *pMnode = mmAcquire(pDnode); + SMnode *pMnode = mmAcquire(pMgmt); if (pMnode == NULL) { terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; dError("failed to drop mnode since %s", terrstr()); @@ -107,45 +114,12 @@ int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { } dDebug("start to drop mnode"); - int32_t code = mmDrop(pDnode); - mmRelease(pDnode, pMnode); + int32_t code = mmDrop(pMgmt); + mmRelease(pMgmt, pMnode); return code; } -int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo) { - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode == NULL) return -1; - - int32_t code = mndGetMonitorInfo(pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); - mmRelease(pDnode, pMnode); - return code; -} - -int32_t mmGetUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = mmAcquire(pDnode); - if (pMnode == NULL) { - terrno = TSDB_CODE_APP_NOT_READY; - dTrace("failed to get user auth since %s", terrstr()); - return -1; - } - - int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey); - mmRelease(pDnode, pMnode); - - dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt); - return code; -} - -#endif - -int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} -int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} -int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} - void mmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg); diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 3ca112782f..de8809ab17 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -20,16 +20,63 @@ #include "dndTransport.h" #include "dndWorker.h" -#if 0 -static int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg); -static int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg); -static int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg); -static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMsg *pMsg); -static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc); -static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg); +static void mmSendRpcRsp(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { + if (pRpc->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRpc->code == TSDB_CODE_APP_NOT_READY) { + dmSendRedirectRsp(pMgmt->pDnode, pRpc); + } else { + rpcSendResponse(pRpc); + } +} -int32_t mmStartWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; +void mmPutRpcRspToWorker(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { + int32_t code = -1; + + if (pMgmt->singleProc) { + mmSendRpcRsp(pMgmt, pRpc); + } else { + do { + code = taosProcPutToParentQueue(pMgmt->pProcess, pRpc, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen); + if (code != 0) { + taosMsleep(10); + } + } while (code != 0); + } +} + +static void mmConsumeMsgQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { + dTrace("msg:%p, will be processed", pMsg); + + SMnode *pMnode = mmAcquire(pMgmt); + SRpcMsg *pRpc = &pMsg->rpcMsg; + bool isReq = (pRpc->msgType & 1U); + int32_t code = -1; + + if (pMnode != NULL) { + pMsg->pNode = pMnode; + code = mndProcessMsg((SMndMsg*)pMsg); + mmRelease(pMgmt, pMnode); + } + + if (isReq) { + if (pMsg->rpcMsg.handle == NULL) return; + if (code == 0) { + SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp}; + mmPutRpcRspToWorker(pMgmt, &rsp); + } else { + if (terrno != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp, .code = terrno}; + mmPutRpcRspToWorker(pMgmt, &rsp); + } + } + } + + dTrace("msg:%p, is freed", pMsg); + rpcFreeCont(pRpc->pCont); + taosFreeQitem(pMsg); +} + +int32_t mmStartWorker(SMnodeMgmt *pMgmt) { + SDnode *pDnode = pMgmt->pDnode; if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeMsgQueue) != 0) { dError("failed to start mnode read worker since %s", terrstr()); return -1; @@ -48,9 +95,7 @@ int32_t mmStartWorker(SDnode *pDnode) { return 0; } -void mmStopWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - +void mmStopWorker(SMnodeMgmt *pMgmt) { taosWLockLatch(&pMgmt->latch); pMgmt->deployed = 0; taosWUnLockLatch(&pMgmt->latch); @@ -64,122 +109,41 @@ void mmStopWorker(SDnode *pDnode) { dndCleanupWorker(&pMgmt->syncWorker); } -void mmInitMsgFp(SMnodeMgmt *pMgmt) { - -} - -static void mmSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) { - if (pRpc->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRpc->code == TSDB_CODE_APP_NOT_READY) { - dmSendRedirectRsp(pDnode, pRpc); - } else { - rpcSendResponse(pRpc); - } -} - -static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) { - SRpcConnInfo connInfo = {0}; - if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { - terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - dError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle); - return -1; - } - - memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); - pMsg->rpcMsg = *pRpc; - pMsg->createdTime = taosGetTimestampSec(); - - return 0; -} - -void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - int32_t code = -1; - SMndMsg *pMsg = NULL; - - MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)]; - if (msgFp == NULL) { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - goto _OVER; - } - - pMsg = taosAllocateQitem(sizeof(SMndMsg)); - if (pMsg == NULL) { - goto _OVER; - } - - if (mmBuildMsg(pMsg, pRpc) != 0) { - goto _OVER; - } - - dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user); - - if (pMgmt->singleProc) { - code = (*msgFp)(pDnode, pMsg); - } else { - code = taosProcPutToChildQueue(pMgmt->pProcess, pMsg, sizeof(SMndMsg), pRpc->pCont, pRpc->contLen); - } - -_OVER: - - if (code == 0) { - if (!pMgmt->singleProc) { - dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); - rpcFreeCont(pRpc->pCont); - } - } else { - bool isReq = (pRpc->msgType & 1U); - if (isReq) { - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; - mmSendRpcRsp(pDnode, &rsp); - } - dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); - rpcFreeCont(pRpc->pCont); - } -} - -int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg) { - return mmPutMndMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg); -} - -int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg) { - return mmPutMndMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg); -} - -int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg) { - return mmPutMndMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg); -} - -int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpc) { - return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pRpc); -} - -int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpc) { - return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpc); -} - -static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMsg *pMsg) { - SMnode *pMnode = mmAcquire(pDnode); +static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) { + SMnode *pMnode = mmAcquire(pMgmt); if (pMnode == NULL) return -1; dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); int32_t code = dndWriteMsgToWorker(pWorker, pMsg, 0); - mmRelease(pDnode, pMnode); + mmRelease(pMgmt, pMnode); return code; } -static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc) { - SMndMsg *pMsg = taosAllocateQitem(sizeof(SMndMsg)); +int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg); +} + +int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); +} + +int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); +} + +static int32_t mmPutRpcMsgToWorker(SMgmtWrapper *pWrapper, SDnodeWorker *pWorker, SRpcMsg *pRpc) { + SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { return -1; } dTrace("msg:%p, is created", pMsg); pMsg->rpcMsg = *pRpc; - pMsg->createdTime = taosGetTimestampSec(); - int32_t code = mmPutMndMsgToWorker(pDnode, pWorker, pMsg); + int32_t code = mmPutMsgToWorker(pWrapper->pMgmt, pWorker, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); @@ -189,88 +153,14 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs return code; } -void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - int32_t code = -1; - - if (pMgmt->singleProc) { - mmSendRpcRsp(pDnode, pRpc); - } else { - do { - code = taosProcPutToParentQueue(pMgmt->pProcess, pRpc, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen); - if (code != 0) { - taosMsleep(10); - } - } while (code != 0); - } +int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpc) { + SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return mmPutRpcMsgToWorker(pWrapper, &pMgmt->writeWorker, pRpc); } -void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { - dTrace("msg:%p, get from child queue", pMsg); - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SRpcMsg *pRpc = &pMsg->rpcMsg; - pRpc->pCont = pCont; - - MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)]; - int32_t code = (*msgFp)(pDnode, pMsg); - - if (code != 0) { - bool isReq = (pRpc->msgType & 1U); - if (isReq) { - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; - mmPutRpcRspToWorker(pDnode, &rsp); - } - - dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); - rpcFreeCont(pCont); - } +int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpc) { + SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, MNODE); + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return mmPutRpcMsgToWorker(pWrapper, &pMgmt->readWorker, pRpc); } - -void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { - dTrace("msg:%p, get from parent queue", pMsg); - pMsg->pCont = pCont; - mmSendRpcRsp(pDnode, pMsg); - free(pMsg); -} - -static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { - dTrace("msg:%p, get from msg queue", pMsg); - SMnode *pMnode = mmAcquire(pDnode); - SRpcMsg *pRpc = &pMsg->rpcMsg; - bool isReq = (pRpc->msgType & 1U); - int32_t code = -1; - - if (pMnode != NULL) { - pMsg->pMnode = pMnode; - code = mndProcessMsg(pMsg); - mmRelease(pDnode, pMnode); - } - - if (isReq) { - if (pMsg->rpcMsg.handle == NULL) return; - if (code == 0) { - SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont}; - mmPutRpcRspToWorker(pDnode, &rsp); - } else { - if (terrno != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = terrno}; - mmPutRpcRspToWorker(pDnode, &rsp); - } - } - } - - dTrace("msg:%p, is freed", pMsg); - rpcFreeCont(pRpc->pCont); - taosFreeQitem(pMsg); -} - -#endif - -int32_t mmProcessWriteMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;} -int32_t mmProcessSyncMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;} -int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - return -1; -} \ No newline at end of file diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 54cd894692..0286bdf972 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -348,13 +348,13 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize); pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize); - if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { + if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { taosProcQueueCleanup(pProc->pChildQueue); free(pProc); return NULL; } - pProc->pChildQueue->name = pCfg->name; + pProc->pChildQueue->name = pCfg->name; pProc->pChildQueue->pParent = pCfg->pParent; pProc->pChildQueue->mallocHeadFp = pCfg->childMallocHeadFp; pProc->pChildQueue->freeHeadFp = pCfg->childFreeHeadFp; @@ -436,8 +436,7 @@ int32_t taosProcRun(SProcObj *pProc) { void taosProcStop(SProcObj *pProc) { pProc->stopFlag = true; - // todo - // join + // todo join } bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; } @@ -445,6 +444,7 @@ bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; } void taosProcCleanup(SProcObj *pProc) { if (pProc != NULL) { uDebug("proc:%s, clean up", pProc->name); + taosProcStop(pProc); taosProcQueueCleanup(pProc->pChildQueue); taosProcQueueCleanup(pProc->pParentQueue); free(pProc);