TD-10431 refact drop mnode msg
This commit is contained in:
parent
ad0995c207
commit
5335c08449
|
@ -202,52 +202,6 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
|
|||
}
|
||||
}
|
||||
|
||||
static SCreateMnodeInMsg *mndBuildCreateMnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
||||
SCreateMnodeInMsg *pCreate = calloc(1, sizeof(SCreateMnodeInMsg));
|
||||
if (pCreate == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pCreate->dnodeId = htonl(pObj->id);
|
||||
|
||||
int32_t numOfReplicas = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
|
||||
while (numOfReplicas < TSDB_MAX_REPLICA - 1) {
|
||||
SMnodeObj *pObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) break;
|
||||
if (pObj->pDnode == NULL) break;
|
||||
|
||||
SReplica *pReplica = &pCreate->replicas[numOfReplicas];
|
||||
pReplica->id = htonl(pObj->id);
|
||||
pReplica->port = htons(pObj->pDnode->port);
|
||||
memcpy(pReplica->fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
numOfReplicas++;
|
||||
}
|
||||
|
||||
numOfReplicas++;
|
||||
SReplica *pReplica = &pCreate->replicas[numOfReplicas];
|
||||
pReplica->id = htonl(pObj->id);
|
||||
pReplica->port = htons(pDnode->port);
|
||||
memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
|
||||
return pCreate;
|
||||
}
|
||||
|
||||
static SDropMnodeInMsg *mndBuildDropMnodeMsg(SMnode *pMnode, SMnodeObj *pObj) {
|
||||
SDropMnodeInMsg *pDrop = calloc(1, sizeof(SDropMnodeInMsg));
|
||||
if (pDrop == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pDrop->dnodeId = htonl(pObj->id);
|
||||
return pDrop;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||
SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
|
@ -272,17 +226,73 @@ static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnod
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
int32_t numOfReplicas = 0;
|
||||
|
||||
SCreateMnodeInMsg createMsg = {0};
|
||||
while (1) {
|
||||
SMnodeObj *pMObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SReplica *pReplica = &createMsg.replicas[numOfReplicas];
|
||||
pReplica->id = htonl(pMObj->id);
|
||||
pReplica->port = htons(pMObj->pDnode->port);
|
||||
memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
numOfReplicas++;
|
||||
|
||||
sdbRelease(pSdb, pMObj);
|
||||
}
|
||||
|
||||
SReplica *pReplica = &createMsg.replicas[numOfReplicas];
|
||||
pReplica->id = htonl(pDnode->id);
|
||||
pReplica->port = htons(pDnode->port);
|
||||
memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
numOfReplicas++;
|
||||
|
||||
while (1) {
|
||||
SMnodeObj *pMObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
STransAction action = {0};
|
||||
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pObj->id);
|
||||
if (pDnode == NULL) return -1;
|
||||
SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg));
|
||||
if (pMsg == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pMObj);
|
||||
return -1;
|
||||
}
|
||||
memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg));
|
||||
|
||||
pMsg->dnodeId = htonl(pMObj->id);
|
||||
action.epSet = mndGetDnodeEpset(pMObj->pDnode);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SAlterMnodeInMsg);
|
||||
action.msgType = TSDB_MSG_TYPE_ALTER_MNODE_IN;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pMObj);
|
||||
return -1;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pMObj);
|
||||
}
|
||||
|
||||
{
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
SCreateMnodeInMsg *pMsg = mndBuildCreateMnodeMsg(pMnode, pDnode, pObj);
|
||||
SCreateMnodeInMsg *pMsg = malloc(sizeof(SCreateMnodeInMsg));
|
||||
if (pMsg == NULL) return -1;
|
||||
memcpy(pMsg, &createMsg, sizeof(SAlterMnodeInMsg));
|
||||
pMsg->dnodeId = htonl(pObj->id);
|
||||
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SCreateMnodeInMsg);
|
||||
action.msgType = TSDB_MSG_TYPE_CREATE_MNODE_IN;
|
||||
|
@ -290,11 +300,12 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SMno
|
|||
free(pMsg);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *pCreate) {
|
||||
static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SCreateMnodeMsg *pCreate) {
|
||||
SMnodeObj mnodeObj = {0};
|
||||
mnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_MNODE);
|
||||
mnodeObj.createdTime = taosGetTimestampMs();
|
||||
|
@ -318,7 +329,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *
|
|||
goto CREATE_MNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, &mnodeObj) != 0) {
|
||||
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto CREATE_MNODE_OVER;
|
||||
}
|
||||
|
@ -343,23 +354,24 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) {
|
|||
|
||||
mDebug("mnode:%d, start to create", pCreate->dnodeId);
|
||||
|
||||
SMnodeObj *pObj = mndAcquireMnode(pMnode, pCreate->dnodeId);
|
||||
if (pObj != NULL) {
|
||||
mndReleaseMnode(pMnode, pObj);
|
||||
mError("mnode:%d, mnode already exist", pObj->id);
|
||||
terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
|
||||
if (pDnode == NULL) {
|
||||
mError("mnode:%d, dnode not exist", pDnode->id);
|
||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndCreateMnode(pMnode, pMsg, pDnode, pCreate);
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
SMnodeObj *pObj = mndAcquireMnode(pMnode, pCreate->dnodeId);
|
||||
if (pObj != NULL) {
|
||||
mError("mnode:%d, mnode already exist", pObj->id);
|
||||
terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndCreateMnode(pMnode, pMsg, pCreate);
|
||||
|
||||
if (code != 0) {
|
||||
mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
|
||||
return -1;
|
||||
|
@ -384,24 +396,80 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
int32_t numOfReplicas = 0;
|
||||
|
||||
SAlterMnodeInMsg alterMsg = {0};
|
||||
while (1) {
|
||||
SMnodeObj *pMObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pMObj->id != pObj->id) {
|
||||
SReplica *pReplica = &alterMsg.replicas[numOfReplicas];
|
||||
pReplica->id = htonl(pMObj->id);
|
||||
pReplica->port = htons(pMObj->pDnode->port);
|
||||
memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
numOfReplicas++;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pMObj);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
SMnodeObj *pMObj = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
||||
if (pIter == NULL) break;
|
||||
if (pMObj->id != pObj->id) {
|
||||
STransAction action = {0};
|
||||
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pObj->id);
|
||||
if (pDnode == NULL) return -1;
|
||||
SAlterMnodeInMsg *pMsg = malloc(sizeof(SAlterMnodeInMsg));
|
||||
if (pMsg == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pMObj);
|
||||
return -1;
|
||||
}
|
||||
memcpy(pMsg, &alterMsg, sizeof(SAlterMnodeInMsg));
|
||||
|
||||
pMsg->dnodeId = htonl(pMObj->id);
|
||||
action.epSet = mndGetDnodeEpset(pMObj->pDnode);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SAlterMnodeInMsg);
|
||||
action.msgType = TSDB_MSG_TYPE_ALTER_MNODE_IN;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pMObj);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pMObj);
|
||||
}
|
||||
|
||||
{
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
SDropMnodeInMsg *pMsg = mndBuildDropMnodeMsg(pMnode, pObj);
|
||||
if (pMsg == NULL) return -1;
|
||||
SDropMnodeInMsg *pMsg = malloc(sizeof(SDropMnodeInMsg));
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pMsg->dnodeId = htonl(pObj->id);
|
||||
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDropMnodeInMsg);
|
||||
action.msgType = TSDB_MSG_TYPE_CREATE_MNODE_IN;
|
||||
action.msgType = TSDB_MSG_TYPE_DROP_MNODE_IN;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -426,7 +494,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) {
|
|||
goto DROP_MNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pObj) != 0) {
|
||||
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto DROP_MNODE_OVER;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue