Merge pull request #9611 from taosdata/feature/dnode3
when an error occurs in rollback or retry, the failed result is returned first
This commit is contained in:
commit
5ba5bc79d7
|
@ -94,6 +94,7 @@ typedef struct SSdbRaw SSdbRaw;
|
|||
typedef struct SSdbRow SSdbRow;
|
||||
typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType;
|
||||
typedef enum {
|
||||
SDB_STATUS_INIT = 0,
|
||||
SDB_STATUS_CREATING = 1,
|
||||
SDB_STATUS_UPDATING = 2,
|
||||
SDB_STATUS_DROPPING = 3,
|
||||
|
|
|
@ -64,10 +64,10 @@ typedef enum {
|
|||
TRN_STAGE_PREPARE = 0,
|
||||
TRN_STAGE_REDO_LOG = 1,
|
||||
TRN_STAGE_REDO_ACTION = 2,
|
||||
TRN_STAGE_UNDO_LOG = 3,
|
||||
TRN_STAGE_UNDO_ACTION = 4,
|
||||
TRN_STAGE_COMMIT_LOG = 5,
|
||||
TRN_STAGE_COMMIT = 6,
|
||||
TRN_STAGE_COMMIT_LOG = 3,
|
||||
TRN_STAGE_COMMIT = 4,
|
||||
TRN_STAGE_UNDO_ACTION = 5,
|
||||
TRN_STAGE_UNDO_LOG = 6,
|
||||
TRN_STAGE_ROLLBACK = 7,
|
||||
TRN_STAGE_FINISHED = 8
|
||||
} ETrnStage;
|
||||
|
|
|
@ -28,6 +28,7 @@ typedef struct {
|
|||
int8_t msgSent;
|
||||
int8_t msgReceived;
|
||||
int32_t errCode;
|
||||
int32_t acceptableCode;
|
||||
int32_t contLen;
|
||||
void *pCont;
|
||||
} STransAction;
|
||||
|
|
|
@ -26,7 +26,7 @@ static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj);
|
|||
static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj);
|
||||
static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj);
|
||||
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOldBnode, SBnodeObj *pNewBnode);
|
||||
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew);
|
||||
static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessDropBnodeReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessCreateBnodeRsp(SMnodeMsg *pMsg);
|
||||
|
@ -59,9 +59,8 @@ int32_t mndInitBnode(SMnode *pMnode) {
|
|||
void mndCleanupBnode(SMnode *pMnode) {}
|
||||
|
||||
static SBnodeObj *mndAcquireBnode(SMnode *pMnode, int32_t bnodeId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SBnodeObj *pObj = sdbAcquire(pSdb, SDB_BNODE, &bnodeId);
|
||||
if (pObj == NULL) {
|
||||
SBnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_BNODE, &bnodeId);
|
||||
if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||
terrno = TSDB_CODE_MND_BNODE_NOT_EXIST;
|
||||
}
|
||||
return pObj;
|
||||
|
@ -155,9 +154,9 @@ static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOldBnode, SBnodeObj *pNewBnode) {
|
||||
mTrace("bnode:%d, perform update action, old_row:%p new_row:%p", pOldBnode->id, pOldBnode, pNewBnode);
|
||||
pOldBnode->updateTime = pNewBnode->updateTime;
|
||||
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew) {
|
||||
mTrace("bnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -169,6 +168,14 @@ static int32_t mndSetCreateBnodeRedoLogs(STrans *pTrans, SBnodeObj *pObj) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateBnodeUndoLogs(STrans *pTrans, SBnodeObj *pObj) {
|
||||
SSdbRaw *pUndoRaw = mndBnodeActionEncode(pObj);
|
||||
if (pUndoRaw == NULL) return -1;
|
||||
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateBnodeCommitLogs(STrans *pTrans, SBnodeObj *pObj) {
|
||||
SSdbRaw *pCommitRaw = mndBnodeActionEncode(pObj);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
|
@ -190,6 +197,7 @@ static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
|
|||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDCreateBnodeReq);
|
||||
action.msgType = TDMT_DND_CREATE_BNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
|
@ -199,39 +207,47 @@ static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateBnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SBnodeObj *pObj) {
|
||||
SDDropBnodeReq *pMsg = malloc(sizeof(SDDropBnodeReq));
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pMsg->dnodeId = htonl(pDnode->id);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDDropBnodeReq);
|
||||
action.msgType = TDMT_DND_DROP_BNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;
|
||||
|
||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateBnodeReq *pCreate) {
|
||||
int32_t code = -1;
|
||||
|
||||
SBnodeObj bnodeObj = {0};
|
||||
bnodeObj.id = pDnode->id;
|
||||
bnodeObj.createdTime = taosGetTimestampMs();
|
||||
bnodeObj.updateTime = bnodeObj.createdTime;
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("bnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
|
||||
goto CREATE_BNODE_OVER;
|
||||
}
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_BNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
|
||||
|
||||
if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_BNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_BNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto CREATE_BNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
goto CREATE_BNODE_OVER;
|
||||
}
|
||||
if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
|
||||
if (mndSetCreateBnodeUndoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
|
||||
if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
|
||||
if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
|
||||
if (mndSetCreateBnodeUndoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_BNODE_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -251,8 +267,12 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pMsg) {
|
|||
SBnodeObj *pObj = mndAcquireBnode(pMnode, pCreate->dnodeId);
|
||||
if (pObj != NULL) {
|
||||
mError("bnode:%d, bnode already exist", pObj->id);
|
||||
terrno = TSDB_CODE_MND_BNODE_ALREADY_EXIST;
|
||||
mndReleaseBnode(pMnode, pObj);
|
||||
return -1;
|
||||
} else if (terrno != TSDB_CODE_MND_BNODE_NOT_EXIST) {
|
||||
mError("bnode:%d, failed to create bnode since %s", pCreate->dnodeId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
|
||||
|
@ -302,6 +322,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
|
|||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDDropBnodeReq);
|
||||
action.msgType = TDMT_DND_DROP_BNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
|
@ -313,33 +334,15 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
|
|||
|
||||
static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pMsg, SBnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("bnode:%d, failed to drop since %s", pObj->id, terrstr());
|
||||
goto DROP_BNODE_OVER;
|
||||
}
|
||||
if (pTrans == NULL) goto DROP_BNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
|
||||
|
||||
if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto DROP_BNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropBnodeCommitLogs(pTrans, pObj) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto DROP_BNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto DROP_BNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
goto DROP_BNODE_OVER;
|
||||
}
|
||||
if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER;
|
||||
if (mndSetDropBnodeCommitLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER;
|
||||
if (mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_BNODE_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_BNODE_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -363,18 +366,18 @@ static int32_t mndProcessDropBnodeReq(SMnodeMsg *pMsg) {
|
|||
|
||||
SBnodeObj *pObj = mndAcquireBnode(pMnode, pDrop->dnodeId);
|
||||
if (pObj == NULL) {
|
||||
mError("bnode:%d, not exist", pDrop->dnodeId);
|
||||
terrno = TSDB_CODE_MND_BNODE_NOT_EXIST;
|
||||
mError("bnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndDropBnode(pMnode, pMsg, pObj);
|
||||
if (code != 0) {
|
||||
sdbRelease(pMnode->pSdb, pObj);
|
||||
mError("bnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
sdbRelease(pMnode->pSdb, pMnode);
|
||||
sdbRelease(pMnode->pSdb, pObj);
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ static SSdbRaw *mndQnodeActionEncode(SQnodeObj *pObj);
|
|||
static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj);
|
||||
static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj);
|
||||
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOldQnode, SQnodeObj *pNewQnode);
|
||||
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew);
|
||||
static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessDropQnodeReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessCreateQnodeRsp(SMnodeMsg *pMsg);
|
||||
|
@ -59,9 +59,8 @@ int32_t mndInitQnode(SMnode *pMnode) {
|
|||
void mndCleanupQnode(SMnode *pMnode) {}
|
||||
|
||||
static SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SQnodeObj *pObj = sdbAcquire(pSdb, SDB_QNODE, &qnodeId);
|
||||
if (pObj == NULL) {
|
||||
SQnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_QNODE, &qnodeId);
|
||||
if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||
terrno = TSDB_CODE_MND_QNODE_NOT_EXIST;
|
||||
}
|
||||
return pObj;
|
||||
|
@ -155,9 +154,9 @@ static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOldQnode, SQnodeObj *pNewQnode) {
|
||||
mTrace("qnode:%d, perform update action, old_row:%p new_row:%p", pOldQnode->id, pOldQnode, pNewQnode);
|
||||
pOldQnode->updateTime = pNewQnode->updateTime;
|
||||
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew) {
|
||||
mTrace("qnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -169,6 +168,14 @@ static int32_t mndSetCreateQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateQnodeUndoLogs(STrans *pTrans, SQnodeObj *pObj) {
|
||||
SSdbRaw *pUndoRaw = mndQnodeActionEncode(pObj);
|
||||
if (pUndoRaw == NULL) return -1;
|
||||
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
|
||||
SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
|
@ -190,6 +197,7 @@ static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
|
|||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDCreateQnodeReq);
|
||||
action.msgType = TDMT_DND_CREATE_QNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
|
@ -199,39 +207,47 @@ static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
|
||||
SDDropQnodeReq *pMsg = malloc(sizeof(SDDropQnodeReq));
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pMsg->dnodeId = htonl(pDnode->id);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDDropQnodeReq);
|
||||
action.msgType = TDMT_DND_DROP_QNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
|
||||
|
||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateQnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) {
|
||||
int32_t code = -1;
|
||||
|
||||
SQnodeObj qnodeObj = {0};
|
||||
qnodeObj.id = pDnode->id;
|
||||
qnodeObj.createdTime = taosGetTimestampMs();
|
||||
qnodeObj.updateTime = qnodeObj.createdTime;
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("qnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
|
||||
goto CREATE_QNODE_OVER;
|
||||
}
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_QNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
|
||||
|
||||
if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_QNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateQnodeCommitLogs(pTrans, &qnodeObj) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_QNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateQnodeRedoActions(pTrans, pDnode, &qnodeObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto CREATE_QNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
goto CREATE_QNODE_OVER;
|
||||
}
|
||||
if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
|
||||
if (mndSetCreateQnodeUndoLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
|
||||
if (mndSetCreateQnodeCommitLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
|
||||
if (mndSetCreateQnodeRedoActions(pTrans, pDnode, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
|
||||
if (mndSetCreateQnodeUndoActions(pTrans, pDnode, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_QNODE_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -251,8 +267,12 @@ static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pMsg) {
|
|||
SQnodeObj *pObj = mndAcquireQnode(pMnode, pCreate->dnodeId);
|
||||
if (pObj != NULL) {
|
||||
mError("qnode:%d, qnode already exist", pObj->id);
|
||||
terrno = TSDB_CODE_MND_QNODE_ALREADY_EXIST;
|
||||
mndReleaseQnode(pMnode, pObj);
|
||||
return -1;
|
||||
} else if (terrno != TSDB_CODE_MND_QNODE_NOT_EXIST) {
|
||||
mError("qnode:%d, failed to create qnode since %s", pCreate->dnodeId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
|
||||
|
@ -302,6 +322,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
|
|||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDDropQnodeReq);
|
||||
action.msgType = TDMT_DND_DROP_QNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
|
@ -313,33 +334,15 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
|
|||
|
||||
static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pMsg, SQnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("qnode:%d, failed to drop since %s", pObj->id, terrstr());
|
||||
goto DROP_QNODE_OVER;
|
||||
}
|
||||
if (pTrans == NULL) goto DROP_QNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
|
||||
|
||||
if (mndSetDropQnodeRedoLogs(pTrans, pObj) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto DROP_QNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropQnodeCommitLogs(pTrans, pObj) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto DROP_QNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto DROP_QNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
goto DROP_QNODE_OVER;
|
||||
}
|
||||
if (mndSetDropQnodeRedoLogs(pTrans, pObj) != 0) goto DROP_QNODE_OVER;
|
||||
if (mndSetDropQnodeCommitLogs(pTrans, pObj) != 0) goto DROP_QNODE_OVER;
|
||||
if (mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_QNODE_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_QNODE_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -363,18 +366,18 @@ static int32_t mndProcessDropQnodeReq(SMnodeMsg *pMsg) {
|
|||
|
||||
SQnodeObj *pObj = mndAcquireQnode(pMnode, pDrop->dnodeId);
|
||||
if (pObj == NULL) {
|
||||
mError("qnode:%d, not exist", pDrop->dnodeId);
|
||||
terrno = TSDB_CODE_MND_QNODE_NOT_EXIST;
|
||||
mError("qnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndDropQnode(pMnode, pMsg, pObj);
|
||||
if (code != 0) {
|
||||
sdbRelease(pMnode->pSdb, pObj);
|
||||
mError("qnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
sdbRelease(pMnode->pSdb, pMnode);
|
||||
sdbRelease(pMnode->pSdb, pObj);
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj);
|
|||
static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj);
|
||||
static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj);
|
||||
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOldSnode, SSnodeObj *pNewSnode);
|
||||
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew);
|
||||
static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessDropSnodeReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessCreateSnodeRsp(SMnodeMsg *pMsg);
|
||||
|
@ -59,9 +59,8 @@ int32_t mndInitSnode(SMnode *pMnode) {
|
|||
void mndCleanupSnode(SMnode *pMnode) {}
|
||||
|
||||
static SSnodeObj *mndAcquireSnode(SMnode *pMnode, int32_t snodeId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SSnodeObj *pObj = sdbAcquire(pSdb, SDB_SNODE, &snodeId);
|
||||
if (pObj == NULL) {
|
||||
SSnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_SNODE, &snodeId);
|
||||
if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||
terrno = TSDB_CODE_MND_SNODE_NOT_EXIST;
|
||||
}
|
||||
return pObj;
|
||||
|
@ -155,9 +154,9 @@ static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOldSnode, SSnodeObj *pNewSnode) {
|
||||
mTrace("snode:%d, perform update action, old_row:%p new_row:%p", pOldSnode->id, pOldSnode, pNewSnode);
|
||||
pOldSnode->updateTime = pNewSnode->updateTime;
|
||||
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew) {
|
||||
mTrace("snode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -169,6 +168,14 @@ static int32_t mndSetCreateSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateSnodeUndoLogs(STrans *pTrans, SSnodeObj *pObj) {
|
||||
SSdbRaw *pUndoRaw = mndSnodeActionEncode(pObj);
|
||||
if (pUndoRaw == NULL) return -1;
|
||||
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateSnodeCommitLogs(STrans *pTrans, SSnodeObj *pObj) {
|
||||
SSdbRaw *pCommitRaw = mndSnodeActionEncode(pObj);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
|
@ -190,6 +197,7 @@ static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
|
|||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDCreateSnodeReq);
|
||||
action.msgType = TDMT_DND_CREATE_SNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_SNODE_ALREADY_DEPLOYED;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
|
@ -199,39 +207,48 @@ static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetCreateSnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj) {
|
||||
SDDropSnodeReq *pMsg = malloc(sizeof(SDDropSnodeReq));
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pMsg->dnodeId = htonl(pDnode->id);
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDDropSnodeReq);
|
||||
action.msgType = TDMT_DND_DROP_SNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
|
||||
|
||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateSnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate) {
|
||||
int32_t code = -1;
|
||||
|
||||
SSnodeObj snodeObj = {0};
|
||||
snodeObj.id = pDnode->id;
|
||||
snodeObj.createdTime = taosGetTimestampMs();
|
||||
snodeObj.updateTime = snodeObj.createdTime;
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("snode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
|
||||
goto CREATE_SNODE_OVER;
|
||||
}
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_SNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
|
||||
|
||||
if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_SNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto CREATE_SNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto CREATE_SNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
goto CREATE_SNODE_OVER;
|
||||
}
|
||||
if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER;
|
||||
if (mndSetCreateSnodeUndoLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER;
|
||||
if (mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER;
|
||||
if (mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) goto CREATE_SNODE_OVER;
|
||||
if (mndSetCreateSnodeUndoActions(pTrans, pDnode, &snodeObj) != 0) goto CREATE_SNODE_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_SNODE_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -251,8 +268,12 @@ static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pMsg) {
|
|||
SSnodeObj *pObj = mndAcquireSnode(pMnode, pCreate->dnodeId);
|
||||
if (pObj != NULL) {
|
||||
mError("snode:%d, snode already exist", pObj->id);
|
||||
terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
|
||||
mndReleaseSnode(pMnode, pObj);
|
||||
return -1;
|
||||
} else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
|
||||
mError("snode:%d, failed to create snode since %s", pCreate->dnodeId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
|
||||
|
@ -302,6 +323,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
|
|||
action.pCont = pMsg;
|
||||
action.contLen = sizeof(SDDropSnodeReq);
|
||||
action.msgType = TDMT_DND_DROP_SNODE;
|
||||
action.acceptableCode = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pMsg);
|
||||
|
@ -313,33 +335,16 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
|
|||
|
||||
static int32_t mndDropSnode(SMnode *pMnode, SMnodeMsg *pMsg, SSnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("snode:%d, failed to drop since %s", pObj->id, terrstr());
|
||||
goto DROP_SNODE_OVER;
|
||||
}
|
||||
if (pTrans == NULL) goto DROP_SNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
|
||||
|
||||
if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0) {
|
||||
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
|
||||
goto DROP_SNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropSnodeCommitLogs(pTrans, pObj) != 0) {
|
||||
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
|
||||
goto DROP_SNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) {
|
||||
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
|
||||
goto DROP_SNODE_OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
goto DROP_SNODE_OVER;
|
||||
}
|
||||
if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0) goto DROP_SNODE_OVER;
|
||||
if (mndSetDropSnodeCommitLogs(pTrans, pObj) != 0) goto DROP_SNODE_OVER;
|
||||
if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_SNODE_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_SNODE_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
@ -363,18 +368,18 @@ static int32_t mndProcessDropSnodeReq(SMnodeMsg *pMsg) {
|
|||
|
||||
SSnodeObj *pObj = mndAcquireSnode(pMnode, pDrop->dnodeId);
|
||||
if (pObj == NULL) {
|
||||
mError("snode:%d, not exist", pDrop->dnodeId);
|
||||
terrno = TSDB_CODE_MND_SNODE_NOT_EXIST;
|
||||
mError("snode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndDropSnode(pMnode, pMsg, pObj);
|
||||
if (code != 0) {
|
||||
sdbRelease(pMnode->pSdb, pObj);
|
||||
mError("snode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
sdbRelease(pMnode->pSdb, pMnode);
|
||||
sdbRelease(pMnode->pSdb, pObj);
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -143,6 +143,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
|
||||
}
|
||||
|
@ -151,6 +152,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
|
||||
}
|
||||
|
@ -253,6 +255,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
for (int32_t i = 0; i < redoActionNum; ++i) {
|
||||
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
|
||||
SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, TRANS_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
|
||||
action.pCont = malloc(action.contLen);
|
||||
if (action.pCont == NULL) goto TRANS_DECODE_OVER;
|
||||
|
@ -264,6 +267,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
for (int32_t i = 0; i < undoActionNum; ++i) {
|
||||
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
|
||||
SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, TRANS_DECODE_OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
|
||||
action.pCont = malloc(action.contLen);
|
||||
if (action.pCont == NULL) goto TRANS_DECODE_OVER;
|
||||
|
@ -496,10 +500,31 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
|
||||
static void mndTransSendRpcRsp(STrans *pTrans) {
|
||||
if (pTrans->rpcHandle != NULL) {
|
||||
mDebug("trans:%d, send rsp, ahandle:%p code:0x%x", pTrans->id, pTrans->rpcAHandle, pTrans->code & 0xFFFF);
|
||||
bool sendRsp = false;
|
||||
|
||||
if (pTrans->stage == TRN_STAGE_FINISHED) {
|
||||
sendRsp = true;
|
||||
}
|
||||
|
||||
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
|
||||
if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION ||
|
||||
pTrans->stage == TRN_STAGE_ROLLBACK) {
|
||||
sendRsp = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (pTrans->policy == TRN_POLICY_RETRY) {
|
||||
if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 0) {
|
||||
sendRsp = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (sendRsp && pTrans->rpcHandle != NULL) {
|
||||
mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
|
||||
pTrans->rpcAHandle);
|
||||
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle};
|
||||
rpcSendResponse(&rspMsg);
|
||||
pTrans->rpcHandle = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -542,7 +567,8 @@ void mndTransProcessRsp(SMnodeMsg *pMsg) {
|
|||
pAction->errCode = pMsg->rpcMsg.code;
|
||||
}
|
||||
|
||||
mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->rpcMsg.code);
|
||||
mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pMsg->rpcMsg.code,
|
||||
pAction->acceptableCode);
|
||||
mndTransExecute(pMnode, pTrans);
|
||||
|
||||
HANDLE_ACTION_RSP_OVER:
|
||||
|
@ -642,7 +668,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
|
|||
if (pAction == NULL) continue;
|
||||
if (pAction->msgSent && pAction->msgReceived) {
|
||||
numOfReceived++;
|
||||
if (pAction->errCode != 0) {
|
||||
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
|
||||
errCode = pAction->errCode;
|
||||
}
|
||||
}
|
||||
|
@ -690,7 +716,7 @@ static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
|
|||
} else {
|
||||
pTrans->code = terrno;
|
||||
pTrans->stage = TRN_STAGE_UNDO_LOG;
|
||||
mError("trans:%d, stage from redoLog to undoLog", pTrans->id);
|
||||
mError("trans:%d, stage from redoLog to undoLog since %s", pTrans->id, terrstr());
|
||||
}
|
||||
|
||||
return continueExec;
|
||||
|
@ -764,7 +790,6 @@ static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) {
|
|||
pTrans->failedTimes++;
|
||||
mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr());
|
||||
continueExec = false;
|
||||
;
|
||||
}
|
||||
|
||||
return continueExec;
|
||||
|
@ -791,7 +816,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
|
|||
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
|
||||
|
||||
if (code == 0) {
|
||||
pTrans->stage = TRN_STAGE_REDO_LOG;
|
||||
pTrans->stage = TRN_STAGE_UNDO_LOG;
|
||||
mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id);
|
||||
continueExec = true;
|
||||
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
|
@ -814,7 +839,6 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
|
|||
pTrans->stage = TRN_STAGE_FINISHED;
|
||||
mDebug("trans:%d, stage from rollback to finished", pTrans->id);
|
||||
continueExec = true;
|
||||
;
|
||||
} else {
|
||||
pTrans->failedTimes++;
|
||||
mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
|
||||
|
@ -880,9 +904,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pTrans->stage == TRN_STAGE_FINISHED) {
|
||||
mndTransSendRpcRsp(pTrans);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) {
|
||||
|
|
|
@ -50,7 +50,18 @@ TEST_F(MndTestBnode, 01_Show_Bnode) {
|
|||
EXPECT_EQ(test.GetShowRows(), 0);
|
||||
}
|
||||
|
||||
TEST_F(MndTestBnode, 02_Create_Bnode_Invalid_Id) {
|
||||
TEST_F(MndTestBnode, 02_Create_Bnode) {
|
||||
{
|
||||
int32_t contLen = sizeof(SMCreateBnodeReq);
|
||||
|
||||
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST);
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = sizeof(SMCreateBnodeReq);
|
||||
|
||||
|
@ -63,11 +74,6 @@ TEST_F(MndTestBnode, 02_Create_Bnode_Invalid_Id) {
|
|||
|
||||
test.SendShowMetaMsg(TSDB_MGMT_TABLE_BNODE, "");
|
||||
CHECK_META("show bnodes", 3);
|
||||
|
||||
CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
|
||||
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint");
|
||||
CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
||||
|
||||
test.SendShowRetrieveMsg();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
|
||||
|
@ -75,24 +81,21 @@ TEST_F(MndTestBnode, 02_Create_Bnode_Invalid_Id) {
|
|||
CheckBinary("localhost:9018", TSDB_EP_LEN);
|
||||
CheckTimestamp();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MndTestBnode, 03_Create_Bnode_Invalid_Id) {
|
||||
{
|
||||
int32_t contLen = sizeof(SMCreateBnodeReq);
|
||||
|
||||
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
pReq->dnodeId = htonl(1);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_BNODE_ALREADY_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MndTestBnode, 04_Create_Bnode) {
|
||||
TEST_F(MndTestBnode, 03_Drop_Bnode) {
|
||||
{
|
||||
// create dnode
|
||||
int32_t contLen = sizeof(SCreateDnodeMsg);
|
||||
|
||||
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
|
||||
|
@ -110,7 +113,6 @@ TEST_F(MndTestBnode, 04_Create_Bnode) {
|
|||
}
|
||||
|
||||
{
|
||||
// create bnode
|
||||
int32_t contLen = sizeof(SMCreateBnodeReq);
|
||||
|
||||
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
|
||||
|
@ -133,7 +135,6 @@ TEST_F(MndTestBnode, 04_Create_Bnode) {
|
|||
}
|
||||
|
||||
{
|
||||
// drop bnode
|
||||
int32_t contLen = sizeof(SMDropBnodeReq);
|
||||
|
||||
SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen);
|
||||
|
@ -151,4 +152,143 @@ TEST_F(MndTestBnode, 04_Create_Bnode) {
|
|||
CheckBinary("localhost:9018", TSDB_EP_LEN);
|
||||
CheckTimestamp();
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = sizeof(SMDropBnodeReq);
|
||||
|
||||
SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_BNODE_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MndTestBnode, 03_Create_Bnode_Rollback) {
|
||||
{
|
||||
// send message first, then dnode2 crash, result is returned, and rollback is started
|
||||
int32_t contLen = sizeof(SMCreateBnodeReq);
|
||||
|
||||
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, bnode is creating
|
||||
int32_t contLen = sizeof(SMCreateBnodeReq);
|
||||
|
||||
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, bnode is creating
|
||||
int32_t contLen = sizeof(SMDropBnodeReq);
|
||||
|
||||
SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
|
||||
}
|
||||
|
||||
{
|
||||
// server start, wait until the rollback finished
|
||||
server2.DoStart();
|
||||
taosMsleep(1000);
|
||||
|
||||
int32_t retry = 0;
|
||||
int32_t retryMax = 10;
|
||||
|
||||
for (retry = 0; retry < retryMax; retry++) {
|
||||
int32_t contLen = sizeof(SMCreateBnodeReq);
|
||||
|
||||
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
if (pMsg->code == 0) break;
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
||||
ASSERT_NE(retry, retryMax);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MndTestBnode, 04_Drop_Bnode_Rollback) {
|
||||
{
|
||||
// send message first, then dnode2 crash, result is returned, and rollback is started
|
||||
int32_t contLen = sizeof(SMDropBnodeReq);
|
||||
|
||||
SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, bnode is dropping
|
||||
int32_t contLen = sizeof(SMCreateBnodeReq);
|
||||
|
||||
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, bnode is dropping
|
||||
int32_t contLen = sizeof(SMDropBnodeReq);
|
||||
|
||||
SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
|
||||
}
|
||||
|
||||
{
|
||||
// server start, wait until the rollback finished
|
||||
server2.DoStart();
|
||||
taosMsleep(1000);
|
||||
|
||||
int32_t retry = 0;
|
||||
int32_t retryMax = 10;
|
||||
|
||||
for (retry = 0; retry < retryMax; retry++) {
|
||||
int32_t contLen = sizeof(SMCreateBnodeReq);
|
||||
|
||||
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
if (pMsg->code == 0) break;
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
||||
ASSERT_NE(retry, retryMax);
|
||||
}
|
||||
}
|
|
@ -90,13 +90,12 @@ TEST_F(MndTestQnode, 02_Create_Qnode) {
|
|||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_QNODE_ALREADY_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MndTestQnode, 04_Create_Qnode) {
|
||||
TEST_F(MndTestQnode, 03_Drop_Qnode) {
|
||||
{
|
||||
// create dnode
|
||||
int32_t contLen = sizeof(SCreateDnodeMsg);
|
||||
|
||||
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
|
||||
|
@ -114,7 +113,6 @@ TEST_F(MndTestQnode, 04_Create_Qnode) {
|
|||
}
|
||||
|
||||
{
|
||||
// create qnode
|
||||
int32_t contLen = sizeof(SMCreateQnodeReq);
|
||||
|
||||
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
|
||||
|
@ -137,7 +135,6 @@ TEST_F(MndTestQnode, 04_Create_Qnode) {
|
|||
}
|
||||
|
||||
{
|
||||
// drop qnode
|
||||
int32_t contLen = sizeof(SMDropQnodeReq);
|
||||
|
||||
SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen);
|
||||
|
@ -155,4 +152,143 @@ TEST_F(MndTestQnode, 04_Create_Qnode) {
|
|||
CheckBinary("localhost:9014", TSDB_EP_LEN);
|
||||
CheckTimestamp();
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = sizeof(SMDropQnodeReq);
|
||||
|
||||
SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_QNODE_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) {
|
||||
{
|
||||
// send message first, then dnode2 crash, result is returned, and rollback is started
|
||||
int32_t contLen = sizeof(SMCreateQnodeReq);
|
||||
|
||||
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, qnode is creating
|
||||
int32_t contLen = sizeof(SMCreateQnodeReq);
|
||||
|
||||
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, qnode is creating
|
||||
int32_t contLen = sizeof(SMDropQnodeReq);
|
||||
|
||||
SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
|
||||
}
|
||||
|
||||
{
|
||||
// server start, wait until the rollback finished
|
||||
server2.DoStart();
|
||||
taosMsleep(1000);
|
||||
|
||||
int32_t retry = 0;
|
||||
int32_t retryMax = 10;
|
||||
|
||||
for (retry = 0; retry < retryMax; retry++) {
|
||||
int32_t contLen = sizeof(SMCreateQnodeReq);
|
||||
|
||||
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
if (pMsg->code == 0) break;
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
||||
ASSERT_NE(retry, retryMax);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MndTestQnode, 04_Drop_Qnode_Rollback) {
|
||||
{
|
||||
// send message first, then dnode2 crash, result is returned, and rollback is started
|
||||
int32_t contLen = sizeof(SMDropQnodeReq);
|
||||
|
||||
SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, qnode is dropping
|
||||
int32_t contLen = sizeof(SMCreateQnodeReq);
|
||||
|
||||
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, qnode is dropping
|
||||
int32_t contLen = sizeof(SMDropQnodeReq);
|
||||
|
||||
SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
|
||||
}
|
||||
|
||||
{
|
||||
// server start, wait until the rollback finished
|
||||
server2.DoStart();
|
||||
taosMsleep(1000);
|
||||
|
||||
int32_t retry = 0;
|
||||
int32_t retryMax = 10;
|
||||
|
||||
for (retry = 0; retry < retryMax; retry++) {
|
||||
int32_t contLen = sizeof(SMCreateQnodeReq);
|
||||
|
||||
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
if (pMsg->code == 0) break;
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
||||
ASSERT_NE(retry, retryMax);
|
||||
}
|
||||
}
|
|
@ -50,7 +50,18 @@ TEST_F(MndTestSnode, 01_Show_Snode) {
|
|||
EXPECT_EQ(test.GetShowRows(), 0);
|
||||
}
|
||||
|
||||
TEST_F(MndTestSnode, 02_Create_Snode_Invalid_Id) {
|
||||
TEST_F(MndTestSnode, 02_Create_Snode) {
|
||||
{
|
||||
int32_t contLen = sizeof(SMCreateSnodeReq);
|
||||
|
||||
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST);
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = sizeof(SMCreateSnodeReq);
|
||||
|
||||
|
@ -63,11 +74,6 @@ TEST_F(MndTestSnode, 02_Create_Snode_Invalid_Id) {
|
|||
|
||||
test.SendShowMetaMsg(TSDB_MGMT_TABLE_SNODE, "");
|
||||
CHECK_META("show snodes", 3);
|
||||
|
||||
CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
|
||||
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint");
|
||||
CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
||||
|
||||
test.SendShowRetrieveMsg();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
|
||||
|
@ -75,24 +81,21 @@ TEST_F(MndTestSnode, 02_Create_Snode_Invalid_Id) {
|
|||
CheckBinary("localhost:9016", TSDB_EP_LEN);
|
||||
CheckTimestamp();
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MndTestSnode, 03_Create_Snode_Invalid_Id) {
|
||||
{
|
||||
int32_t contLen = sizeof(SMCreateSnodeReq);
|
||||
|
||||
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
pReq->dnodeId = htonl(1);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_SNODE_ALREADY_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MndTestSnode, 04_Create_Snode) {
|
||||
TEST_F(MndTestSnode, 03_Drop_Snode) {
|
||||
{
|
||||
// create dnode
|
||||
int32_t contLen = sizeof(SCreateDnodeMsg);
|
||||
|
||||
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
|
||||
|
@ -110,7 +113,6 @@ TEST_F(MndTestSnode, 04_Create_Snode) {
|
|||
}
|
||||
|
||||
{
|
||||
// create snode
|
||||
int32_t contLen = sizeof(SMCreateSnodeReq);
|
||||
|
||||
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
|
||||
|
@ -133,7 +135,6 @@ TEST_F(MndTestSnode, 04_Create_Snode) {
|
|||
}
|
||||
|
||||
{
|
||||
// drop snode
|
||||
int32_t contLen = sizeof(SMDropSnodeReq);
|
||||
|
||||
SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen);
|
||||
|
@ -151,4 +152,143 @@ TEST_F(MndTestSnode, 04_Create_Snode) {
|
|||
CheckBinary("localhost:9016", TSDB_EP_LEN);
|
||||
CheckTimestamp();
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = sizeof(SMDropSnodeReq);
|
||||
|
||||
SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_SNODE_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MndTestSnode, 03_Create_Snode_Rollback) {
|
||||
{
|
||||
// send message first, then dnode2 crash, result is returned, and rollback is started
|
||||
int32_t contLen = sizeof(SMCreateSnodeReq);
|
||||
|
||||
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, snode is creating
|
||||
int32_t contLen = sizeof(SMCreateSnodeReq);
|
||||
|
||||
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, snode is creating
|
||||
int32_t contLen = sizeof(SMDropSnodeReq);
|
||||
|
||||
SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
|
||||
}
|
||||
|
||||
{
|
||||
// server start, wait until the rollback finished
|
||||
server2.DoStart();
|
||||
taosMsleep(1000);
|
||||
|
||||
int32_t retry = 0;
|
||||
int32_t retryMax = 10;
|
||||
|
||||
for (retry = 0; retry < retryMax; retry++) {
|
||||
int32_t contLen = sizeof(SMCreateSnodeReq);
|
||||
|
||||
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
if (pMsg->code == 0) break;
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
||||
ASSERT_NE(retry, retryMax);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(MndTestSnode, 04_Drop_Snode_Rollback) {
|
||||
{
|
||||
// send message first, then dnode2 crash, result is returned, and rollback is started
|
||||
int32_t contLen = sizeof(SMDropSnodeReq);
|
||||
|
||||
SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, snode is dropping
|
||||
int32_t contLen = sizeof(SMCreateSnodeReq);
|
||||
|
||||
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
|
||||
}
|
||||
|
||||
{
|
||||
// continue send message, snode is dropping
|
||||
int32_t contLen = sizeof(SMDropSnodeReq);
|
||||
|
||||
SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
server2.Stop();
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
|
||||
}
|
||||
|
||||
{
|
||||
// server start, wait until the rollback finished
|
||||
server2.DoStart();
|
||||
taosMsleep(1000);
|
||||
|
||||
int32_t retry = 0;
|
||||
int32_t retryMax = 10;
|
||||
|
||||
for (retry = 0; retry < retryMax; retry++) {
|
||||
int32_t contLen = sizeof(SMCreateSnodeReq);
|
||||
|
||||
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
|
||||
pReq->dnodeId = htonl(2);
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
if (pMsg->code == 0) break;
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
||||
ASSERT_NE(retry, retryMax);
|
||||
}
|
||||
}
|
|
@ -69,6 +69,8 @@ static const char *sdbStatusStr(ESdbStatus status) {
|
|||
return "ready";
|
||||
case SDB_STATUS_DROPPED:
|
||||
return "dropped";
|
||||
case SDB_STATUS_INIT:
|
||||
return "init";
|
||||
default:
|
||||
return "undefine";
|
||||
}
|
||||
|
@ -261,6 +263,8 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
|
|||
}
|
||||
|
||||
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
|
||||
terrno = 0;
|
||||
|
||||
SHashObj *hash = sdbGetHash(pSdb, type);
|
||||
if (hash == NULL) return NULL;
|
||||
|
||||
|
|
Loading…
Reference in New Issue