From f850ccbfd4c26d7626ae8eec9eb84c1360887a2e Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 7 Dec 2023 16:00:13 +0800 Subject: [PATCH] feat: support uniq grant --- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mgmt/node_mgmt/src/dmTransport.c | 3 +- source/dnode/mnode/impl/src/mndDnode.c | 35 +++++++++++-------- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 737a0338ef..45bd3d787b 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -109,6 +109,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_MACHINE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index ad5ca2cecf..804834afba 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -290,8 +290,7 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle); return -1; } else { - rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL); - return 0; + return rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL); } } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index e51dbde69b..856d2a1e1e 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -146,6 +146,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { char *machineId = grantGetMachineId(); if (machineId) { memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN); + taosMemoryFreeClear(machineId); } pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode"); @@ -743,15 +744,10 @@ _OVER: static int32_t mndSendGetMachineToDnode(SMnode *pMnode, SDnodeObj *pObj, SMachineInfo *pInfo) { SRpcMsg rpcMsg = {.pCont = NULL, .contLen = 0, .msgType = TDMT_MND_GET_MACHINE, .info.ahandle = pInfo}; - - mDebug("send get machine msg to dnode:%d %s", pObj->id, pObj->ep); - - SEpSet epSet = {.numOfEps = 1}; + SEpSet epSet = {.numOfEps = 1}; strncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN); epSet.eps[0].port = pObj->port; - tmsgSendReq(&epSet, &rpcMsg); - - return TSDB_CODE_SUCCESS; + return tmsgSendReq(&epSet, &rpcMsg); } static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) { @@ -768,13 +764,22 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN); snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port); if (!(pInfo = taosMemoryCalloc(1, sizeof(*pInfo)))) { - code = TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } tsem_init(&pInfo->sem, 0, 0); - mndSendGetMachineToDnode(pMnode, &dnodeObj, pInfo); - tsem_wait(&pInfo->sem); - memcpy(dnodeObj.machineId, pInfo->machineId, TSDB_MACHINE_ID_LEN); + if ((terrno = mndSendGetMachineToDnode(pMnode, &dnodeObj, pInfo)) != 0) { + goto _OVER; + } + if (tsem_timewait(&pInfo->sem, 1000) < 0) { + terrno = TSDB_CODE_DNODE_OFFLINE; + goto _OVER; + } + if (strlen(pInfo->machineId) == TSDB_MACHINE_ID_LEN) { + memcpy(dnodeObj.machineId, pInfo->machineId, TSDB_MACHINE_ID_LEN); + } else { + mWarn("Invalid machineId:%s to create dnode:%s", pInfo->machineId, dnodeObj.fqdn); + } pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode"); if (pTrans == NULL) goto _OVER; mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep); @@ -792,7 +797,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC _OVER: mndTransDrop(pTrans); sdbFreeRaw(pRaw); - if(pInfo) { + if (pInfo) { tsem_destroy(&pInfo->sem); taosMemoryFree(pInfo); } @@ -948,8 +953,10 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) { } code = mndCreateDnode(pMnode, pReq, &createReq); - if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; - tsGrantHBInterval = 5; + if (code == 0) { + code = TSDB_CODE_ACTION_IN_PROGRESS; + tsGrantHBInterval = 5; + } char obj[200] = {0}; sprintf(obj, "%s:%d", createReq.fqdn, createReq.port);