From ad0995c2070bfd9945a94c67b95dbf1da256ede6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 23 Dec 2021 12:09:37 +0800 Subject: [PATCH] TD-10431 refact create mnode msg --- include/common/taosmsg.h | 3 + source/dnode/mnode/impl/src/mndMnode.c | 219 +++++++++++++++++++------ 2 files changed, 172 insertions(+), 50 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 2a4512ef48..5b82e807a1 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -907,6 +907,7 @@ typedef struct { typedef struct { int32_t dnodeId; + int32_t reserve[8]; } SCreateMnodeMsg, SDropMnodeMsg; typedef struct { @@ -914,10 +915,12 @@ typedef struct { int8_t align[3]; int8_t replica; SReplica replicas[TSDB_MAX_REPLICA]; + int32_t reserve[8]; } SCreateMnodeInMsg, SAlterMnodeInMsg; typedef struct { int32_t dnodeId; + int32_t reserve[8]; } SDropMnodeInMsg; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 111ed60632..9120b7fe27 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -202,51 +202,137 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { } } +static SCreateMnodeInMsg *mndBuildCreateMnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SMnodeObj *pObj) { + SCreateMnodeInMsg *pCreate = calloc(1, sizeof(SCreateMnodeInMsg)); + if (pCreate == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pCreate->dnodeId = htonl(pObj->id); + + int32_t numOfReplicas = 0; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + + while (numOfReplicas < TSDB_MAX_REPLICA - 1) { + SMnodeObj *pObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj); + if (pIter == NULL) break; + if (pObj->pDnode == NULL) break; + + SReplica *pReplica = &pCreate->replicas[numOfReplicas]; + pReplica->id = htonl(pObj->id); + pReplica->port = htons(pObj->pDnode->port); + memcpy(pReplica->fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN); + numOfReplicas++; + } + + numOfReplicas++; + SReplica *pReplica = &pCreate->replicas[numOfReplicas]; + pReplica->id = htonl(pObj->id); + pReplica->port = htons(pDnode->port); + memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + + return pCreate; +} + +static SDropMnodeInMsg *mndBuildDropMnodeMsg(SMnode *pMnode, SMnodeObj *pObj) { + SDropMnodeInMsg *pDrop = calloc(1, sizeof(SDropMnodeInMsg)); + if (pDrop == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pDrop->dnodeId = htonl(pObj->id); + return pDrop; +} + +static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; + return 0; +} + +static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + +static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; + return 0; +} + +static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + STransAction action = {0}; + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pObj->id); + if (pDnode == NULL) return -1; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + SCreateMnodeInMsg *pMsg = mndBuildCreateMnodeMsg(pMnode, pDnode, pObj); + if (pMsg == NULL) return -1; + + action.pCont = pMsg; + action.contLen = sizeof(SCreateMnodeInMsg); + action.msgType = TSDB_MSG_TYPE_CREATE_MNODE_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } + + return 0; +} + static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *pCreate) { SMnodeObj mnodeObj = {0}; mnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_MNODE); mnodeObj.createdTime = taosGetTimestampMs(); mnodeObj.updateTime = mnodeObj.createdTime; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); if (pTrans == NULL) { - mError("dnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); - return -1; + mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); + goto CREATE_MNODE_OVER; } - mDebug("trans:%d, used to create dnode:%d", pTrans->id, pCreate->dnodeId); + mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); - SSdbRaw *pRedoRaw = mndMnodeActionEncode(&mnodeObj); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto CREATE_MNODE_OVER; } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); - SSdbRaw *pUndoRaw = mndMnodeActionEncode(&mnodeObj); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto CREATE_MNODE_OVER; } - sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED); - SSdbRaw *pCommitRaw = mndMnodeActionEncode(&mnodeObj); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetCreateMnodeRedoActions(pMnode, pTrans, &mnodeObj) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto CREATE_MNODE_OVER; } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto CREATE_MNODE_OVER; } + code = 0; + +CREATE_MNODE_OVER: mndTransDrop(pTrans); - return 0; + return code; } static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { @@ -282,46 +368,79 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } +static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; + return 0; +} + +static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + +static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + STransAction action = {0}; + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pObj->id); + if (pDnode == NULL) return -1; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + SDropMnodeInMsg *pMsg = mndBuildDropMnodeMsg(pMnode, pObj); + if (pMsg == NULL) return -1; + + action.pCont = pMsg; + action.contLen = sizeof(SDropMnodeInMsg); + action.msgType = TSDB_MSG_TYPE_CREATE_MNODE_IN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } + + return 0; +} + static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); if (pTrans == NULL) { mError("mnode:%d, failed to drop since %s", pObj->id, terrstr()); - return -1; + goto DROP_MNODE_OVER; } - mDebug("trans:%d, used to drop user:%d", pTrans->id, pObj->id); - SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { - mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING); + mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); - SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { - mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, pObj) != 0) { + mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); + goto DROP_MNODE_OVER; } - sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); - SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, pObj) != 0) { + mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); + goto DROP_MNODE_OVER; + } + + if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pObj) != 0) { + mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); + goto DROP_MNODE_OVER; } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; + goto DROP_MNODE_OVER; } + code = 0; + +DROP_MNODE_OVER: mndTransDrop(pTrans); - return 0; + return code; } static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {