Merge branch '3.0' into feature/vnode

This commit is contained in:
Hongze Cheng 2021-12-25 16:11:47 +08:00
commit 85a53ac1fa
3 changed files with 7 additions and 6 deletions

View File

@ -95,6 +95,7 @@ typedef struct {
int32_t id; int32_t id;
ETrnStage stage; ETrnStage stage;
ETrnPolicy policy; ETrnPolicy policy;
int32_t retryTimes;
void *rpcHandle; void *rpcHandle;
void *rpcAHandle; void *rpcAHandle;
SArray *redoLogs; SArray *redoLogs;

View File

@ -383,7 +383,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = htonl(contLen); action.contLen = contLen;
action.msgType = TDMT_VND_CREATE_STB; action.msgType = TDMT_VND_CREATE_STB;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);

View File

@ -39,7 +39,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); static void mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans); static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
@ -714,7 +714,7 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions);
} }
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { static void mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans); int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans);
if (code == 0) { if (code == 0) {
@ -724,8 +724,6 @@ static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
pTrans->stage = TRN_STAGE_ROLLBACK; pTrans->stage = TRN_STAGE_ROLLBACK;
mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr()); mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr());
} }
return 0;
} }
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
@ -742,6 +740,7 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr()); mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr());
} else { } else {
pTrans->stage = TRN_STAGE_EXECUTE; pTrans->stage = TRN_STAGE_EXECUTE;
pTrans->retryTimes++;
mError("trans:%d, stage keep on execute since %s", pTrans->id, terrstr()); mError("trans:%d, stage keep on execute since %s", pTrans->id, terrstr());
} }
} }
@ -762,6 +761,7 @@ static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, rollbacked", pTrans->id); mDebug("trans:%d, rollbacked", pTrans->id);
} else { } else {
pTrans->stage = TRN_STAGE_ROLLBACK; pTrans->stage = TRN_STAGE_ROLLBACK;
pTrans->retryTimes++;
mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr()); mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
} }
@ -774,7 +774,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
while (code == 0) { while (code == 0) {
switch (pTrans->stage) { switch (pTrans->stage) {
case TRN_STAGE_PREPARE: case TRN_STAGE_PREPARE:
code = mndTransPerformPrepareStage(pMnode, pTrans); mndTransPerformPrepareStage(pMnode, pTrans);
break; break;
case TRN_STAGE_EXECUTE: case TRN_STAGE_EXECUTE:
code = mndTransPerformExecuteStage(pMnode, pTrans); code = mndTransPerformExecuteStage(pMnode, pTrans);