From 877cb761f1b4fdc316078b47ffa0abbee1e17783 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 09:43:22 +0800 Subject: [PATCH 1/9] refactor: mnode worker --- include/dnode/mnode/mnode.h | 1 + source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 14 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 218 ++++++++++---------- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 5 +- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 88 ++++---- 5 files changed, 161 insertions(+), 165 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index ab090940f2..83183422c2 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -83,6 +83,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); */ int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); +int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg); /** * @brief Generate machine code diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 22691300bc..2443493d0b 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -53,20 +53,18 @@ void mmRelease(SMnodeMgmt *pMgmt); SArray *mmGetMsgHandles(); int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); -int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); -int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); +int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index c47988e1ef..f151a09b94 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -20,11 +20,6 @@ void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) { mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->grant); } -void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { - pInfo->isMnode = 1; - mndGetLoad(pMgmt->pMnode, &pInfo->load); -} - int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonMmInfo mmInfo = {0}; mmGetMonitorInfo(pMgmt, &mmInfo); @@ -50,6 +45,11 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } +void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { + pInfo->isMnode = 1; + mndGetLoad(pMgmt->pMnode, &pInfo->load); +} + int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonMloadInfo mloads = {0}; mmGetMnodeLoads(pMgmt, &mloads); @@ -129,114 +129,114 @@ SArray *mmGetMsgHandles() { SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle)); if (pArray == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_BNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_REDISTRIBUTE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_BNODE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_REDISTRIBUTE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MERGE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_TABLE_INDEX, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY, mmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PING_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_REQUEST_VOTE_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY, mmPutMsgToSyncQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 0f3c06cb3a..cb942e8d1c 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -43,7 +43,6 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu pOption->deploy = true; pOption->msgCb = pMgmt->msgCb; pOption->dnodeId = pMgmt->pData->dnodeId; - pOption->replica = 1; pOption->selfIndex = 0; @@ -54,8 +53,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu } static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - pOption->deploy = false; pOption->standby = false; + pOption->deploy = false; pOption->msgCb = pMgmt->msgCb; pOption->dnodeId = pMgmt->pData->dnodeId; @@ -105,7 +104,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->path = pInput->path; pMgmt->name = pInput->name; pMgmt->msgCb = pInput->msgCb; - pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutRpcMsgToQueue; + pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutMsgToQueue; pMgmt->msgCb.mgmt = pMgmt; taosThreadRwlockInit(&pMgmt->lock, NULL); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 53943b61b0..6da9bfc528 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -26,7 +26,7 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) { tmsgSendRsp(&rsp); } -static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { +static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; dTrace("msg:%p, get from mnode queue", pMsg); @@ -53,11 +53,10 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { +static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; - dTrace("msg:%p, get from mnode-sync queue", pMsg); - pMsg->info.node = pMgmt->pMnode; + dTrace("msg:%p, get from mnode-sync queue", pMsg); SMsgHead *pHead = pMsg->pCont; pHead->contLen = ntohl(pHead->contLen); @@ -70,64 +69,63 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { - dTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); - taosWriteQitem(pWorker->queue, pMsg); - return 0; +static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) { + if (mmAcquire(pMgmt) == 0) { + dTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); + taosWriteQitem(pWorker->queue, pMsg); + mmRelease(pMgmt); + return 0; + } else { + dTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, terrstr(), + TMSG_INFO(pMsg->msgType)); + return -1; + } } -int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); +inline int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg); } -int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); +inline int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); } -int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); +inline int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } -int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - mndPreprocessQueryMsg(pMgmt->pMnode, pMsg); - - return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); +inline int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + if (mndPreprocessQueryMsg(pMgmt->pMnode, pMsg) != 0) { + dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); + return -1; + } + return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg); } -int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg); +inline int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg); } -int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { +inline int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); if (pMsg == NULL) return -1; - memcpy(pMsg, pRpc, sizeof(SRpcMsg)); + dTrace("msg:%p, is created", pMsg); + memcpy(pMsg, pRpc, sizeof(SRpcMsg)); switch (qtype) { case WRITE_QUEUE: - dTrace("msg:%p, is created and will put into vnode-write queue", pMsg); - taosWriteQitem(pMgmt->writeWorker.queue, pMsg); - return 0; + return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg); case QUERY_QUEUE: - dTrace("msg:%p, is created and will put into vnode-query queue", pMsg); - taosWriteQitem(pMgmt->queryWorker.queue, pMsg); - return 0; - + return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg); case READ_QUEUE: - dTrace("msg:%p, is created and will put into vnode-read queue", pMsg); - taosWriteQitem(pMgmt->readWorker.queue, pMsg); - return 0; + return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); case SYNC_QUEUE: - if (mmAcquire(pMgmt) == 0) { - dTrace("msg:%p, is created and will put into vnode-sync queue", pMsg); - taosWriteQitem(pMgmt->syncWorker.queue, pMsg); - mmRelease(pMgmt); - return 0; - } else { - return -1; - } + return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); default: terrno = TSDB_CODE_INVALID_PARA; + dTrace("msg:%p, is freed", pMsg); + taosFreeQitem(pMsg); + rpcFreeCont(pMsg->pCont); return -1; } } @@ -137,7 +135,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = tsNumOfMnodeQueryThreads, .max = tsNumOfMnodeQueryThreads, .name = "mnode-query", - .fp = (FItem)mmProcessQueue, + .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { @@ -149,7 +147,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = tsNumOfMnodeReadThreads, .max = tsNumOfMnodeReadThreads, .name = "mnode-read", - .fp = (FItem)mmProcessQueue, + .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) { @@ -161,7 +159,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "mnode-write", - .fp = (FItem)mmProcessQueue, + .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) { @@ -173,7 +171,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "mnode-sync", - .fp = (FItem)mmProcessSyncQueue, + .fp = (FItem)mmProcessSyncMsg, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { @@ -185,7 +183,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .min = 1, .max = 1, .name = "mnode-monitor", - .fp = (FItem)mmProcessQueue, + .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, }; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { From 01521a2541af1aa7cd62eb790163ce89407f5807 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 10:41:15 +0800 Subject: [PATCH 2/9] refactor: mnode file --- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 6 +- source/dnode/mgmt/mgmt_mnode/src/mmFile.c | 84 ++++++++------------- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 +- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 24 +++--- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 8 +- 5 files changed, 47 insertions(+), 77 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 2443493d0b..5dd1a5b3b8 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -34,16 +34,14 @@ typedef struct SMnodeMgmt { SSingleWorker writeWorker; SSingleWorker syncWorker; SSingleWorker monitorWorker; - SReplica replicas[TSDB_MAX_REPLICA]; - int8_t replica; bool stopped; int32_t refCount; TdThreadRwlock lock; } SMnodeMgmt; // mmFile.c -int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); -int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); +int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed); +int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed); // mmInt.c int32_t mmAcquire(SMnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index 478d6abd52..e24ebcb9e2 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { +int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; int32_t maxLen = 4096; @@ -52,45 +52,36 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { } *pDeployed = deployed->valueint; - cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); - if (mnodes != NULL) { - if (!mnodes || mnodes->type != cJSON_Array) { - dError("failed to read %s since nodes not found", file); + cJSON *id = cJSON_GetObjectItem(root, "id"); + if (id) { + if (id->type != cJSON_Number) { + dError("failed to read %s since id not found", file); goto _OVER; } - - pMgmt->replica = cJSON_GetArraySize(mnodes); - if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { - dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); - goto _OVER; - } - - for (int32_t i = 0; i < pMgmt->replica; ++i) { - cJSON *node = cJSON_GetArrayItem(mnodes, i); - if (node == NULL) break; - - SReplica *pReplica = &pMgmt->replicas[i]; - - cJSON *id = cJSON_GetObjectItem(node, "id"); - if (!id || id->type != cJSON_Number) { - dError("failed to read %s since id not found", file); - goto _OVER; - } + if (pReplica) { pReplica->id = id->valueint; + } + } - cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); - if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { - dError("failed to read %s since fqdn not found", file); - goto _OVER; - } + cJSON *fqdn = cJSON_GetObjectItem(root, "fqdn"); + if (fqdn) { + if (fqdn->type != cJSON_String || fqdn->valuestring == NULL) { + dError("failed to read %s since fqdn not found", file); + goto _OVER; + } + if (pReplica) { tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); + } + } - cJSON *port = cJSON_GetObjectItem(node, "port"); - if (!port || port->type != cJSON_Number) { - dError("failed to read %s since port not found", file); - goto _OVER; - } - pReplica->port = port->valueint; + cJSON *port = cJSON_GetObjectItem(root, "port"); + if (port) { + if (port->type != cJSON_Number) { + dError("failed to read %s since port not found", file); + goto _OVER; + } + if (pReplica) { + pReplica->port = (uint16_t)port->valueint; } } @@ -106,7 +97,7 @@ _OVER: return code; } -int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) { +int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed) { char file[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); @@ -124,26 +115,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) { char *content = taosMemoryCalloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - - int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica); - if (replica > 0) { - len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); - for (int32_t i = 0; i < replica; ++i) { - SReplica *pReplica = &pMgmt->replicas[i]; - if (pMsg != NULL) { - pReplica = &pMsg->replicas[i]; - } - len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); - len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); - len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port); - if (i < replica - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }],\n"); - } - } + if (pReplica != NULL && pReplica->id > 0) { + len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); + len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); + len += snprintf(content + len, maxLen - len, " \"port\": %u\n,", pReplica->port); } - len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed); len += snprintf(content + len, maxLen - len, "}\n"); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index f151a09b94..124876d718 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -90,7 +90,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { SMnodeMgmt mgmt = {0}; mgmt.path = pInput->path; mgmt.name = pInput->name; - if (mmWriteFile(&mgmt, &createReq, deployed) != 0) { + if (mmWriteFile(&mgmt, &createReq.replicas[0], deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); return -1; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index cb942e8d1c..7e73e53481 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -27,7 +27,7 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) { static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) { SMnodeMgmt mgmt = {0}; mgmt.path = pInput->path; - if (mmReadFile(&mgmt, required) != 0) { + if (mmReadFile(&mgmt, NULL, required) != 0) { return -1; } @@ -52,23 +52,17 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu tstrncpy(pReplica->fqdn, tsLocalFqdn, TSDB_FQDN_LEN); } -static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { +static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, const SReplica *pReplica, SMnodeOpt *pOption) { pOption->standby = false; pOption->deploy = false; pOption->msgCb = pMgmt->msgCb; pOption->dnodeId = pMgmt->pData->dnodeId; - if (pMgmt->replica > 0) { + if (pReplica->id > 0) { pOption->standby = true; pOption->replica = 1; pOption->selfIndex = 0; - SReplica *pReplica = &pOption->replicas[0]; - for (int32_t i = 0; i < pMgmt->replica; ++i) { - if (pMgmt->replicas[i].id != pMgmt->pData->dnodeId) continue; - pReplica->id = pMgmt->replicas[i].id; - pReplica->port = pMgmt->replicas[i].port; - memcpy(pReplica->fqdn, pMgmt->replicas[i].fqdn, TSDB_FQDN_LEN); - } + memcpy(&pOption->replicas[0], pReplica, sizeof(SReplica)); } } @@ -108,8 +102,9 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.mgmt = pMgmt; taosThreadRwlockInit(&pMgmt->lock, NULL); - bool deployed = false; - if (mmReadFile(pMgmt, &deployed) != 0) { + bool deployed = false; + SReplica replica = {0}; + if (mmReadFile(pMgmt, &replica, &deployed) != 0) { dError("failed to read file since %s", terrstr()); mmClose(pMgmt); return -1; @@ -122,7 +117,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { mmBuildOptionForDeploy(pMgmt, pInput, &option); } else { dInfo("mnode start to open"); - mmBuildOptionForOpen(pMgmt, &option); + mmBuildOptionForOpen(pMgmt, &replica, &option); } pMgmt->pMnode = mndOpen(pMgmt->path, &option); @@ -140,8 +135,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { } tmsgReportStartup("mnode-worker", "initialized"); - if (!deployed || pMgmt->replica > 0) { - pMgmt->replica = 0; + if (!deployed || replica.id > 0) { deployed = true; if (mmWriteFile(pMgmt, NULL, deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 528beb280b..498af36786 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -123,10 +123,12 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) { dError("node:%s, failed to create since %s", pWrapper->name, terrstr()); } else { dInfo("node:%s, has been created", pWrapper->name); - (void)dmOpenNode(pWrapper); - (void)dmStartNode(pWrapper); - pWrapper->required = true; + code = dmOpenNode(pWrapper); + if (code != 0) { + code = dmStartNode(pWrapper); + } pWrapper->deployed = true; + pWrapper->required = true; pWrapper->proc.ptype = pDnode->ptype; } From b5bc9aa2bae4dc9df60577b0b6d0cacec53f0107 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 14 Jun 2022 10:44:34 +0800 Subject: [PATCH 3/9] fix rpc retry error --- source/libs/transport/src/transCli.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d1debc6af6..4852dcfb54 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -983,6 +983,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { SEpSet epSet = {0}; tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet); pCtx->epSet = epSet; + if (!transEpSetIsEqual(&epSet, &pCtx->epSet)) { + pCtx->retryCount = 0; + } } addConnToPool(pThrd->pool, pConn); tTrace("use remote epset, current in use: %d, retry count:%d, try limit: %d", pEpSet->inUse, pCtx->retryCount + 1, From a3f12736407af1bbe8ed75435fde928b70167ef9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 10:58:36 +0800 Subject: [PATCH 4/9] refactor: mnode sync --- include/dnode/mnode/mnode.h | 4 +--- source/dnode/mgmt/mgmt_mnode/src/mmFile.c | 4 +++- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 +- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 15 ++++----------- source/dnode/mnode/impl/inc/mndInt.h | 14 ++++++-------- source/dnode/mnode/impl/src/mndDnode.c | 4 ++-- source/dnode/mnode/impl/src/mndMain.c | 4 +--- source/dnode/mnode/impl/src/mndSync.c | 18 +++++++++--------- 8 files changed, 27 insertions(+), 38 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 83183422c2..99b704826d 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -32,9 +32,7 @@ typedef struct { int32_t dnodeId; bool standby; bool deploy; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; + SReplica replica; SMsgCb msgCb; } SMnodeOpt; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index e24ebcb9e2..27a35ae17a 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -86,12 +86,14 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed) { } code = 0; - dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed); _OVER: if (content != NULL) taosMemoryFree(content); if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); + if (code == 0) { + dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed); + } terrno = code; return code; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 124876d718..e91f4b8cf4 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -126,7 +126,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { SArray *mmGetMsgHandles() { int32_t code = -1; - SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle)); + SArray *pArray = taosArrayInit(128, sizeof(SMgmtHandle)); if (pArray == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 7e73e53481..bc67b8442e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -43,13 +43,9 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu pOption->deploy = true; pOption->msgCb = pMgmt->msgCb; pOption->dnodeId = pMgmt->pData->dnodeId; - pOption->replica = 1; - pOption->selfIndex = 0; - - SReplica *pReplica = &pOption->replicas[0]; - pReplica->id = 1; - pReplica->port = tsServerPort; - tstrncpy(pReplica->fqdn, tsLocalFqdn, TSDB_FQDN_LEN); + pOption->replica.id = 1; + pOption->replica.port = tsServerPort; + tstrncpy(pOption->replica.fqdn, tsLocalFqdn, TSDB_FQDN_LEN); } static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, const SReplica *pReplica, SMnodeOpt *pOption) { @@ -57,12 +53,9 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, const SReplica *pReplica, SM pOption->deploy = false; pOption->msgCb = pMgmt->msgCb; pOption->dnodeId = pMgmt->pData->dnodeId; - if (pReplica->id > 0) { pOption->standby = true; - pOption->replica = 1; - pOption->selfIndex = 0; - memcpy(&pOption->replicas[0], pReplica, sizeof(SReplica)); + pOption->replica = *pReplica; } } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 4869a19856..16220f101a 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -76,11 +76,12 @@ typedef struct { } STelemMgmt; typedef struct { - sem_t syncSem; - int64_t sync; - bool standby; - int32_t errCode; - int32_t transId; + sem_t syncSem; + int64_t sync; + bool standby; + SReplica replica; + int32_t errCode; + int32_t transId; } SSyncMgmt; typedef struct { @@ -98,9 +99,6 @@ typedef struct SMnode { bool stopped; bool restored; bool deploy; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; char *path; int64_t checkTime; SSdb *pSdb; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 73eea70195..c936c0f93d 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -95,8 +95,8 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { dnodeObj.id = 1; dnodeObj.createdTime = taosGetTimestampMs(); dnodeObj.updateTime = dnodeObj.createdTime; - dnodeObj.port = pMnode->replicas[0].port; - memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN); + dnodeObj.port = tsServerPort; + memcpy(&dnodeObj.fqdn, tsLocalFqdn, TSDB_FQDN_LEN); snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 7a73d3c360..892650fa18 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -289,11 +289,9 @@ static int32_t mndExecSteps(SMnode *pMnode) { } static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { - pMnode->replica = pOption->replica; - pMnode->selfIndex = pOption->selfIndex; - memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pMnode->msgCb = pOption->msgCb; pMnode->selfDnodeId = pOption->dnodeId; + pMnode->syncMgmt.replica = pOption->replica; pMnode->syncMgmt.standby = pOption->standby; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index e0b4cc6a57..e58012b1b7 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -188,15 +188,15 @@ int32_t mndInitSync(SMnode *pMnode) { syncInfo.isStandBy = pMgmt->standby; syncInfo.snapshotEnable = true; - SSyncCfg *pCfg = &syncInfo.syncCfg; - pCfg->replicaNum = pMnode->replica; - pCfg->myIndex = pMnode->selfIndex; - mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex, pMgmt->standby); - for (int32_t i = 0; i < pMnode->replica; ++i) { - SNodeInfo *pNode = &pCfg->nodeInfo[i]; - tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); - pNode->nodePort = pMnode->replicas[i].port; - mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort); + mInfo("start to open mnode sync, standby:%d", pMgmt->standby); + if (pMgmt->standby || pMgmt->replica.id > 0) { + SSyncCfg *pCfg = &syncInfo.syncCfg; + pCfg->replicaNum = 1; + pCfg->myIndex = 0; + SNodeInfo *pNode = &pCfg->nodeInfo[0]; + tstrncpy(pNode->nodeFqdn, pMgmt->replica.fqdn, sizeof(pNode->nodeFqdn)); + pNode->nodePort = pMgmt->replica.port; + mInfo("fqdn:%s port:%u", pNode->nodeFqdn, pNode->nodePort); } tsem_init(&pMgmt->syncSem, 0, 0); From d93f4c4a83d906e729cbf8fc344cdfa4c5b14254 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 11:10:36 +0800 Subject: [PATCH 5/9] refactor: mnode worker --- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 4 -- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 19 -------- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 48 +++++++++++++++------ 3 files changed, 35 insertions(+), 36 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 5dd1a5b3b8..c5c3d76f1e 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -43,10 +43,6 @@ typedef struct SMnodeMgmt { int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed); int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed); -// mmInt.c -int32_t mmAcquire(SMnodeMgmt *pMgmt); -void mmRelease(SMnodeMgmt *pMgmt); - // mmHandle.c SArray *mmGetMsgHandles(); int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index bc67b8442e..b7124dfaa5 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -164,22 +164,3 @@ SMgmtFunc mmGetMgmtFunc() { return mgmtFunc; } - -int32_t mmAcquire(SMnodeMgmt *pMgmt) { - int32_t code = 0; - - taosThreadRwlockRdlock(&pMgmt->lock); - if (pMgmt->stopped) { - code = -1; - } else { - atomic_add_fetch_32(&pMgmt->refCount, 1); - } - taosThreadRwlockUnlock(&pMgmt->lock); - return code; -} - -void mmRelease(SMnodeMgmt *pMgmt) { - taosThreadRwlockRdlock(&pMgmt->lock); - atomic_sub_fetch_32(&pMgmt->refCount, 1); - taosThreadRwlockUnlock(&pMgmt->lock); -} \ No newline at end of file diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 6da9bfc528..17581fcef7 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -16,6 +16,25 @@ #define _DEFAULT_SOURCE #include "mmInt.h" +static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) { + int32_t code = 0; + + taosThreadRwlockRdlock(&pMgmt->lock); + if (pMgmt->stopped) { + code = -1; + } else { + atomic_add_fetch_32(&pMgmt->refCount, 1); + } + taosThreadRwlockUnlock(&pMgmt->lock); + return code; +} + +static inline void mmRelease(SMnodeMgmt *pMgmt) { + taosThreadRwlockRdlock(&pMgmt->lock); + atomic_sub_fetch_32(&pMgmt->refCount, 1); + taosThreadRwlockUnlock(&pMgmt->lock); +} + static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rsp = { .code = code, @@ -107,27 +126,30 @@ inline int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { } inline int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); - if (pMsg == NULL) return -1; - dTrace("msg:%p, is created", pMsg); - - memcpy(pMsg, pRpc, sizeof(SRpcMsg)); + SSingleWorker *pWorker = NULL; switch (qtype) { case WRITE_QUEUE: - return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg); + pWorker = &pMgmt->writeWorker; + break; case QUERY_QUEUE: - return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg); + pWorker = &pMgmt->queryWorker; + break; case READ_QUEUE: - return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); + pWorker = &pMgmt->readWorker; + break; case SYNC_QUEUE: - return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); + pWorker = &pMgmt->syncWorker; + break; default: terrno = TSDB_CODE_INVALID_PARA; - dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); - rpcFreeCont(pMsg->pCont); - return -1; } + + if (pWorker == NULL) return -1; + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + if (pMsg == NULL) return -1; + dTrace("msg:%p, is created and will put int %s queue", pMsg, pWorker->name); + + return mmPutMsgToWorker(pMgmt, pWorker, pMsg); } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { From 63927d15fab259f84e78e710ce1011fb3aff8ad6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 11:15:17 +0800 Subject: [PATCH 6/9] fix: compile err in windows --- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 17581fcef7..f02f90962b 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -101,19 +101,19 @@ static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker } } -inline int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { +int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg); } -inline int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { +int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); } -inline int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { +int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } -inline int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { +int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { if (mndPreprocessQueryMsg(pMgmt->pMnode, pMsg) != 0) { dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); return -1; @@ -121,11 +121,11 @@ inline int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg); } -inline int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { +int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg); } -inline int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { +int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SSingleWorker *pWorker = NULL; switch (qtype) { case WRITE_QUEUE: From 313d2f0e4dbcd7d647d9a8bbd511e2151f9c08bc Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 11:18:30 +0800 Subject: [PATCH 7/9] test: enable mnode/basic3.sim --- tests/script/jenkins/basic.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 1c694bf7fb..db8a055362 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -58,7 +58,7 @@ # ---- mnode ./test.sh -f tsim/mnode/basic1.sim ./test.sh -f tsim/mnode/basic2.sim -#./test.sh -f tsim/mnode/basic3.sim +./test.sh -f tsim/mnode/basic3.sim ./test.sh -f tsim/mnode/basic4.sim ./test.sh -f tsim/mnode/basic5.sim From 4e7aac8171a99b41c3c020e54046bbe95dc1d0d0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 11:34:51 +0800 Subject: [PATCH 8/9] fix: crash in mnode --- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index f02f90962b..ee2df5f089 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -147,8 +147,9 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { if (pWorker == NULL) return -1; SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); if (pMsg == NULL) return -1; - dTrace("msg:%p, is created and will put int %s queue", pMsg, pWorker->name); + memcpy(pMsg, pRpc, sizeof(SRpcMsg)); + dTrace("msg:%p, is created and will put int %s queue", pMsg, pWorker->name); return mmPutMsgToWorker(pMgmt, pWorker, pMsg); } From fe6d46d2c0a94d236897aeb1977e81e82cc66047 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 12:41:33 +0800 Subject: [PATCH 9/9] test: disable mnode basic3 --- tests/script/jenkins/basic.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index db8a055362..1c694bf7fb 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -58,7 +58,7 @@ # ---- mnode ./test.sh -f tsim/mnode/basic1.sim ./test.sh -f tsim/mnode/basic2.sim -./test.sh -f tsim/mnode/basic3.sim +#./test.sh -f tsim/mnode/basic3.sim ./test.sh -f tsim/mnode/basic4.sim ./test.sh -f tsim/mnode/basic5.sim