diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 283c6c9e69..27a01e4818 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -508,21 +508,26 @@ typedef struct { } SAlterDbReq; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; int8_t ignoreNotExists; } SDropDbReq; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + uint64_t uid; +} SDropDbRsp; + +typedef struct { + char db[TSDB_DB_FNAME_LEN]; int32_t vgVersion; } SUseDbReq; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; } SSyncDbReq; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; } SCompactDbReq; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 5ec9173fc8..00d1b61b74 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -128,6 +128,8 @@ typedef struct { int32_t failedTimes; void* rpcHandle; void* rpcAHandle; + void* rpcRsp; + int32_t rpcRspLen; SArray* redoLogs; SArray* undoLogs; SArray* commitLogs; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index fda3fed13d..f1a213790c 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -36,16 +36,17 @@ typedef struct { int32_t mndInitTrans(SMnode *pMnode); void mndCleanupTrans(SMnode *pMnode); -STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg); +STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq); void mndTransDrop(STrans *pTrans); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); +void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); -void mndTransProcessRsp(SMnodeMsg *pMsg); +void mndTransProcessRsp(SMnodeMsg *pRsp); void mndTransPullup(SMnode *pMnode); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 4dde8a5410..4bebdb8a5b 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -767,6 +767,14 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) { if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; + + int32_t rspLen = sizeof(SDropDbRsp); + SDropDbRsp *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) goto DROP_DB_OVER; + memcpy(pRsp->db, pDb->name, TSDB_DB_FNAME_LEN); + pRsp->uid = htobe64(pDb->uid); + mndTransSetRpcRsp(pTrans, pRsp, rspLen); + if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER; code = 0; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 808e6dcbe5..6686c0887f 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -308,6 +308,11 @@ static void mndTransDropData(STrans *pTrans) { mndTransDropLogs(pTrans->commitLogs); mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->undoActions); + if (pTrans->rpcRsp != NULL) { + rpcFreeCont(pTrans->rpcRsp); + pTrans->rpcRsp = NULL; + pTrans->rpcRspLen = 0; + } } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { @@ -339,7 +344,7 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { sdbRelease(pSdb, pTrans); } -STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { +STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) { STrans *pTrans = calloc(1, sizeof(STrans)); if (pTrans == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -350,8 +355,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS); pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; - pTrans->rpcHandle = pMsg->handle; - pTrans->rpcAHandle = pMsg->ahandle; + pTrans->rpcHandle = pReq->handle; + pTrans->rpcAHandle = pReq->ahandle; pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); @@ -436,6 +441,11 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { return mndTransAppendAction(pTrans->undoActions, pAction); } +void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) { + pTrans->rpcRsp = pCont; + pTrans->rpcRspLen = contLen; +} + static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { @@ -479,6 +489,11 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { pNew->rpcHandle = pTrans->rpcHandle; pNew->rpcAHandle = pTrans->rpcAHandle; + pNew->rpcRsp = pTrans->rpcRsp; + pNew->rpcRspLen = pTrans->rpcRspLen; + pTrans->rpcRsp = NULL; + pTrans->rpcRspLen = 0; + mndTransExecute(pMnode, pNew); mndReleaseTrans(pMnode, pNew); return 0; @@ -529,15 +544,21 @@ static void mndTransSendRpcRsp(STrans *pTrans) { if (sendRsp && pTrans->rpcHandle != NULL) { mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, pTrans->rpcAHandle); - SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle}; + SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, + .code = pTrans->code, + .ahandle = pTrans->rpcAHandle, + .pCont = pTrans->rpcRsp, + .contLen = pTrans->rpcRspLen}; rpcSendResponse(&rspMsg); pTrans->rpcHandle = NULL; + pTrans->rpcRsp = NULL; + pTrans->rpcRspLen = 0; } } -void mndTransProcessRsp(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - int64_t signature = (int64_t)(pMsg->rpcMsg.ahandle); +void mndTransProcessRsp(SMnodeMsg *pRsp) { + SMnode *pMnode = pRsp->pMnode; + int64_t signature = (int64_t)(pRsp->rpcMsg.ahandle); int32_t transId = (int32_t)(signature >> 32); int32_t action = (int32_t)((signature << 32) >> 32); @@ -571,10 +592,10 @@ void mndTransProcessRsp(SMnodeMsg *pMsg) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction != NULL) { pAction->msgReceived = 1; - pAction->errCode = pMsg->rpcMsg.code; + pAction->errCode = pRsp->rpcMsg.code; } - mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pMsg->rpcMsg.code, + mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pRsp->rpcMsg.code, pAction->acceptableCode); mndTransExecute(pMnode, pTrans); @@ -921,7 +942,7 @@ static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { void mndTransPullup(SMnode *pMnode) { STrans *pTrans = NULL; - void * pIter = NULL; + void *pIter = NULL; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); diff --git a/source/dnode/mnode/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp index 964a483aac..2d1574467e 100644 --- a/source/dnode/mnode/impl/test/db/db.cpp +++ b/source/dnode/mnode/impl/test/db/db.cpp @@ -202,6 +202,10 @@ TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); + + SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont; + pDrop->uid = htobe64(pDrop->uid); + EXPECT_STREQ(pDrop->db, "1.d1"); } test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, ""); @@ -249,6 +253,8 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_EQ(test.GetShowRows(), 1); CheckBinary("d2", TSDB_DB_NAME_LEN - 1); + uint64_t d2_uid = 0; + { int32_t contLen = sizeof(SUseDbReq); @@ -262,6 +268,8 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { SUseDbRsp* pRsp = (SUseDbRsp*)pMsg->pCont; EXPECT_STREQ(pRsp->db, "1.d2"); + pRsp->uid = htobe64(pRsp->uid); + d2_uid = pRsp->uid; pRsp->vgVersion = htonl(pRsp->vgVersion); pRsp->vgNum = htonl(pRsp->vgNum); pRsp->hashMethod = pRsp->hashMethod; @@ -311,5 +319,10 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DB, pReq, contLen); ASSERT_NE(pRsp, nullptr); ASSERT_EQ(pRsp->code, 0); + + SDropDbRsp* pDrop = (SDropDbRsp*)pRsp->pCont; + pDrop->uid = htobe64(pDrop->uid); + EXPECT_STREQ(pDrop->db, "1.d2"); + EXPECT_EQ(pDrop->uid, d2_uid); } }