From b19987ac3a155f23293021a186fb81e052db4303 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 12 Feb 2022 14:21:26 +0800 Subject: [PATCH] serialize mnode msg --- source/dnode/mnode/impl/src/mndDnode.c | 32 +++--- source/dnode/mnode/impl/src/mndMnode.c | 109 +++++++++++++------ source/dnode/mnode/impl/test/dnode/dnode.cpp | 11 +- source/dnode/mnode/impl/test/mnode/mnode.cpp | 104 +++++++++++------- 4 files changed, 164 insertions(+), 92 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 26f4f58f45..e104048741 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -541,13 +541,13 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) } static int32_t mndProcessDropDnodeReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - int32_t code = -1; - SUserObj *pUser = NULL; - SDnodeObj *pDnode = NULL; - SDropDnodeReq dropReq = {0}; + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SUserObj *pUser = NULL; + SDnodeObj *pDnode = NULL; + SMDropMnodeReq dropReq = {0}; - if (tDeserializeSDropDnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { + if (tDeserializeSMCreateDropMnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; goto DROP_DNODE_OVER; } @@ -591,14 +591,18 @@ DROP_DNODE_OVER: } static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SMCfgDnodeReq *pCfg = pReq->rpcMsg.pCont; - pCfg->dnodeId = htonl(pCfg->dnodeId); + SMnode *pMnode = pReq->pMnode; - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCfg->dnodeId); + SMCfgDnodeReq cfgReq = {0}; + if (tDeserializeSMCfgDnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, cfgReq.dnodeId); if (pDnode == NULL) { terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - mError("dnode:%d, failed to config since %s ", pCfg->dnodeId, terrstr()); + mError("dnode:%d, failed to config since %s ", cfgReq.dnodeId, terrstr()); return -1; } @@ -606,15 +610,15 @@ static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq) { mndReleaseDnode(pMnode, pDnode); SDCfgDnodeReq *pCfgDnode = rpcMallocCont(sizeof(SDCfgDnodeReq)); - pCfgDnode->dnodeId = htonl(pCfg->dnodeId); - memcpy(pCfgDnode->config, pCfg->config, TSDB_DNODE_CONFIG_LEN); + pCfgDnode->dnodeId = htonl(cfgReq.dnodeId); + memcpy(pCfgDnode->config, cfgReq.config, TSDB_DNODE_CONFIG_LEN); SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pCfgDnode, .contLen = sizeof(SDCfgDnodeReq), .ahandle = pReq->rpcMsg.ahandle}; - mInfo("dnode:%d, app:%p config:%s req send to dnode", pCfg->dnodeId, rpcMsg.ahandle, pCfg->config); + mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.ahandle, cfgReq.config); mndSendReqToDnode(pMnode, &epSet, &rpcMsg); return 0; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 1e0164a572..57b2e22e77 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -15,9 +15,11 @@ #define _DEFAULT_SOURCE #include "mndMnode.h" +#include "mndAuth.h" #include "mndDnode.h" #include "mndShow.h" #include "mndTrans.h" +#include "mndUser.h" #define TSDB_MNODE_VER_NUMBER 1 #define TSDB_MNODE_RESERVE_SIZE 64 @@ -379,40 +381,58 @@ CREATE_MNODE_OVER: } static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SMCreateMnodeReq *pCreate = pReq->rpcMsg.pCont; + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SMnodeObj *pObj = NULL; + SDnodeObj *pDnode = NULL; + SUserObj *pUser = NULL; + SMCreateMnodeReq createReq = {0}; - pCreate->dnodeId = htonl(pCreate->dnodeId); + if (tDeserializeSMCreateDropMnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto CREATE_MNODE_OVER; + } - mDebug("mnode:%d, start to create", pCreate->dnodeId); + mDebug("mnode:%d, start to create", createReq.dnodeId); - SMnodeObj *pObj = mndAcquireMnode(pMnode, pCreate->dnodeId); + pObj = mndAcquireMnode(pMnode, createReq.dnodeId); if (pObj != NULL) { - mndReleaseMnode(pMnode, pObj); - mError("mnode:%d, mnode already exist", pObj->id); terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST; - return -1; + goto CREATE_MNODE_OVER; } else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) { - mError("qnode:%d, failed to create mnode since %s", pCreate->dnodeId, terrstr()); - return -1; + goto CREATE_MNODE_OVER; } - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); + pDnode = mndAcquireDnode(pMnode, createReq.dnodeId); if (pDnode == NULL) { - mError("mnode:%d, dnode not exist", pCreate->dnodeId); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + goto CREATE_MNODE_OVER; + } + + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; + goto CREATE_MNODE_OVER; + } + + if (mndCheckDropNodeAuth(pUser)) { + goto CREATE_MNODE_OVER; + } + + code = mndCreateMnode(pMnode, pReq, pDnode, &createReq); + if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + +CREATE_MNODE_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr()); return -1; } - int32_t code = mndCreateMnode(pMnode, pReq, pDnode, pCreate); + mndReleaseMnode(pMnode, pObj); mndReleaseDnode(pMnode, pDnode); + mndReleaseUser(pMnode, pUser); - if (code != 0) { - mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); - return -1; - } - - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + return code; } static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { @@ -534,32 +554,51 @@ DROP_MNODE_OVER: } static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SMDropMnodeReq *pDrop = pReq->rpcMsg.pCont; - pDrop->dnodeId = htonl(pDrop->dnodeId); + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SUserObj *pUser = NULL; + SMnodeObj *pObj = NULL; + SMDropMnodeReq dropReq = {0}; - mDebug("mnode:%d, start to drop", pDrop->dnodeId); + if (tDeserializeSMCreateDropMnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto DROP_MNODE_OVER; + } - if (pDrop->dnodeId <= 0) { + mDebug("mnode:%d, start to drop", dropReq.dnodeId); + + if (dropReq.dnodeId <= 0) { terrno = TSDB_CODE_SDB_APP_ERROR; - mError("mnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); - return -1; + goto DROP_MNODE_OVER; } - SMnodeObj *pObj = mndAcquireMnode(pMnode, pDrop->dnodeId); + pObj = mndAcquireMnode(pMnode, dropReq.dnodeId); if (pObj == NULL) { - mError("mnode:%d, not exist", pDrop->dnodeId); - return -1; + goto DROP_MNODE_OVER; } - int32_t code = mndDropMnode(pMnode, pReq, pObj); - if (code != 0) { - mError("mnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); - return -1; + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; + goto DROP_MNODE_OVER; } - sdbRelease(pMnode->pSdb, pObj); - return TSDB_CODE_MND_ACTION_IN_PROGRESS; + if (mndCheckCreateNodeAuth(pUser)) { + goto DROP_MNODE_OVER; + } + + code = mndDropMnode(pMnode, pReq, pObj); + if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + +DROP_MNODE_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); + } + + mndReleaseMnode(pMnode, pObj); + mndReleaseUser(pMnode, pUser); + + return code; } static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp) { diff --git a/source/dnode/mnode/impl/test/dnode/dnode.cpp b/source/dnode/mnode/impl/test/dnode/dnode.cpp index 9af0aad1ae..f575556345 100644 --- a/source/dnode/mnode/impl/test/dnode/dnode.cpp +++ b/source/dnode/mnode/impl/test/dnode/dnode.cpp @@ -75,11 +75,14 @@ TEST_F(MndTestDnode, 01_ShowDnode) { } TEST_F(MndTestDnode, 02_ConfigDnode) { - int32_t contLen = sizeof(SMCfgDnodeReq); + SMCfgDnodeReq cfgReq = {0}; + cfgReq.dnodeId = 1; + strcpy(cfgReq.config, "ddebugflag"); + strcpy(cfgReq.value, "131"); - SMCfgDnodeReq* pReq = (SMCfgDnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(1); - strcpy(pReq->config, "ddebugflag 131"); + int32_t contLen = tSerializeSMCfgDnodeReq(NULL, 0, &cfgReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCfgDnodeReq(pReq, contLen, &cfgReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONFIG_DNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); diff --git a/source/dnode/mnode/impl/test/mnode/mnode.cpp b/source/dnode/mnode/impl/test/mnode/mnode.cpp index 208c40e66d..fd8c83e26a 100644 --- a/source/dnode/mnode/impl/test/mnode/mnode.cpp +++ b/source/dnode/mnode/impl/test/mnode/mnode.cpp @@ -60,10 +60,12 @@ TEST_F(MndTestMnode, 01_ShowDnode) { TEST_F(MndTestMnode, 02_Create_Mnode_Invalid_Id) { { - int32_t contLen = sizeof(SMCreateMnodeReq); + SMCreateMnodeReq createReq = {0}; + createReq.dnodeId = 1; - SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(1); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -73,10 +75,12 @@ TEST_F(MndTestMnode, 02_Create_Mnode_Invalid_Id) { TEST_F(MndTestMnode, 03_Create_Mnode_Invalid_Id) { { - int32_t contLen = sizeof(SMCreateMnodeReq); + SMCreateMnodeReq createReq = {0}; + createReq.dnodeId = 2; - SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -107,10 +111,12 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { { // create mnode - int32_t contLen = sizeof(SMCreateMnodeReq); + SMCreateMnodeReq createReq = {0}; + createReq.dnodeId = 2; - SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -134,10 +140,12 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { { // drop mnode - int32_t contLen = sizeof(SMDropMnodeReq); + SMDropMnodeReq dropReq = {0}; + dropReq.dnodeId = 2; - SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -156,10 +164,12 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { { // drop mnode - int32_t contLen = sizeof(SMDropMnodeReq); + SMDropMnodeReq dropReq = {0}; + dropReq.dnodeId = 2; - SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -170,10 +180,12 @@ TEST_F(MndTestMnode, 04_Create_Mnode) { TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) { { // send message first, then dnode2 crash, result is returned, and rollback is started - int32_t contLen = sizeof(SMCreateMnodeReq); + SMCreateMnodeReq createReq = {0}; + createReq.dnodeId = 2; - SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq); server2.Stop(); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); @@ -183,10 +195,12 @@ TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) { { // continue send message, mnode is creating - int32_t contLen = sizeof(SMCreateMnodeReq); + SMCreateMnodeReq createReq = {0}; + createReq.dnodeId = 2; - SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -195,10 +209,12 @@ TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) { { // continue send message, mnode is creating - int32_t contLen = sizeof(SMDropMnodeReq); + SMDropMnodeReq dropReq = {0}; + dropReq.dnodeId = 2; - SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -214,10 +230,12 @@ TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) { int32_t retryMax = 20; for (retry = 0; retry < retryMax; retry++) { - int32_t contLen = sizeof(SMCreateMnodeReq); + SMCreateMnodeReq createReq = {0}; + createReq.dnodeId = 2; - SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -232,10 +250,12 @@ TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) { TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) { { // send message first, then dnode2 crash, result is returned, and rollback is started - int32_t contLen = sizeof(SMDropMnodeReq); + SMDropMnodeReq dropReq = {0}; + dropReq.dnodeId = 2; - SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq); server2.Stop(); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); @@ -245,10 +265,12 @@ TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) { { // continue send message, mnode is dropping - int32_t contLen = sizeof(SMCreateMnodeReq); + SMCreateMnodeReq createReq = {0}; + createReq.dnodeId = 2; - SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -257,10 +279,12 @@ TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) { { // continue send message, mnode is dropping - int32_t contLen = sizeof(SMDropMnodeReq); + SMDropMnodeReq dropReq = {0}; + dropReq.dnodeId = 2; - SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -276,10 +300,12 @@ TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) { int32_t retryMax = 20; for (retry = 0; retry < retryMax; retry++) { - int32_t contLen = sizeof(SMCreateMnodeReq); + SMCreateMnodeReq createReq = {0}; + createReq.dnodeId = 2; - SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen); ASSERT_NE(pRsp, nullptr);