add ahandle to topic
This commit is contained in:
parent
856104c4f3
commit
1f70617b37
|
@ -96,6 +96,7 @@ typedef struct {
|
|||
ETrnStage stage;
|
||||
ETrnPolicy policy;
|
||||
void *rpcHandle;
|
||||
void *rpcAHandle;
|
||||
SArray *redoLogs;
|
||||
SArray *undoLogs;
|
||||
SArray *commitLogs;
|
||||
|
|
|
@ -35,7 +35,7 @@ typedef struct {
|
|||
int32_t mndInitTrans(SMnode *pMnode);
|
||||
void mndCleanupTrans(SMnode *pMnode);
|
||||
|
||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle);
|
||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg);
|
||||
void mndTransDrop(STrans *pTrans);
|
||||
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||
|
|
|
@ -400,7 +400,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
|
|||
}
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
|
||||
goto CREATE_DB_OVER;
|
||||
|
@ -608,7 +608,7 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
|
||||
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("db:%s, failed to update since %s", pOldDb->name, terrstr());
|
||||
return terrno;
|
||||
|
@ -772,7 +772,7 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
|||
|
||||
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("db:%s, failed to drop since %s", pDb->name, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -396,7 +396,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *
|
|||
return terrno;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr());
|
||||
return -1;
|
||||
|
@ -452,7 +452,7 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -147,7 +147,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC
|
|||
pFunc->pCode = pFunc->pData + pCreate->commentSize;
|
||||
memcpy(pFunc->pCode, pCreate->pCont + pCreate->commentSize, pFunc->codeSize);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
free(pFunc);
|
||||
mError("func:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
|
@ -195,7 +195,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC
|
|||
}
|
||||
|
||||
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("func:%s, failed to drop since %s", pFunc->name, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -334,7 +334,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode
|
|||
mnodeObj.updateTime = mnodeObj.createdTime;
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
|
||||
goto CREATE_MNODE_OVER;
|
||||
|
@ -500,7 +500,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
|||
|
||||
static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("mnode:%d, failed to drop since %s", pObj->id, terrstr());
|
||||
goto DROP_MNODE_OVER;
|
||||
|
|
|
@ -415,7 +415,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
|
|||
memcpy(stbObj.pSchema, pCreate->pSchema, totalSize);
|
||||
|
||||
int32_t code = 0;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
|
@ -614,7 +614,7 @@ static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj
|
|||
|
||||
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("stb:%s, failed to drop since %s", pStb->name, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -367,7 +367,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCreateTopicMsg *
|
|||
#endif
|
||||
|
||||
int32_t code = 0;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
|
@ -561,7 +561,7 @@ static int32_t mndSetDropTopicUndoActions(SMnode *pMnode, STrans *pTrans, STopic
|
|||
|
||||
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -27,7 +27,6 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
|
|||
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans);
|
||||
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans);
|
||||
|
||||
static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle);
|
||||
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code);
|
||||
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
|
||||
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
|
||||
|
@ -355,7 +354,7 @@ char *mndTransPolicyStr(ETrnPolicy policy) {
|
|||
}
|
||||
}
|
||||
|
||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
|
||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) {
|
||||
STrans *pTrans = calloc(1, sizeof(STrans));
|
||||
if (pTrans == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -366,7 +365,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
|
|||
pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
|
||||
pTrans->stage = TRN_STAGE_PREPARE;
|
||||
pTrans->policy = policy;
|
||||
pTrans->rpcHandle = rpcHandle;
|
||||
pTrans->rpcHandle = pMsg->handle;
|
||||
pTrans->rpcAHandle = pMsg->ahandle;
|
||||
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
|
||||
pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
|
||||
pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
|
||||
|
@ -413,11 +413,6 @@ void mndTransDrop(STrans *pTrans) {
|
|||
tfree(pTrans);
|
||||
}
|
||||
|
||||
static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) {
|
||||
pTrans->rpcHandle = rpcHandle;
|
||||
mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle);
|
||||
}
|
||||
|
||||
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
|
||||
if (pArray == NULL || pRaw == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -506,6 +501,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
|
||||
pNewTrans->rpcHandle = pTrans->rpcHandle;
|
||||
pNewTrans->rpcAHandle = pTrans->rpcAHandle;
|
||||
mndTransExecute(pMnode, pNewTrans);
|
||||
mndReleaseTrans(pMnode, pNewTrans);
|
||||
return 0;
|
||||
|
@ -571,10 +567,11 @@ int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
|
|||
|
||||
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code) {
|
||||
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
|
||||
mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x", pTrans->id, pTrans->rpcHandle, code & 0xFFFF);
|
||||
mDebug("trans:%d, send rpc rsp, RPC:%p ahandle:%p code:0x%x", pTrans->id, pTrans->rpcHandle, pTrans->rpcAHandle,
|
||||
code & 0xFFFF);
|
||||
|
||||
if (pTrans->rpcHandle != NULL) {
|
||||
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code};
|
||||
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code, .ahandle = pTrans->rpcAHandle};
|
||||
rpcSendResponse(&rspMsg);
|
||||
}
|
||||
}
|
||||
|
@ -739,7 +736,6 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
|
|||
mDebug("trans:%d, stage from execute to commit", pTrans->id);
|
||||
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mDebug("trans:%d, stage keep on execute since %s", pTrans->id, tstrerror(code));
|
||||
return code;
|
||||
} else {
|
||||
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
|
||||
pTrans->stage = TRN_STAGE_ROLLBACK;
|
||||
|
@ -790,9 +786,9 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
break;
|
||||
case TRN_STAGE_ROLLBACK:
|
||||
code = mndTransPerformRollbackStage(pMnode, pTrans);
|
||||
code = mndTransRollback(pMnode, pTrans);
|
||||
if (code == 0) {
|
||||
code = mndTransRollback(pMnode, pTrans);
|
||||
mndTransPerformRollbackStage(pMnode, pTrans);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
|
|
@ -197,7 +197,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
|
|||
userObj.updateTime = userObj.createdTime;
|
||||
userObj.superUser = 0;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("user:%s, failed to create since %s", user, terrstr());
|
||||
return -1;
|
||||
|
@ -267,7 +267,7 @@ static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewUser, SMnodeMsg *pMsg) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("user:%s, failed to update since %s", pOldUser->user, terrstr());
|
||||
return -1;
|
||||
|
@ -342,7 +342,7 @@ static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("user:%s, failed to drop since %s", pUser->user, terrstr());
|
||||
return -1;
|
||||
|
|
Loading…
Reference in New Issue