TD-10431 refact create mnode msg
This commit is contained in:
parent
766039a7a9
commit
ad0995c207
|
@ -907,6 +907,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
|
int32_t reserve[8];
|
||||||
} SCreateMnodeMsg, SDropMnodeMsg;
|
} SCreateMnodeMsg, SDropMnodeMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -914,10 +915,12 @@ typedef struct {
|
||||||
int8_t align[3];
|
int8_t align[3];
|
||||||
int8_t replica;
|
int8_t replica;
|
||||||
SReplica replicas[TSDB_MAX_REPLICA];
|
SReplica replicas[TSDB_MAX_REPLICA];
|
||||||
|
int32_t reserve[8];
|
||||||
} SCreateMnodeInMsg, SAlterMnodeInMsg;
|
} SCreateMnodeInMsg, SAlterMnodeInMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
|
int32_t reserve[8];
|
||||||
} SDropMnodeInMsg;
|
} SDropMnodeInMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -202,51 +202,137 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SCreateMnodeInMsg *mndBuildCreateMnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
||||||
|
SCreateMnodeInMsg *pCreate = calloc(1, sizeof(SCreateMnodeInMsg));
|
||||||
|
if (pCreate == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCreate->dnodeId = htonl(pObj->id);
|
||||||
|
|
||||||
|
int32_t numOfReplicas = 0;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
void *pIter = NULL;
|
||||||
|
|
||||||
|
while (numOfReplicas < TSDB_MAX_REPLICA - 1) {
|
||||||
|
SMnodeObj *pObj = NULL;
|
||||||
|
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
if (pObj->pDnode == NULL) break;
|
||||||
|
|
||||||
|
SReplica *pReplica = &pCreate->replicas[numOfReplicas];
|
||||||
|
pReplica->id = htonl(pObj->id);
|
||||||
|
pReplica->port = htons(pObj->pDnode->port);
|
||||||
|
memcpy(pReplica->fqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
numOfReplicas++;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfReplicas++;
|
||||||
|
SReplica *pReplica = &pCreate->replicas[numOfReplicas];
|
||||||
|
pReplica->id = htonl(pObj->id);
|
||||||
|
pReplica->port = htons(pDnode->port);
|
||||||
|
memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
|
||||||
|
return pCreate;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SDropMnodeInMsg *mndBuildDropMnodeMsg(SMnode *pMnode, SMnodeObj *pObj) {
|
||||||
|
SDropMnodeInMsg *pDrop = calloc(1, sizeof(SDropMnodeInMsg));
|
||||||
|
if (pDrop == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDrop->dnodeId = htonl(pObj->id);
|
||||||
|
return pDrop;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
|
||||||
|
if (pRedoRaw == NULL) return -1;
|
||||||
|
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj);
|
||||||
|
if (pUndoRaw == NULL) return -1;
|
||||||
|
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
|
||||||
|
if (pCommitRaw == NULL) return -1;
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
STransAction action = {0};
|
||||||
|
|
||||||
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pObj->id);
|
||||||
|
if (pDnode == NULL) return -1;
|
||||||
|
action.epSet = mndGetDnodeEpset(pDnode);
|
||||||
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
|
SCreateMnodeInMsg *pMsg = mndBuildCreateMnodeMsg(pMnode, pDnode, pObj);
|
||||||
|
if (pMsg == NULL) return -1;
|
||||||
|
|
||||||
|
action.pCont = pMsg;
|
||||||
|
action.contLen = sizeof(SCreateMnodeInMsg);
|
||||||
|
action.msgType = TSDB_MSG_TYPE_CREATE_MNODE_IN;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
free(pMsg);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *pCreate) {
|
static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *pCreate) {
|
||||||
SMnodeObj mnodeObj = {0};
|
SMnodeObj mnodeObj = {0};
|
||||||
mnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_MNODE);
|
mnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_MNODE);
|
||||||
mnodeObj.createdTime = taosGetTimestampMs();
|
mnodeObj.createdTime = taosGetTimestampMs();
|
||||||
mnodeObj.updateTime = mnodeObj.createdTime;
|
mnodeObj.updateTime = mnodeObj.createdTime;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
int32_t code = -1;
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("dnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
|
mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
|
||||||
return -1;
|
goto CREATE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
mDebug("trans:%d, used to create dnode:%d", pTrans->id, pCreate->dnodeId);
|
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
|
||||||
|
|
||||||
SSdbRaw *pRedoRaw = mndMnodeActionEncode(&mnodeObj);
|
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) {
|
||||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
goto CREATE_MNODE_OVER;
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
|
|
||||||
|
|
||||||
SSdbRaw *pUndoRaw = mndMnodeActionEncode(&mnodeObj);
|
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) {
|
||||||
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
|
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||||
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
|
goto CREATE_MNODE_OVER;
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
|
|
||||||
|
|
||||||
SSdbRaw *pCommitRaw = mndMnodeActionEncode(&mnodeObj);
|
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, &mnodeObj) != 0) {
|
||||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
goto CREATE_MNODE_OVER;
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
goto CREATE_MNODE_OVER;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
CREATE_MNODE_OVER:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) {
|
static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) {
|
||||||
|
@ -282,46 +368,79 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) {
|
||||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
|
||||||
|
if (pRedoRaw == NULL) return -1;
|
||||||
|
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
|
||||||
|
if (pCommitRaw == NULL) return -1;
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||||
|
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
STransAction action = {0};
|
||||||
|
|
||||||
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pObj->id);
|
||||||
|
if (pDnode == NULL) return -1;
|
||||||
|
action.epSet = mndGetDnodeEpset(pDnode);
|
||||||
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
|
SDropMnodeInMsg *pMsg = mndBuildDropMnodeMsg(pMnode, pObj);
|
||||||
|
if (pMsg == NULL) return -1;
|
||||||
|
|
||||||
|
action.pCont = pMsg;
|
||||||
|
action.contLen = sizeof(SDropMnodeInMsg);
|
||||||
|
action.msgType = TSDB_MSG_TYPE_CREATE_MNODE_IN;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
free(pMsg);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) {
|
static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) {
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
int32_t code = -1;
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("mnode:%d, failed to drop since %s", pObj->id, terrstr());
|
mError("mnode:%d, failed to drop since %s", pObj->id, terrstr());
|
||||||
return -1;
|
goto DROP_MNODE_OVER;
|
||||||
}
|
}
|
||||||
mDebug("trans:%d, used to drop user:%d", pTrans->id, pObj->id);
|
|
||||||
|
|
||||||
SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj);
|
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
|
||||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
|
||||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING);
|
|
||||||
|
|
||||||
SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj);
|
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, pObj) != 0) {
|
||||||
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
|
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||||
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
|
goto DROP_MNODE_OVER;
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY);
|
|
||||||
|
|
||||||
SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj);
|
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, pObj) != 0) {
|
||||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
goto DROP_MNODE_OVER;
|
||||||
mndTransDrop(pTrans);
|
}
|
||||||
return -1;
|
|
||||||
|
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pObj) != 0) {
|
||||||
|
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||||
|
goto DROP_MNODE_OVER;
|
||||||
}
|
}
|
||||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
goto DROP_MNODE_OVER;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
DROP_MNODE_OVER:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
|
static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
|
||||||
|
|
Loading…
Reference in New Issue