From ad965dbcb462508d5c2b3822510cb84a5603b982 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 4 Jan 2022 19:24:28 -0800 Subject: [PATCH 1/3] invalid release --- source/dnode/mnode/impl/src/mndBnode.c | 12 +++++++----- source/dnode/mnode/impl/src/mndQnode.c | 12 +++++++----- source/dnode/mnode/impl/src/mndSnode.c | 12 +++++++----- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index 886784805e..312438a535 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -26,7 +26,7 @@ static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj); static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw); static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj); static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj); -static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOldBnode, SBnodeObj *pNewBnode); +static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew); static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessDropBnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessCreateBnodeRsp(SMnodeMsg *pMsg); @@ -155,9 +155,9 @@ static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) { return 0; } -static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOldBnode, SBnodeObj *pNewBnode) { - mTrace("bnode:%d, perform update action, old_row:%p new_row:%p", pOldBnode->id, pOldBnode, pNewBnode); - pOldBnode->updateTime = pNewBnode->updateTime; +static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew) { + mTrace("bnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); + pOld->updateTime = pNew->updateTime; return 0; } @@ -251,6 +251,7 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pMsg) { SBnodeObj *pObj = mndAcquireBnode(pMnode, pCreate->dnodeId); if (pObj != NULL) { mError("bnode:%d, bnode already exist", pObj->id); + terrno = TSDB_CODE_MND_BNODE_ALREADY_EXIST; mndReleaseBnode(pMnode, pObj); return -1; } @@ -370,11 +371,12 @@ static int32_t mndProcessDropBnodeReq(SMnodeMsg *pMsg) { int32_t code = mndDropBnode(pMnode, pMsg, pObj); if (code != 0) { + sdbRelease(pMnode->pSdb, pObj); mError("bnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); return -1; } - sdbRelease(pMnode->pSdb, pMnode); + sdbRelease(pMnode->pSdb, pObj); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index ea4cfa41c8..5ff3a79040 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -26,7 +26,7 @@ static SSdbRaw *mndQnodeActionEncode(SQnodeObj *pObj); static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw); static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj); static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj); -static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOldQnode, SQnodeObj *pNewQnode); +static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew); static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessDropQnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessCreateQnodeRsp(SMnodeMsg *pMsg); @@ -155,9 +155,9 @@ static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) { return 0; } -static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOldQnode, SQnodeObj *pNewQnode) { - mTrace("qnode:%d, perform update action, old_row:%p new_row:%p", pOldQnode->id, pOldQnode, pNewQnode); - pOldQnode->updateTime = pNewQnode->updateTime; +static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew) { + mTrace("qnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); + pOld->updateTime = pNew->updateTime; return 0; } @@ -251,6 +251,7 @@ static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pMsg) { SQnodeObj *pObj = mndAcquireQnode(pMnode, pCreate->dnodeId); if (pObj != NULL) { mError("qnode:%d, qnode already exist", pObj->id); + terrno = TSDB_CODE_MND_QNODE_ALREADY_EXIST; mndReleaseQnode(pMnode, pObj); return -1; } @@ -370,11 +371,12 @@ static int32_t mndProcessDropQnodeReq(SMnodeMsg *pMsg) { int32_t code = mndDropQnode(pMnode, pMsg, pObj); if (code != 0) { + sdbRelease(pMnode->pSdb, pObj); mError("qnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); return -1; } - sdbRelease(pMnode->pSdb, pMnode); + sdbRelease(pMnode->pSdb, pObj); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 3ab1ad4eaf..5d13211380 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -26,7 +26,7 @@ static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj); static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw); static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj); static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj); -static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOldSnode, SSnodeObj *pNewSnode); +static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew); static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessDropSnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessCreateSnodeRsp(SMnodeMsg *pMsg); @@ -155,9 +155,9 @@ static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) { return 0; } -static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOldSnode, SSnodeObj *pNewSnode) { - mTrace("snode:%d, perform update action, old_row:%p new_row:%p", pOldSnode->id, pOldSnode, pNewSnode); - pOldSnode->updateTime = pNewSnode->updateTime; +static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew) { + mTrace("snode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); + pOld->updateTime = pNew->updateTime; return 0; } @@ -251,6 +251,7 @@ static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pMsg) { SSnodeObj *pObj = mndAcquireSnode(pMnode, pCreate->dnodeId); if (pObj != NULL) { mError("snode:%d, snode already exist", pObj->id); + terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST; mndReleaseSnode(pMnode, pObj); return -1; } @@ -370,11 +371,12 @@ static int32_t mndProcessDropSnodeReq(SMnodeMsg *pMsg) { int32_t code = mndDropSnode(pMnode, pMsg, pObj); if (code != 0) { + sdbRelease(pMnode->pSdb, pObj); mError("snode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); return -1; } - sdbRelease(pMnode->pSdb, pMnode); + sdbRelease(pMnode->pSdb, pObj); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } From 9dd5922d150bb6f5c2232ea638201d8d646a37c9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 4 Jan 2022 22:36:24 -0800 Subject: [PATCH 2/3] error returned when trans rollback --- source/dnode/mnode/impl/inc/mndDef.h | 8 ++++---- source/dnode/mnode/impl/src/mndTrans.c | 21 +++++++++++---------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 267a3f6cf5..93a5722842 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -64,10 +64,10 @@ typedef enum { TRN_STAGE_PREPARE = 0, TRN_STAGE_REDO_LOG = 1, TRN_STAGE_REDO_ACTION = 2, - TRN_STAGE_UNDO_LOG = 3, - TRN_STAGE_UNDO_ACTION = 4, - TRN_STAGE_COMMIT_LOG = 5, - TRN_STAGE_COMMIT = 6, + TRN_STAGE_COMMIT_LOG = 3, + TRN_STAGE_COMMIT = 4, + TRN_STAGE_UNDO_ACTION = 5, + TRN_STAGE_UNDO_LOG = 6, TRN_STAGE_ROLLBACK = 7, TRN_STAGE_FINISHED = 8 } ETrnStage; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index ee4a49ffdc..82fe296166 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -496,10 +496,15 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { } static void mndTransSendRpcRsp(STrans *pTrans) { - if (pTrans->rpcHandle != NULL) { - mDebug("trans:%d, send rsp, ahandle:%p code:0x%x", pTrans->id, pTrans->rpcAHandle, pTrans->code & 0xFFFF); - SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle}; - rpcSendResponse(&rspMsg); + if (pTrans->stage == TRN_STAGE_FINISHED || pTrans->stage == TRN_STAGE_UNDO_LOG || + pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) { + if (pTrans->rpcHandle != NULL) { + mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, + pTrans->rpcAHandle); + SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle}; + rpcSendResponse(&rspMsg); + pTrans->rpcHandle = NULL; + } } } @@ -764,7 +769,6 @@ static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) { pTrans->failedTimes++; mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr()); continueExec = false; - ; } return continueExec; @@ -791,7 +795,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); if (code == 0) { - pTrans->stage = TRN_STAGE_REDO_LOG; + pTrans->stage = TRN_STAGE_UNDO_LOG; mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id); continueExec = true; } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { @@ -814,7 +818,6 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { pTrans->stage = TRN_STAGE_FINISHED; mDebug("trans:%d, stage from rollback to finished", pTrans->id); continueExec = true; - ; } else { pTrans->failedTimes++; mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr()); @@ -880,9 +883,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { } } - if (pTrans->stage == TRN_STAGE_FINISHED) { - mndTransSendRpcRsp(pTrans); - } + mndTransSendRpcRsp(pTrans); } static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { From 3bd42107652d23ae7a110442ef4595db30ea979e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 5 Jan 2022 01:31:13 -0800 Subject: [PATCH 3/3] When an error occurs in rollback or retry, the failed result is returned first --- include/dnode/mnode/sdb/sdb.h | 1 + source/dnode/mnode/impl/inc/mndTrans.h | 1 + source/dnode/mnode/impl/src/mndBnode.c | 111 ++++++------ source/dnode/mnode/impl/src/mndQnode.c | 111 ++++++------ source/dnode/mnode/impl/src/mndSnode.c | 109 ++++++------ source/dnode/mnode/impl/src/mndTrans.c | 43 +++-- source/dnode/mnode/impl/test/bnode/bnode.cpp | 168 +++++++++++++++++-- source/dnode/mnode/impl/test/qnode/qnode.cpp | 146 +++++++++++++++- source/dnode/mnode/impl/test/snode/snode.cpp | 168 +++++++++++++++++-- source/dnode/mnode/sdb/src/sdbHash.c | 4 + 10 files changed, 655 insertions(+), 207 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 497da71c13..c7198eee6f 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -94,6 +94,7 @@ typedef struct SSdbRaw SSdbRaw; typedef struct SSdbRow SSdbRow; typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType; typedef enum { + SDB_STATUS_INIT = 0, SDB_STATUS_CREATING = 1, SDB_STATUS_UPDATING = 2, SDB_STATUS_DROPPING = 3, diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index bd053d91b6..fda3fed13d 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -28,6 +28,7 @@ typedef struct { int8_t msgSent; int8_t msgReceived; int32_t errCode; + int32_t acceptableCode; int32_t contLen; void *pCont; } STransAction; diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index 312438a535..14d49213e9 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -59,9 +59,8 @@ int32_t mndInitBnode(SMnode *pMnode) { void mndCleanupBnode(SMnode *pMnode) {} static SBnodeObj *mndAcquireBnode(SMnode *pMnode, int32_t bnodeId) { - SSdb *pSdb = pMnode->pSdb; - SBnodeObj *pObj = sdbAcquire(pSdb, SDB_BNODE, &bnodeId); - if (pObj == NULL) { + SBnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_BNODE, &bnodeId); + if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_BNODE_NOT_EXIST; } return pObj; @@ -169,6 +168,14 @@ static int32_t mndSetCreateBnodeRedoLogs(STrans *pTrans, SBnodeObj *pObj) { return 0; } +static int32_t mndSetCreateBnodeUndoLogs(STrans *pTrans, SBnodeObj *pObj) { + SSdbRaw *pUndoRaw = mndBnodeActionEncode(pObj); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + static int32_t mndSetCreateBnodeCommitLogs(STrans *pTrans, SBnodeObj *pObj) { SSdbRaw *pCommitRaw = mndBnodeActionEncode(pObj); if (pCommitRaw == NULL) return -1; @@ -190,6 +197,7 @@ static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S action.pCont = pMsg; action.contLen = sizeof(SDCreateBnodeReq); action.msgType = TDMT_DND_CREATE_BNODE; + action.acceptableCode = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -199,39 +207,47 @@ static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S return 0; } +static int32_t mndSetCreateBnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SBnodeObj *pObj) { + SDDropBnodeReq *pMsg = malloc(sizeof(SDDropBnodeReq)); + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pMsg->dnodeId = htonl(pDnode->id); + + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + action.pCont = pMsg; + action.contLen = sizeof(SDDropBnodeReq); + action.msgType = TDMT_DND_DROP_BNODE; + action.acceptableCode = TSDB_CODE_DND_BNODE_NOT_DEPLOYED; + + if (mndTransAppendUndoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } + + return 0; +} + static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateBnodeReq *pCreate) { + int32_t code = -1; + SBnodeObj bnodeObj = {0}; bnodeObj.id = pDnode->id; bnodeObj.createdTime = taosGetTimestampMs(); bnodeObj.updateTime = bnodeObj.createdTime; - int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("bnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); - goto CREATE_BNODE_OVER; - } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); + if (pTrans == NULL) goto CREATE_BNODE_OVER; + mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId); - - if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto CREATE_BNODE_OVER; - } - - if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto CREATE_BNODE_OVER; - } - - if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_BNODE_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto CREATE_BNODE_OVER; - } + if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER; + if (mndSetCreateBnodeUndoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER; + if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER; + if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER; + if (mndSetCreateBnodeUndoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_BNODE_OVER; code = 0; @@ -254,6 +270,9 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pMsg) { terrno = TSDB_CODE_MND_BNODE_ALREADY_EXIST; mndReleaseBnode(pMnode, pObj); return -1; + } else if (terrno != TSDB_CODE_MND_BNODE_NOT_EXIST) { + mError("bnode:%d, failed to create bnode since %s", pCreate->dnodeId, terrstr()); + return -1; } SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); @@ -303,6 +322,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn action.pCont = pMsg; action.contLen = sizeof(SDDropBnodeReq); action.msgType = TDMT_DND_DROP_BNODE; + action.acceptableCode = TSDB_CODE_DND_BNODE_NOT_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -314,33 +334,15 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pMsg, SBnodeObj *pObj) { int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("bnode:%d, failed to drop since %s", pObj->id, terrstr()); - goto DROP_BNODE_OVER; - } + if (pTrans == NULL) goto DROP_BNODE_OVER; mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id); - - if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto DROP_BNODE_OVER; - } - - if (mndSetDropBnodeCommitLogs(pTrans, pObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto DROP_BNODE_OVER; - } - - if (mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_BNODE_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto DROP_BNODE_OVER; - } + if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER; + if (mndSetDropBnodeCommitLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER; + if (mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_BNODE_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_BNODE_OVER; code = 0; @@ -364,8 +366,7 @@ static int32_t mndProcessDropBnodeReq(SMnodeMsg *pMsg) { SBnodeObj *pObj = mndAcquireBnode(pMnode, pDrop->dnodeId); if (pObj == NULL) { - mError("bnode:%d, not exist", pDrop->dnodeId); - terrno = TSDB_CODE_MND_BNODE_NOT_EXIST; + mError("bnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 5ff3a79040..6951aa8717 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -59,9 +59,8 @@ int32_t mndInitQnode(SMnode *pMnode) { void mndCleanupQnode(SMnode *pMnode) {} static SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) { - SSdb *pSdb = pMnode->pSdb; - SQnodeObj *pObj = sdbAcquire(pSdb, SDB_QNODE, &qnodeId); - if (pObj == NULL) { + SQnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_QNODE, &qnodeId); + if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_QNODE_NOT_EXIST; } return pObj; @@ -169,6 +168,14 @@ static int32_t mndSetCreateQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) { return 0; } +static int32_t mndSetCreateQnodeUndoLogs(STrans *pTrans, SQnodeObj *pObj) { + SSdbRaw *pUndoRaw = mndQnodeActionEncode(pObj); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + static int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) { SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj); if (pCommitRaw == NULL) return -1; @@ -190,6 +197,7 @@ static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S action.pCont = pMsg; action.contLen = sizeof(SDCreateQnodeReq); action.msgType = TDMT_DND_CREATE_QNODE; + action.acceptableCode = TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -199,39 +207,47 @@ static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S return 0; } +static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) { + SDDropQnodeReq *pMsg = malloc(sizeof(SDDropQnodeReq)); + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pMsg->dnodeId = htonl(pDnode->id); + + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + action.pCont = pMsg; + action.contLen = sizeof(SDDropQnodeReq); + action.msgType = TDMT_DND_DROP_QNODE; + action.acceptableCode = TSDB_CODE_DND_QNODE_NOT_DEPLOYED; + + if (mndTransAppendUndoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } + + return 0; +} + static int32_t mndCreateQnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) { + int32_t code = -1; + SQnodeObj qnodeObj = {0}; qnodeObj.id = pDnode->id; qnodeObj.createdTime = taosGetTimestampMs(); qnodeObj.updateTime = qnodeObj.createdTime; - int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("qnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); - goto CREATE_QNODE_OVER; - } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); + if (pTrans == NULL) goto CREATE_QNODE_OVER; + mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId); - - if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto CREATE_QNODE_OVER; - } - - if (mndSetCreateQnodeCommitLogs(pTrans, &qnodeObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto CREATE_QNODE_OVER; - } - - if (mndSetCreateQnodeRedoActions(pTrans, pDnode, &qnodeObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_QNODE_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto CREATE_QNODE_OVER; - } + if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER; + if (mndSetCreateQnodeUndoLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER; + if (mndSetCreateQnodeCommitLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER; + if (mndSetCreateQnodeRedoActions(pTrans, pDnode, &qnodeObj) != 0) goto CREATE_QNODE_OVER; + if (mndSetCreateQnodeUndoActions(pTrans, pDnode, &qnodeObj) != 0) goto CREATE_QNODE_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_QNODE_OVER; code = 0; @@ -254,6 +270,9 @@ static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pMsg) { terrno = TSDB_CODE_MND_QNODE_ALREADY_EXIST; mndReleaseQnode(pMnode, pObj); return -1; + } else if (terrno != TSDB_CODE_MND_QNODE_NOT_EXIST) { + mError("qnode:%d, failed to create qnode since %s", pCreate->dnodeId, terrstr()); + return -1; } SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); @@ -303,6 +322,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn action.pCont = pMsg; action.contLen = sizeof(SDDropQnodeReq); action.msgType = TDMT_DND_DROP_QNODE; + action.acceptableCode = TSDB_CODE_DND_QNODE_NOT_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -314,33 +334,15 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pMsg, SQnodeObj *pObj) { int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("qnode:%d, failed to drop since %s", pObj->id, terrstr()); - goto DROP_QNODE_OVER; - } + if (pTrans == NULL) goto DROP_QNODE_OVER; mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id); - - if (mndSetDropQnodeRedoLogs(pTrans, pObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto DROP_QNODE_OVER; - } - - if (mndSetDropQnodeCommitLogs(pTrans, pObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto DROP_QNODE_OVER; - } - - if (mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_QNODE_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto DROP_QNODE_OVER; - } + if (mndSetDropQnodeRedoLogs(pTrans, pObj) != 0) goto DROP_QNODE_OVER; + if (mndSetDropQnodeCommitLogs(pTrans, pObj) != 0) goto DROP_QNODE_OVER; + if (mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_QNODE_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_QNODE_OVER; code = 0; @@ -364,8 +366,7 @@ static int32_t mndProcessDropQnodeReq(SMnodeMsg *pMsg) { SQnodeObj *pObj = mndAcquireQnode(pMnode, pDrop->dnodeId); if (pObj == NULL) { - mError("qnode:%d, not exist", pDrop->dnodeId); - terrno = TSDB_CODE_MND_QNODE_NOT_EXIST; + mError("qnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 5d13211380..7221a2fbf4 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -59,9 +59,8 @@ int32_t mndInitSnode(SMnode *pMnode) { void mndCleanupSnode(SMnode *pMnode) {} static SSnodeObj *mndAcquireSnode(SMnode *pMnode, int32_t snodeId) { - SSdb *pSdb = pMnode->pSdb; - SSnodeObj *pObj = sdbAcquire(pSdb, SDB_SNODE, &snodeId); - if (pObj == NULL) { + SSnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_SNODE, &snodeId); + if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_SNODE_NOT_EXIST; } return pObj; @@ -169,6 +168,14 @@ static int32_t mndSetCreateSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) { return 0; } +static int32_t mndSetCreateSnodeUndoLogs(STrans *pTrans, SSnodeObj *pObj) { + SSdbRaw *pUndoRaw = mndSnodeActionEncode(pObj); + if (pUndoRaw == NULL) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + static int32_t mndSetCreateSnodeCommitLogs(STrans *pTrans, SSnodeObj *pObj) { SSdbRaw *pCommitRaw = mndSnodeActionEncode(pObj); if (pCommitRaw == NULL) return -1; @@ -190,6 +197,7 @@ static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S action.pCont = pMsg; action.contLen = sizeof(SDCreateSnodeReq); action.msgType = TDMT_DND_CREATE_SNODE; + action.acceptableCode = TSDB_CODE_DND_SNODE_ALREADY_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -199,39 +207,48 @@ static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S return 0; } +static int32_t mndSetCreateSnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj) { + SDDropSnodeReq *pMsg = malloc(sizeof(SDDropSnodeReq)); + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pMsg->dnodeId = htonl(pDnode->id); + + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + action.pCont = pMsg; + action.contLen = sizeof(SDDropSnodeReq); + action.msgType = TDMT_DND_DROP_SNODE; + action.acceptableCode = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + if (mndTransAppendUndoAction(pTrans, &action) != 0) { + free(pMsg); + return -1; + } + + return 0; +} + static int32_t mndCreateSnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate) { + int32_t code = -1; + SSnodeObj snodeObj = {0}; snodeObj.id = pDnode->id; snodeObj.createdTime = taosGetTimestampMs(); snodeObj.updateTime = snodeObj.createdTime; - int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("snode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); - goto CREATE_SNODE_OVER; - } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); + if (pTrans == NULL) goto CREATE_SNODE_OVER; + mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId); - if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto CREATE_SNODE_OVER; - } - - if (mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto CREATE_SNODE_OVER; - } - - if (mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_SNODE_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto CREATE_SNODE_OVER; - } + if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER; + if (mndSetCreateSnodeUndoLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER; + if (mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER; + if (mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) goto CREATE_SNODE_OVER; + if (mndSetCreateSnodeUndoActions(pTrans, pDnode, &snodeObj) != 0) goto CREATE_SNODE_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_SNODE_OVER; code = 0; @@ -254,6 +271,9 @@ static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pMsg) { terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST; mndReleaseSnode(pMnode, pObj); return -1; + } else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) { + mError("snode:%d, failed to create snode since %s", pCreate->dnodeId, terrstr()); + return -1; } SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); @@ -303,6 +323,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn action.pCont = pMsg; action.contLen = sizeof(SDDropSnodeReq); action.msgType = TDMT_DND_DROP_SNODE; + action.acceptableCode = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -314,33 +335,16 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn static int32_t mndDropSnode(SMnode *pMnode, SMnodeMsg *pMsg, SSnodeObj *pObj) { int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("snode:%d, failed to drop since %s", pObj->id, terrstr()); - goto DROP_SNODE_OVER; - } + if (pTrans == NULL) goto DROP_SNODE_OVER; mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id); - if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto DROP_SNODE_OVER; - } - - if (mndSetDropSnodeCommitLogs(pTrans, pObj) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto DROP_SNODE_OVER; - } - - if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_SNODE_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto DROP_SNODE_OVER; - } + if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0) goto DROP_SNODE_OVER; + if (mndSetDropSnodeCommitLogs(pTrans, pObj) != 0) goto DROP_SNODE_OVER; + if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_SNODE_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_SNODE_OVER; code = 0; @@ -364,8 +368,7 @@ static int32_t mndProcessDropSnodeReq(SMnodeMsg *pMsg) { SSnodeObj *pObj = mndAcquireSnode(pMnode, pDrop->dnodeId); if (pObj == NULL) { - mError("snode:%d, not exist", pDrop->dnodeId); - terrno = TSDB_CODE_MND_SNODE_NOT_EXIST; + mError("snode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 82fe296166..c1686c2923 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -143,6 +143,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) } @@ -151,6 +152,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) } @@ -253,6 +255,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { for (int32_t i = 0; i < redoActionNum; ++i) { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, TRANS_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER) action.pCont = malloc(action.contLen); if (action.pCont == NULL) goto TRANS_DECODE_OVER; @@ -264,6 +267,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { for (int32_t i = 0; i < undoActionNum; ++i) { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, TRANS_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER) action.pCont = malloc(action.contLen); if (action.pCont == NULL) goto TRANS_DECODE_OVER; @@ -496,16 +500,32 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { } static void mndTransSendRpcRsp(STrans *pTrans) { - if (pTrans->stage == TRN_STAGE_FINISHED || pTrans->stage == TRN_STAGE_UNDO_LOG || - pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) { - if (pTrans->rpcHandle != NULL) { - mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, - pTrans->rpcAHandle); - SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle}; - rpcSendResponse(&rspMsg); - pTrans->rpcHandle = NULL; + bool sendRsp = false; + + if (pTrans->stage == TRN_STAGE_FINISHED) { + sendRsp = true; + } + + if (pTrans->policy == TRN_POLICY_ROLLBACK) { + if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION || + pTrans->stage == TRN_STAGE_ROLLBACK) { + sendRsp = true; } } + + if (pTrans->policy == TRN_POLICY_RETRY) { + if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 0) { + sendRsp = true; + } + } + + if (sendRsp && pTrans->rpcHandle != NULL) { + mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, + pTrans->rpcAHandle); + SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle}; + rpcSendResponse(&rspMsg); + pTrans->rpcHandle = NULL; + } } void mndTransProcessRsp(SMnodeMsg *pMsg) { @@ -547,7 +567,8 @@ void mndTransProcessRsp(SMnodeMsg *pMsg) { pAction->errCode = pMsg->rpcMsg.code; } - mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->rpcMsg.code); + mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pMsg->rpcMsg.code, + pAction->acceptableCode); mndTransExecute(pMnode, pTrans); HANDLE_ACTION_RSP_OVER: @@ -647,7 +668,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA if (pAction == NULL) continue; if (pAction->msgSent && pAction->msgReceived) { numOfReceived++; - if (pAction->errCode != 0) { + if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { errCode = pAction->errCode; } } @@ -695,7 +716,7 @@ static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) { } else { pTrans->code = terrno; pTrans->stage = TRN_STAGE_UNDO_LOG; - mError("trans:%d, stage from redoLog to undoLog", pTrans->id); + mError("trans:%d, stage from redoLog to undoLog since %s", pTrans->id, terrstr()); } return continueExec; diff --git a/source/dnode/mnode/impl/test/bnode/bnode.cpp b/source/dnode/mnode/impl/test/bnode/bnode.cpp index fa06d9ac69..07dc163df5 100644 --- a/source/dnode/mnode/impl/test/bnode/bnode.cpp +++ b/source/dnode/mnode/impl/test/bnode/bnode.cpp @@ -50,7 +50,18 @@ TEST_F(MndTestBnode, 01_Show_Bnode) { EXPECT_EQ(test.GetShowRows(), 0); } -TEST_F(MndTestBnode, 02_Create_Bnode_Invalid_Id) { +TEST_F(MndTestBnode, 02_Create_Bnode) { + { + int32_t contLen = sizeof(SMCreateBnodeReq); + + SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST); + } + { int32_t contLen = sizeof(SMCreateBnodeReq); @@ -63,11 +74,6 @@ TEST_F(MndTestBnode, 02_Create_Bnode_Invalid_Id) { test.SendShowMetaMsg(TSDB_MGMT_TABLE_BNODE, ""); CHECK_META("show bnodes", 3); - - CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id"); - CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint"); - CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); - test.SendShowRetrieveMsg(); EXPECT_EQ(test.GetShowRows(), 1); @@ -75,24 +81,21 @@ TEST_F(MndTestBnode, 02_Create_Bnode_Invalid_Id) { CheckBinary("localhost:9018", TSDB_EP_LEN); CheckTimestamp(); } -} -TEST_F(MndTestBnode, 03_Create_Bnode_Invalid_Id) { { int32_t contLen = sizeof(SMCreateBnodeReq); SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + pReq->dnodeId = htonl(1); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_BNODE_ALREADY_EXIST); } } -TEST_F(MndTestBnode, 04_Create_Bnode) { +TEST_F(MndTestBnode, 03_Drop_Bnode) { { - // create dnode int32_t contLen = sizeof(SCreateDnodeMsg); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); @@ -110,7 +113,6 @@ TEST_F(MndTestBnode, 04_Create_Bnode) { } { - // create bnode int32_t contLen = sizeof(SMCreateBnodeReq); SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen); @@ -133,7 +135,6 @@ TEST_F(MndTestBnode, 04_Create_Bnode) { } { - // drop bnode int32_t contLen = sizeof(SMDropBnodeReq); SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen); @@ -151,4 +152,143 @@ TEST_F(MndTestBnode, 04_Create_Bnode) { CheckBinary("localhost:9018", TSDB_EP_LEN); CheckTimestamp(); } + + { + int32_t contLen = sizeof(SMDropBnodeReq); + + SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_BNODE_NOT_EXIST); + } +} + +TEST_F(MndTestBnode, 03_Create_Bnode_Rollback) { + { + // send message first, then dnode2 crash, result is returned, and rollback is started + int32_t contLen = sizeof(SMCreateBnodeReq); + + SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + { + // continue send message, bnode is creating + int32_t contLen = sizeof(SMCreateBnodeReq); + + SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING); + } + + { + // continue send message, bnode is creating + int32_t contLen = sizeof(SMDropBnodeReq); + + SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING); + } + + { + // server start, wait until the rollback finished + server2.DoStart(); + taosMsleep(1000); + + int32_t retry = 0; + int32_t retryMax = 10; + + for (retry = 0; retry < retryMax; retry++) { + int32_t contLen = sizeof(SMCreateBnodeReq); + + SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + if (pMsg->code == 0) break; + taosMsleep(1000); + } + + ASSERT_NE(retry, retryMax); + } +} + +TEST_F(MndTestBnode, 04_Drop_Bnode_Rollback) { + { + // send message first, then dnode2 crash, result is returned, and rollback is started + int32_t contLen = sizeof(SMDropBnodeReq); + + SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + { + // continue send message, bnode is dropping + int32_t contLen = sizeof(SMCreateBnodeReq); + + SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING); + } + + { + // continue send message, bnode is dropping + int32_t contLen = sizeof(SMDropBnodeReq); + + SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING); + } + + { + // server start, wait until the rollback finished + server2.DoStart(); + taosMsleep(1000); + + int32_t retry = 0; + int32_t retryMax = 10; + + for (retry = 0; retry < retryMax; retry++) { + int32_t contLen = sizeof(SMCreateBnodeReq); + + SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + if (pMsg->code == 0) break; + taosMsleep(1000); + } + + ASSERT_NE(retry, retryMax); + } } \ No newline at end of file diff --git a/source/dnode/mnode/impl/test/qnode/qnode.cpp b/source/dnode/mnode/impl/test/qnode/qnode.cpp index 3fdd5315a4..612b138ccc 100644 --- a/source/dnode/mnode/impl/test/qnode/qnode.cpp +++ b/source/dnode/mnode/impl/test/qnode/qnode.cpp @@ -90,13 +90,12 @@ TEST_F(MndTestQnode, 02_Create_Qnode) { SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_QNODE_ALREADY_EXIST); } } -TEST_F(MndTestQnode, 04_Create_Qnode) { +TEST_F(MndTestQnode, 03_Drop_Qnode) { { - // create dnode int32_t contLen = sizeof(SCreateDnodeMsg); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); @@ -114,7 +113,6 @@ TEST_F(MndTestQnode, 04_Create_Qnode) { } { - // create qnode int32_t contLen = sizeof(SMCreateQnodeReq); SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen); @@ -137,7 +135,6 @@ TEST_F(MndTestQnode, 04_Create_Qnode) { } { - // drop qnode int32_t contLen = sizeof(SMDropQnodeReq); SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen); @@ -155,4 +152,143 @@ TEST_F(MndTestQnode, 04_Create_Qnode) { CheckBinary("localhost:9014", TSDB_EP_LEN); CheckTimestamp(); } + + { + int32_t contLen = sizeof(SMDropQnodeReq); + + SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_QNODE_NOT_EXIST); + } +} + +TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) { + { + // send message first, then dnode2 crash, result is returned, and rollback is started + int32_t contLen = sizeof(SMCreateQnodeReq); + + SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + { + // continue send message, qnode is creating + int32_t contLen = sizeof(SMCreateQnodeReq); + + SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING); + } + + { + // continue send message, qnode is creating + int32_t contLen = sizeof(SMDropQnodeReq); + + SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING); + } + + { + // server start, wait until the rollback finished + server2.DoStart(); + taosMsleep(1000); + + int32_t retry = 0; + int32_t retryMax = 10; + + for (retry = 0; retry < retryMax; retry++) { + int32_t contLen = sizeof(SMCreateQnodeReq); + + SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + if (pMsg->code == 0) break; + taosMsleep(1000); + } + + ASSERT_NE(retry, retryMax); + } +} + +TEST_F(MndTestQnode, 04_Drop_Qnode_Rollback) { + { + // send message first, then dnode2 crash, result is returned, and rollback is started + int32_t contLen = sizeof(SMDropQnodeReq); + + SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + { + // continue send message, qnode is dropping + int32_t contLen = sizeof(SMCreateQnodeReq); + + SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING); + } + + { + // continue send message, qnode is dropping + int32_t contLen = sizeof(SMDropQnodeReq); + + SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING); + } + + { + // server start, wait until the rollback finished + server2.DoStart(); + taosMsleep(1000); + + int32_t retry = 0; + int32_t retryMax = 10; + + for (retry = 0; retry < retryMax; retry++) { + int32_t contLen = sizeof(SMCreateQnodeReq); + + SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + if (pMsg->code == 0) break; + taosMsleep(1000); + } + + ASSERT_NE(retry, retryMax); + } } \ No newline at end of file diff --git a/source/dnode/mnode/impl/test/snode/snode.cpp b/source/dnode/mnode/impl/test/snode/snode.cpp index 4819bc556c..cebb5fe113 100644 --- a/source/dnode/mnode/impl/test/snode/snode.cpp +++ b/source/dnode/mnode/impl/test/snode/snode.cpp @@ -50,7 +50,18 @@ TEST_F(MndTestSnode, 01_Show_Snode) { EXPECT_EQ(test.GetShowRows(), 0); } -TEST_F(MndTestSnode, 02_Create_Snode_Invalid_Id) { +TEST_F(MndTestSnode, 02_Create_Snode) { + { + int32_t contLen = sizeof(SMCreateSnodeReq); + + SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST); + } + { int32_t contLen = sizeof(SMCreateSnodeReq); @@ -63,11 +74,6 @@ TEST_F(MndTestSnode, 02_Create_Snode_Invalid_Id) { test.SendShowMetaMsg(TSDB_MGMT_TABLE_SNODE, ""); CHECK_META("show snodes", 3); - - CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id"); - CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint"); - CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); - test.SendShowRetrieveMsg(); EXPECT_EQ(test.GetShowRows(), 1); @@ -75,24 +81,21 @@ TEST_F(MndTestSnode, 02_Create_Snode_Invalid_Id) { CheckBinary("localhost:9016", TSDB_EP_LEN); CheckTimestamp(); } -} -TEST_F(MndTestSnode, 03_Create_Snode_Invalid_Id) { { int32_t contLen = sizeof(SMCreateSnodeReq); SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(2); + pReq->dnodeId = htonl(1); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_SNODE_ALREADY_EXIST); } } -TEST_F(MndTestSnode, 04_Create_Snode) { +TEST_F(MndTestSnode, 03_Drop_Snode) { { - // create dnode int32_t contLen = sizeof(SCreateDnodeMsg); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); @@ -110,7 +113,6 @@ TEST_F(MndTestSnode, 04_Create_Snode) { } { - // create snode int32_t contLen = sizeof(SMCreateSnodeReq); SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen); @@ -133,7 +135,6 @@ TEST_F(MndTestSnode, 04_Create_Snode) { } { - // drop snode int32_t contLen = sizeof(SMDropSnodeReq); SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen); @@ -151,4 +152,143 @@ TEST_F(MndTestSnode, 04_Create_Snode) { CheckBinary("localhost:9016", TSDB_EP_LEN); CheckTimestamp(); } + + { + int32_t contLen = sizeof(SMDropSnodeReq); + + SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_SNODE_NOT_EXIST); + } +} + +TEST_F(MndTestSnode, 03_Create_Snode_Rollback) { + { + // send message first, then dnode2 crash, result is returned, and rollback is started + int32_t contLen = sizeof(SMCreateSnodeReq); + + SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + { + // continue send message, snode is creating + int32_t contLen = sizeof(SMCreateSnodeReq); + + SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING); + } + + { + // continue send message, snode is creating + int32_t contLen = sizeof(SMDropSnodeReq); + + SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING); + } + + { + // server start, wait until the rollback finished + server2.DoStart(); + taosMsleep(1000); + + int32_t retry = 0; + int32_t retryMax = 10; + + for (retry = 0; retry < retryMax; retry++) { + int32_t contLen = sizeof(SMCreateSnodeReq); + + SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + if (pMsg->code == 0) break; + taosMsleep(1000); + } + + ASSERT_NE(retry, retryMax); + } +} + +TEST_F(MndTestSnode, 04_Drop_Snode_Rollback) { + { + // send message first, then dnode2 crash, result is returned, and rollback is started + int32_t contLen = sizeof(SMDropSnodeReq); + + SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + { + // continue send message, snode is dropping + int32_t contLen = sizeof(SMCreateSnodeReq); + + SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING); + } + + { + // continue send message, snode is dropping + int32_t contLen = sizeof(SMDropSnodeReq); + + SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + server2.Stop(); + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING); + } + + { + // server start, wait until the rollback finished + server2.DoStart(); + taosMsleep(1000); + + int32_t retry = 0; + int32_t retryMax = 10; + + for (retry = 0; retry < retryMax; retry++) { + int32_t contLen = sizeof(SMCreateSnodeReq); + + SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + if (pMsg->code == 0) break; + taosMsleep(1000); + } + + ASSERT_NE(retry, retryMax); + } } \ No newline at end of file diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 733075757f..13b2c7daa5 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -69,6 +69,8 @@ static const char *sdbStatusStr(ESdbStatus status) { return "ready"; case SDB_STATUS_DROPPED: return "dropped"; + case SDB_STATUS_INIT: + return "init"; default: return "undefine"; } @@ -261,6 +263,8 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) { } void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) { + terrno = 0; + SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return NULL;