minor changes

This commit is contained in:
Shengliang Guan 2021-12-25 16:05:18 +08:00
parent 406a350476
commit be06f24c46
4 changed files with 9 additions and 8 deletions

View File

@ -822,12 +822,12 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t
assert(ptr != NULL); assert(ptr != NULL);
} }
vnodeProcessWMsgs(pVnode->pImpl, pArray); //vnodeProcessWMsgs(pVnode->pImpl, pArray);
for (size_t i = 0; i < numOfMsgs; i++) { for (size_t i = 0; i < numOfMsgs; i++) {
SRpcMsg *pRsp = NULL; SRpcMsg *pRsp = NULL;
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); int32_t code = 0; //vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp);
if (pRsp != NULL) { if (pRsp != NULL) {
pRsp->ahandle = pMsg->ahandle; pRsp->ahandle = pMsg->ahandle;
rpcSendResponse(pRsp); rpcSendResponse(pRsp);

View File

@ -95,6 +95,7 @@ typedef struct {
int32_t id; int32_t id;
ETrnStage stage; ETrnStage stage;
ETrnPolicy policy; ETrnPolicy policy;
int32_t retryTimes;
void *rpcHandle; void *rpcHandle;
void *rpcAHandle; void *rpcAHandle;
SArray *redoLogs; SArray *redoLogs;

View File

@ -374,7 +374,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = htonl(contLen); action.contLen = contLen;
action.msgType = TDMT_VND_CREATE_STB; action.msgType = TDMT_VND_CREATE_STB;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);

View File

@ -39,7 +39,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); static void mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans); static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
@ -714,7 +714,7 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions);
} }
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { static void mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans); int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans);
if (code == 0) { if (code == 0) {
@ -724,8 +724,6 @@ static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
pTrans->stage = TRN_STAGE_ROLLBACK; pTrans->stage = TRN_STAGE_ROLLBACK;
mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr()); mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr());
} }
return 0;
} }
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
@ -742,6 +740,7 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr()); mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr());
} else { } else {
pTrans->stage = TRN_STAGE_EXECUTE; pTrans->stage = TRN_STAGE_EXECUTE;
pTrans->retryTimes++;
mError("trans:%d, stage keep on execute since %s", pTrans->id, terrstr()); mError("trans:%d, stage keep on execute since %s", pTrans->id, terrstr());
} }
} }
@ -762,6 +761,7 @@ static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, rollbacked", pTrans->id); mDebug("trans:%d, rollbacked", pTrans->id);
} else { } else {
pTrans->stage = TRN_STAGE_ROLLBACK; pTrans->stage = TRN_STAGE_ROLLBACK;
pTrans->retryTimes++;
mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr()); mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
} }
@ -774,7 +774,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
while (code == 0) { while (code == 0) {
switch (pTrans->stage) { switch (pTrans->stage) {
case TRN_STAGE_PREPARE: case TRN_STAGE_PREPARE:
code = mndTransPerformPrepareStage(pMnode, pTrans); mndTransPerformPrepareStage(pMnode, pTrans);
break; break;
case TRN_STAGE_EXECUTE: case TRN_STAGE_EXECUTE:
code = mndTransPerformExecuteStage(pMnode, pTrans); code = mndTransPerformExecuteStage(pMnode, pTrans);