From a1c218d381dd71d006826350da9eccc0ee786ee8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 9 Apr 2022 16:59:03 +0800 Subject: [PATCH] feat[cluster]: create and drop bnode --- source/dnode/mgmt/bm/bmHandle.c | 3 +- source/dnode/mgmt/bm/bmWorker.c | 43 +++++++----- source/dnode/mgmt/qm/qmWorker.c | 8 +-- source/dnode/mnode/impl/src/mndBnode.c | 97 +++++++++++++------------- 4 files changed, 78 insertions(+), 73 deletions(-) diff --git a/source/dnode/mgmt/bm/bmHandle.c b/source/dnode/mgmt/bm/bmHandle.c index 66163b807b..645a1d09c2 100644 --- a/source/dnode/mgmt/bm/bmHandle.c +++ b/source/dnode/mgmt/bm/bmHandle.c @@ -16,8 +16,7 @@ #define _DEFAULT_SOURCE #include "bmInt.h" -void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo) { -} +void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo) {} int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { SMonBmInfo bmInfo = {0}; diff --git a/source/dnode/mgmt/bm/bmWorker.c b/source/dnode/mgmt/bm/bmWorker.c index cf2d7ac939..3481078d51 100644 --- a/source/dnode/mgmt/bm/bmWorker.c +++ b/source/dnode/mgmt/bm/bmWorker.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "bmInt.h" -static void bmSendErrorRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { +static void bmSendErrorRsp(SNodeMsg *pMsg, int32_t code) { SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; tmsgSendRsp(&rpcRsp); @@ -25,15 +25,17 @@ static void bmSendErrorRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) taosFreeQitem(pMsg); } -static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t numOfMsgs, int32_t code) { +static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) { for (int32_t i = 0; i < numOfMsgs; ++i) { SNodeMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); - bmSendErrorRsp(pWrapper, pMsg, code); + if (pMsg != NULL) { + bmSendErrorRsp(pMsg, code); + } } } -static inline void bmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { +static inline void bmSendRsp(SNodeMsg *pMsg, int32_t code) { SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code, @@ -42,20 +44,22 @@ static inline void bmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t cod tmsgSendRsp(&rsp); } -static void bmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void bmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SBnodeMgmt *pMgmt = pInfo->ahandle; - dTrace("msg:%p, get from bnode monitor queue", pMsg); + dTrace("msg:%p, get from bnode-monitor queue", pMsg); SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; if (pMsg->rpcMsg.msgType == TDMT_MON_BM_INFO) { code = bmProcessGetMonBmInfoReq(pMgmt->pWrapper, pMsg); + } else { + terrno = TSDB_CODE_MSG_NOT_PROCESSED; } if (pRpc->msgType & 1U) { if (code != 0 && terrno != 0) code = terrno; - bmSendRsp(pMgmt->pWrapper, pMsg, code); + bmSendRsp(pMsg, code); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); @@ -64,21 +68,22 @@ static void bmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { } static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SBnodeMgmt *pMgmt = pInfo->ahandle; - SMgmtWrapper *pWrapper = pMgmt->pWrapper; + SBnodeMgmt *pMgmt = pInfo->ahandle; SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); if (pArray == NULL) { - bmSendErrorRsps(pWrapper, qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); + bmSendErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); return; } for (int32_t i = 0; i < numOfMsgs; ++i) { SNodeMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); - dTrace("msg:%p, will be processed in bnode queue", pMsg); - if (taosArrayPush(pArray, &pMsg) == NULL) { - bmSendErrorRsp(pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY); + if (pMsg != NULL) { + dTrace("msg:%p, get from bnode-write queue", pMsg); + if (taosArrayPush(pArray, &pMsg) == NULL) { + bmSendErrorRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY); + } } } @@ -86,9 +91,11 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO for (size_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); - dTrace("msg:%p, is freed", pMsg); - rpcFreeCont(pMsg->rpcMsg.pCont); - taosFreeQitem(pMsg); + if (pMsg != NULL) { + dTrace("msg:%p, is freed", pMsg); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); + } } taosArrayDestroy(pArray); } @@ -120,12 +127,12 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) { if (tsMultiProcess) { SSingleWorkerCfg mCfg = { - .min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonQueue, .param = pMgmt}; + .min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonitorQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start bnode-monitor worker since %s", terrstr()); return -1; } - } + } dDebug("bnode workers are initialized"); return 0; diff --git a/source/dnode/mgmt/qm/qmWorker.c b/source/dnode/mgmt/qm/qmWorker.c index 0acf536d8e..974052cdf6 100644 --- a/source/dnode/mgmt/qm/qmWorker.c +++ b/source/dnode/mgmt/qm/qmWorker.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "qmInt.h" -static inline void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { +static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) { SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code, @@ -40,7 +40,7 @@ static void qmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { if (pRpc->msgType & 1U) { if (code != 0 && terrno != 0) code = terrno; - qmSendRsp(pMgmt->pWrapper, pMsg, code); + qmSendRsp(pMsg, code); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); @@ -56,7 +56,7 @@ static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pRpc); if (pRpc->msgType & 1U && code != 0) { - qmSendRsp(pMgmt->pWrapper, pMsg, code); + qmSendRsp(pMsg, code); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); @@ -72,7 +72,7 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { int32_t code = qndProcessFetchMsg(pMgmt->pQnode, pRpc); if (pRpc->msgType & 1U && code != 0) { - qmSendRsp(pMgmt->pWrapper, pMsg, code); + qmSendRsp(pMsg, code); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index b165b0aac0..a8b1ec393b 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -21,17 +21,17 @@ #include "mndTrans.h" #include "mndUser.h" -#define TSDB_BNODE_VER_NUMBER 1 -#define TSDB_BNODE_RESERVE_SIZE 64 +#define BNODE_VER_NUMBER 1 +#define BNODE_RESERVE_SIZE 64 static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj); static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw); static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj); -static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj); static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew); +static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj); static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq); -static int32_t mndProcessDropBnodeReq(SNodeMsg *pReq); static int32_t mndProcessCreateBnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessDropBnodeReq(SNodeMsg *pReq); static int32_t mndProcessDropBnodeRsp(SNodeMsg *pRsp); static int32_t mndGetBnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); @@ -76,18 +76,18 @@ static void mndReleaseBnode(SMnode *pMnode, SBnodeObj *pObj) { static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj) { terrno = TSDB_CODE_OUT_OF_MEMORY; - SSdbRaw *pRaw = sdbAllocRaw(SDB_BNODE, TSDB_BNODE_VER_NUMBER, sizeof(SBnodeObj) + TSDB_BNODE_RESERVE_SIZE); - if (pRaw == NULL) goto BNODE_ENCODE_OVER; + SSdbRaw *pRaw = sdbAllocRaw(SDB_BNODE, BNODE_VER_NUMBER, sizeof(SBnodeObj) + BNODE_RESERVE_SIZE); + if (pRaw == NULL) goto _OVER; int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pObj->id, BNODE_ENCODE_OVER) - SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, BNODE_ENCODE_OVER) - SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, BNODE_ENCODE_OVER) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE, BNODE_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER) + SDB_SET_RESERVE(pRaw, dataPos, BNODE_RESERVE_SIZE, _OVER) terrno = 0; -BNODE_ENCODE_OVER: +_OVER: if (terrno != 0) { mError("bnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr()); sdbFreeRaw(pRaw); @@ -102,28 +102,28 @@ static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto BNODE_DECODE_OVER; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != TSDB_BNODE_VER_NUMBER) { + if (sver != BNODE_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - goto BNODE_DECODE_OVER; + goto _OVER; } SSdbRow *pRow = sdbAllocRow(sizeof(SBnodeObj)); - if (pRow == NULL) goto BNODE_DECODE_OVER; + if (pRow == NULL) goto _OVER; SBnodeObj *pObj = sdbGetRowObj(pRow); - if (pObj == NULL) goto BNODE_DECODE_OVER; + if (pObj == NULL) goto _OVER; int32_t dataPos = 0; - SDB_GET_INT32(pRaw, dataPos, &pObj->id, BNODE_DECODE_OVER) - SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, BNODE_DECODE_OVER) - SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, BNODE_DECODE_OVER) - SDB_GET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE, BNODE_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER) + SDB_GET_RESERVE(pRaw, dataPos, BNODE_RESERVE_SIZE, _OVER) terrno = 0; -BNODE_DECODE_OVER: +_OVER: if (terrno != 0) { mError("bnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr()); taosMemoryFreeClear(pRow); @@ -249,19 +249,19 @@ static int32_t mndCreateBnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode, bnodeObj.updateTime = bnodeObj.createdTime; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_BNODE, &pReq->rpcMsg); - if (pTrans == NULL) goto CREATE_BNODE_OVER; + if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId); - if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER; - if (mndSetCreateBnodeUndoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER; - if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER; - if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER; - if (mndSetCreateBnodeUndoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_BNODE_OVER; + if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) goto _OVER; + if (mndSetCreateBnodeUndoLogs(pTrans, &bnodeObj) != 0) goto _OVER; + if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) goto _OVER; + if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) goto _OVER; + if (mndSetCreateBnodeUndoActions(pTrans, pDnode, &bnodeObj) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; -CREATE_BNODE_OVER: +_OVER: mndTransDrop(pTrans); return code; } @@ -276,7 +276,7 @@ static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq) { if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto CREATE_BNODE_OVER; + goto _OVER; } mDebug("bnode:%d, start to create", createReq.dnodeId); @@ -284,31 +284,31 @@ static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq) { pObj = mndAcquireBnode(pMnode, createReq.dnodeId); if (pObj != NULL) { terrno = TSDB_CODE_MND_BNODE_ALREADY_EXIST; - goto CREATE_BNODE_OVER; + goto _OVER; } else if (terrno != TSDB_CODE_MND_BNODE_NOT_EXIST) { - goto CREATE_BNODE_OVER; + goto _OVER; } pDnode = mndAcquireDnode(pMnode, createReq.dnodeId); if (pDnode == NULL) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - goto CREATE_BNODE_OVER; + goto _OVER; } pUser = mndAcquireUser(pMnode, pReq->user); if (pUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - goto CREATE_BNODE_OVER; + goto _OVER; } if (mndCheckNodeAuth(pUser)) { - goto CREATE_BNODE_OVER; + goto _OVER; } code = mndCreateBnode(pMnode, pReq, pDnode, &createReq); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; -CREATE_BNODE_OVER: +_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("bnode:%d, failed to create since %s", createReq.dnodeId, terrstr()); } @@ -316,7 +316,6 @@ CREATE_BNODE_OVER: mndReleaseBnode(pMnode, pObj); mndReleaseDnode(pMnode, pDnode); mndReleaseUser(pMnode, pUser); - return code; } @@ -367,17 +366,17 @@ static int32_t mndDropBnode(SMnode *pMnode, SNodeMsg *pReq, SBnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, &pReq->rpcMsg); - if (pTrans == NULL) goto DROP_BNODE_OVER; + if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id); - if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER; - if (mndSetDropBnodeCommitLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER; - if (mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_BNODE_OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_BNODE_OVER; + if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) goto _OVER; + if (mndSetDropBnodeCommitLogs(pTrans, pObj) != 0) goto _OVER; + if (mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; -DROP_BNODE_OVER: +_OVER: mndTransDrop(pTrans); return code; } @@ -391,35 +390,35 @@ static int32_t mndProcessDropBnodeReq(SNodeMsg *pReq) { if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto DROP_BNODE_OVER; + goto _OVER; } mDebug("bnode:%d, start to drop", dropReq.dnodeId); if (dropReq.dnodeId <= 0) { terrno = TSDB_CODE_SDB_APP_ERROR; - goto DROP_BNODE_OVER; + goto _OVER; } pObj = mndAcquireBnode(pMnode, dropReq.dnodeId); if (pObj == NULL) { - goto DROP_BNODE_OVER; + goto _OVER; } pUser = mndAcquireUser(pMnode, pReq->user); if (pUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - goto DROP_BNODE_OVER; + goto _OVER; } if (mndCheckNodeAuth(pUser)) { - goto DROP_BNODE_OVER; + goto _OVER; } code = mndDropBnode(pMnode, pReq, pObj); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; -DROP_BNODE_OVER: +_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("bnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); }