From 6810e82fb5b4c8b6519141b20483c40eb79a7138 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 9 Apr 2022 17:23:05 +0800 Subject: [PATCH] feat[cluster]: create and drop snode --- source/dnode/mgmt/sm/smHandle.c | 2 +- source/dnode/mgmt/sm/smWorker.c | 15 ++-- source/dnode/mnode/impl/src/mndSnode.c | 105 ++++++++++++------------- 3 files changed, 60 insertions(+), 62 deletions(-) diff --git a/source/dnode/mgmt/sm/smHandle.c b/source/dnode/mgmt/sm/smHandle.c index 79fee0f4b7..36345cf490 100644 --- a/source/dnode/mgmt/sm/smHandle.c +++ b/source/dnode/mgmt/sm/smHandle.c @@ -55,7 +55,7 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { if (createReq.dnodeId != pDnode->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to create snode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->dnodeId); + dError("failed to create snode since %s", terrstr()); return -1; } else { return smOpen(pWrapper); diff --git a/source/dnode/mgmt/sm/smWorker.c b/source/dnode/mgmt/sm/smWorker.c index a29d5d1abc..cf343423b7 100644 --- a/source/dnode/mgmt/sm/smWorker.c +++ b/source/dnode/mgmt/sm/smWorker.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "smInt.h" -static inline void smSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { +static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) { SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code, @@ -28,17 +28,19 @@ static inline void smSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t cod static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SSnodeMgmt *pMgmt = pInfo->ahandle; - dTrace("msg:%p, get from snode monitor queue", pMsg); + dTrace("msg:%p, get from snode-monitor queue", pMsg); SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) { code = smProcessGetMonSmInfoReq(pMgmt->pWrapper, pMsg); + } else { + terrno = TSDB_CODE_MSG_NOT_PROCESSED; } if (pRpc->msgType & 1U) { if (code != 0 && terrno != 0) code = terrno; - smSendRsp(pMgmt->pWrapper, pMsg, code); + smSendRsp(pMsg, code); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); @@ -53,7 +55,7 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num SNodeMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); - dTrace("msg:%p, will be processed in snode unique queue", pMsg); + dTrace("msg:%p, get from snode-unique queue", pMsg); sndProcessUMsg(pMgmt->pSnode, &pMsg->rpcMsg); dTrace("msg:%p, is freed", pMsg); @@ -65,7 +67,7 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SSnodeMgmt *pMgmt = pInfo->ahandle; - dTrace("msg:%p, will be processed in snode shared queue", pMsg); + dTrace("msg:%p, get from snode-shared queue", pMsg); sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg); dTrace("msg:%p, is freed", pMsg); @@ -88,7 +90,6 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { } SMultiWorkerCfg cfg = {.max = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; - if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) { dError("failed to start snode-unique worker since %s", terrstr()); return -1; @@ -193,7 +194,7 @@ int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg); + int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg); if (workerType == SND_WORKER_TYPE__SHARED) { return smProcessSharedMsg(pWrapper, pMsg); } else { diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 2381724a50..87e5962f6b 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -21,17 +21,17 @@ #include "mndTrans.h" #include "mndUser.h" -#define TSDB_SNODE_VER_NUMBER 1 -#define TSDB_SNODE_RESERVE_SIZE 64 +#define SNODE_VER_NUMBER 1 +#define SNODE_RESERVE_SIZE 64 static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj); static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw); static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj); -static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj); static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew); +static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj); static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq); -static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq); static int32_t mndProcessCreateSnodeRsp(SNodeMsg *pRsp); +static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq); static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp); static int32_t mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); @@ -61,11 +61,9 @@ int32_t mndInitSnode(SMnode *pMnode) { void mndCleanupSnode(SMnode *pMnode) {} SEpSet mndAcquireEpFromSnode(SMnode *pMnode, const SSnodeObj *pSnode) { - SEpSet epSet; - memcpy(epSet.eps->fqdn, pSnode->pDnode->fqdn, 128); - epSet.eps->port = pSnode->pDnode->port; - epSet.numOfEps = 1; - epSet.inUse = 0; + SEpSet epSet = {.numOfEps = 1, .inUse = 0}; + memcpy(epSet.eps[0].fqdn, pSnode->pDnode->fqdn, TSDB_FQDN_LEN); + epSet.eps[0].port = pSnode->pDnode->port; return epSet; } @@ -85,18 +83,18 @@ static void mndReleaseSnode(SMnode *pMnode, SSnodeObj *pObj) { static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj) { terrno = TSDB_CODE_OUT_OF_MEMORY; - SSdbRaw *pRaw = sdbAllocRaw(SDB_SNODE, TSDB_SNODE_VER_NUMBER, sizeof(SSnodeObj) + TSDB_SNODE_RESERVE_SIZE); - if (pRaw == NULL) goto SNODE_ENCODE_OVER; + SSdbRaw *pRaw = sdbAllocRaw(SDB_SNODE, SNODE_VER_NUMBER, sizeof(SSnodeObj) + SNODE_RESERVE_SIZE); + if (pRaw == NULL) goto _OVER; int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pObj->id, SNODE_ENCODE_OVER) - SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, SNODE_ENCODE_OVER) - SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, SNODE_ENCODE_OVER) - SDB_SET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE, SNODE_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER) + SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER) + SDB_SET_RESERVE(pRaw, dataPos, SNODE_RESERVE_SIZE, _OVER) terrno = 0; -SNODE_ENCODE_OVER: +_OVER: if (terrno != 0) { mError("snode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr()); sdbFreeRaw(pRaw); @@ -111,28 +109,28 @@ static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SNODE_DECODE_OVER; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != TSDB_SNODE_VER_NUMBER) { + if (sver != SNODE_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - goto SNODE_DECODE_OVER; + goto _OVER; } SSdbRow *pRow = sdbAllocRow(sizeof(SSnodeObj)); - if (pRow == NULL) goto SNODE_DECODE_OVER; + if (pRow == NULL) goto _OVER; SSnodeObj *pObj = sdbGetRowObj(pRow); - if (pObj == NULL) goto SNODE_DECODE_OVER; + if (pObj == NULL) goto _OVER; int32_t dataPos = 0; - SDB_GET_INT32(pRaw, dataPos, &pObj->id, SNODE_DECODE_OVER) - SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, SNODE_DECODE_OVER) - SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, SNODE_DECODE_OVER) - SDB_GET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE, SNODE_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER) + SDB_GET_RESERVE(pRaw, dataPos, SNODE_RESERVE_SIZE, _OVER) terrno = 0; -SNODE_DECODE_OVER: +_OVER: if (terrno != 0) { mError("snode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr()); taosMemoryFreeClear(pRow); @@ -258,20 +256,20 @@ static int32_t mndCreateSnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode, snodeObj.updateTime = snodeObj.createdTime; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SNODE, &pReq->rpcMsg); - if (pTrans == NULL) goto CREATE_SNODE_OVER; + if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId); - if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER; - if (mndSetCreateSnodeUndoLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER; - if (mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER; - if (mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) goto CREATE_SNODE_OVER; - if (mndSetCreateSnodeUndoActions(pTrans, pDnode, &snodeObj) != 0) goto CREATE_SNODE_OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_SNODE_OVER; + if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) != 0) goto _OVER; + if (mndSetCreateSnodeUndoLogs(pTrans, &snodeObj) != 0) goto _OVER; + if (mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) != 0) goto _OVER; + if (mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) goto _OVER; + if (mndSetCreateSnodeUndoActions(pTrans, pDnode, &snodeObj) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; -CREATE_SNODE_OVER: +_OVER: mndTransDrop(pTrans); return code; } @@ -286,7 +284,7 @@ static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq) { if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto CREATE_SNODE_OVER; + goto _OVER; } mDebug("snode:%d, start to create", createReq.dnodeId); @@ -294,31 +292,31 @@ static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq) { pObj = mndAcquireSnode(pMnode, createReq.dnodeId); if (pObj != NULL) { terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST; - goto CREATE_SNODE_OVER; + goto _OVER; } else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) { - goto CREATE_SNODE_OVER; + goto _OVER; } pDnode = mndAcquireDnode(pMnode, createReq.dnodeId); if (pDnode == NULL) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - goto CREATE_SNODE_OVER; + goto _OVER; } pUser = mndAcquireUser(pMnode, pReq->user); if (pUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - goto CREATE_SNODE_OVER; + goto _OVER; } if (mndCheckNodeAuth(pUser)) { - goto CREATE_SNODE_OVER; + goto _OVER; } code = mndCreateSnode(pMnode, pReq, pDnode, &createReq); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; -CREATE_SNODE_OVER: +_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("snode:%d, failed to create since %s", createReq.dnodeId, terrstr()); return -1; @@ -327,7 +325,6 @@ CREATE_SNODE_OVER: mndReleaseSnode(pMnode, pObj); mndReleaseDnode(pMnode, pDnode); mndReleaseUser(pMnode, pUser); - return code; } @@ -378,18 +375,18 @@ static int32_t mndDropSnode(SMnode *pMnode, SNodeMsg *pReq, SSnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, &pReq->rpcMsg); - if (pTrans == NULL) goto DROP_SNODE_OVER; + if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id); - if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0) goto DROP_SNODE_OVER; - if (mndSetDropSnodeCommitLogs(pTrans, pObj) != 0) goto DROP_SNODE_OVER; - if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_SNODE_OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_SNODE_OVER; + if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0) goto _OVER; + if (mndSetDropSnodeCommitLogs(pTrans, pObj) != 0) goto _OVER; + if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; -DROP_SNODE_OVER: +_OVER: mndTransDrop(pTrans); return code; } @@ -403,35 +400,35 @@ static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq) { if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - goto DROP_SNODE_OVER; + goto _OVER; } mDebug("snode:%d, start to drop", dropReq.dnodeId); if (dropReq.dnodeId <= 0) { terrno = TSDB_CODE_SDB_APP_ERROR; - goto DROP_SNODE_OVER; + goto _OVER; } pObj = mndAcquireSnode(pMnode, dropReq.dnodeId); if (pObj == NULL) { - goto DROP_SNODE_OVER; + goto _OVER; } pUser = mndAcquireUser(pMnode, pReq->user); if (pUser == NULL) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - goto DROP_SNODE_OVER; + goto _OVER; } if (mndCheckNodeAuth(pUser)) { - goto DROP_SNODE_OVER; + goto _OVER; } code = mndDropSnode(pMnode, pReq, pObj); if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; -DROP_SNODE_OVER: +_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("snode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); }