From 1f70617b371141758ae0bcd776104d4f359613da Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 25 Dec 2021 14:09:23 +0800 Subject: [PATCH] add ahandle to topic --- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/inc/mndTrans.h | 2 +- source/dnode/mnode/impl/src/mndDb.c | 6 +++--- source/dnode/mnode/impl/src/mndDnode.c | 4 ++-- source/dnode/mnode/impl/src/mndFunc.c | 4 ++-- source/dnode/mnode/impl/src/mndMnode.c | 4 ++-- source/dnode/mnode/impl/src/mndStb.c | 4 ++-- source/dnode/mnode/impl/src/mndTopic.c | 4 ++-- source/dnode/mnode/impl/src/mndTrans.c | 22 +++++++++------------- source/dnode/mnode/impl/src/mndUser.c | 6 +++--- 10 files changed, 27 insertions(+), 30 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 927b1e2e16..b21dc0841c 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -96,6 +96,7 @@ typedef struct { ETrnStage stage; ETrnPolicy policy; void *rpcHandle; + void *rpcAHandle; SArray *redoLogs; SArray *undoLogs; SArray *commitLogs; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index a424c315f9..2d57179f1c 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -35,7 +35,7 @@ typedef struct { int32_t mndInitTrans(SMnode *pMnode); void mndCleanupTrans(SMnode *pMnode); -STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle); +STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg); void mndTransDrop(STrans *pTrans); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 82aea79edb..6eb94eccf2 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -400,7 +400,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat } int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("db:%s, failed to create since %s", pCreate->db, terrstr()); goto CREATE_DB_OVER; @@ -608,7 +608,7 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { mError("db:%s, failed to update since %s", pOldDb->name, terrstr()); return terrno; @@ -772,7 +772,7 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { mError("db:%s, failed to drop since %s", pDb->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index f13bbed247..80c9f9544e 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -396,7 +396,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg * return terrno; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr()); return -1; @@ -452,7 +452,7 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { } static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index c6209c949f..638d984c69 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -147,7 +147,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC pFunc->pCode = pFunc->pData + pCreate->commentSize; memcpy(pFunc->pCode, pCreate->pCont + pCreate->commentSize, pFunc->codeSize); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { free(pFunc); mError("func:%s, failed to create since %s", pCreate->name, terrstr()); @@ -195,7 +195,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC } static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("func:%s, failed to drop since %s", pFunc->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 9f76060063..8f57a18a1d 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -334,7 +334,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode mnodeObj.updateTime = mnodeObj.createdTime; int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); goto CREATE_MNODE_OVER; @@ -500,7 +500,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { mError("mnode:%d, failed to drop since %s", pObj->id, terrstr()); goto DROP_MNODE_OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 411f68e7e1..0f577c3c3b 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -415,7 +415,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre memcpy(stbObj.pSchema, pCreate->pSchema, totalSize); int32_t code = 0; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); return -1; @@ -614,7 +614,7 @@ static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("stb:%s, failed to drop since %s", pStb->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 648ebb14fc..04f6907918 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -367,7 +367,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCreateTopicMsg * #endif int32_t code = 0; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); return -1; @@ -561,7 +561,7 @@ static int32_t mndSetDropTopicUndoActions(SMnode *pMnode, STrans *pTrans, STopic static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 966ee3c4b9..2432e31b9e 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -27,7 +27,6 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans); static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); -static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle); static void mndTransSendRpcRsp(STrans *pTrans, int32_t code); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); @@ -355,7 +354,7 @@ char *mndTransPolicyStr(ETrnPolicy policy) { } } -STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { +STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { STrans *pTrans = calloc(1, sizeof(STrans)); if (pTrans == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -366,7 +365,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS); pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; - pTrans->rpcHandle = rpcHandle; + pTrans->rpcHandle = pMsg->handle; + pTrans->rpcAHandle = pMsg->ahandle; pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); @@ -413,11 +413,6 @@ void mndTransDrop(STrans *pTrans) { tfree(pTrans); } -static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) { - pTrans->rpcHandle = rpcHandle; - mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle); -} - static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { if (pArray == NULL || pRaw == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -506,6 +501,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { } pNewTrans->rpcHandle = pTrans->rpcHandle; + pNewTrans->rpcAHandle = pTrans->rpcAHandle; mndTransExecute(pMnode, pNewTrans); mndReleaseTrans(pMnode, pNewTrans); return 0; @@ -571,10 +567,11 @@ int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { static void mndTransSendRpcRsp(STrans *pTrans, int32_t code) { if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; - mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x", pTrans->id, pTrans->rpcHandle, code & 0xFFFF); + mDebug("trans:%d, send rpc rsp, RPC:%p ahandle:%p code:0x%x", pTrans->id, pTrans->rpcHandle, pTrans->rpcAHandle, + code & 0xFFFF); if (pTrans->rpcHandle != NULL) { - SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code}; + SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code, .ahandle = pTrans->rpcAHandle}; rpcSendResponse(&rspMsg); } } @@ -739,7 +736,6 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, stage from execute to commit", pTrans->id); } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { mDebug("trans:%d, stage keep on execute since %s", pTrans->id, tstrerror(code)); - return code; } else { if (pTrans->policy == TRN_POLICY_ROLLBACK) { pTrans->stage = TRN_STAGE_ROLLBACK; @@ -790,9 +786,9 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { } break; case TRN_STAGE_ROLLBACK: - code = mndTransPerformRollbackStage(pMnode, pTrans); + code = mndTransRollback(pMnode, pTrans); if (code == 0) { - code = mndTransRollback(pMnode, pTrans); + mndTransPerformRollbackStage(pMnode, pTrans); } break; default: diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 9b5eca6d39..0926cf0c48 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -197,7 +197,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, userObj.updateTime = userObj.createdTime; userObj.superUser = 0; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to create since %s", user, terrstr()); return -1; @@ -267,7 +267,7 @@ static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) { } static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewUser, SMnodeMsg *pMsg) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to update since %s", pOldUser->user, terrstr()); return -1; @@ -342,7 +342,7 @@ static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg) { } static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to drop since %s", pUser->user, terrstr()); return -1;