feat: support uniq grant

This commit is contained in:
kailixu 2023-12-07 16:00:13 +08:00
parent 4adce7ed52
commit f850ccbfd4
3 changed files with 23 additions and 16 deletions

View File

@ -109,6 +109,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_MACHINE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -290,8 +290,7 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle);
return -1;
} else {
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
return 0;
return rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
}
}

View File

@ -146,6 +146,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
char *machineId = grantGetMachineId();
if (machineId) {
memcpy(dnodeObj.machineId, machineId, TSDB_MACHINE_ID_LEN);
taosMemoryFreeClear(machineId);
}
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL, "create-dnode");
@ -743,15 +744,10 @@ _OVER:
static int32_t mndSendGetMachineToDnode(SMnode *pMnode, SDnodeObj *pObj, SMachineInfo *pInfo) {
SRpcMsg rpcMsg = {.pCont = NULL, .contLen = 0, .msgType = TDMT_MND_GET_MACHINE, .info.ahandle = pInfo};
mDebug("send get machine msg to dnode:%d %s", pObj->id, pObj->ep);
SEpSet epSet = {.numOfEps = 1};
SEpSet epSet = {.numOfEps = 1};
strncpy(epSet.eps[0].fqdn, pObj->fqdn, TSDB_FQDN_LEN);
epSet.eps[0].port = pObj->port;
tmsgSendReq(&epSet, &rpcMsg);
return TSDB_CODE_SUCCESS;
return tmsgSendReq(&epSet, &rpcMsg);
}
static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pCreate) {
@ -768,13 +764,22 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
tstrncpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
snprintf(dnodeObj.ep, TSDB_EP_LEN - 1, "%s:%u", pCreate->fqdn, pCreate->port);
if (!(pInfo = taosMemoryCalloc(1, sizeof(*pInfo)))) {
code = TSDB_CODE_OUT_OF_MEMORY;
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tsem_init(&pInfo->sem, 0, 0);
mndSendGetMachineToDnode(pMnode, &dnodeObj, pInfo);
tsem_wait(&pInfo->sem);
memcpy(dnodeObj.machineId, pInfo->machineId, TSDB_MACHINE_ID_LEN);
if ((terrno = mndSendGetMachineToDnode(pMnode, &dnodeObj, pInfo)) != 0) {
goto _OVER;
}
if (tsem_timewait(&pInfo->sem, 1000) < 0) {
terrno = TSDB_CODE_DNODE_OFFLINE;
goto _OVER;
}
if (strlen(pInfo->machineId) == TSDB_MACHINE_ID_LEN) {
memcpy(dnodeObj.machineId, pInfo->machineId, TSDB_MACHINE_ID_LEN);
} else {
mWarn("Invalid machineId:%s to create dnode:%s", pInfo->machineId, dnodeObj.fqdn);
}
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq, "create-dnode");
if (pTrans == NULL) goto _OVER;
mInfo("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
@ -792,7 +797,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
_OVER:
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
if(pInfo) {
if (pInfo) {
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
}
@ -948,8 +953,10 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
}
code = mndCreateDnode(pMnode, pReq, &createReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
tsGrantHBInterval = 5;
if (code == 0) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
tsGrantHBInterval = 5;
}
char obj[200] = {0};
sprintf(obj, "%s:%d", createReq.fqdn, createReq.port);