Merge pull request #28124 from taosdata/fix/TD-32316-add-try-at-trans-sync-context
fix/TD-32316-add-try-at-trans-sync-context
This commit is contained in:
commit
6489e2201f
|
@ -1308,7 +1308,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
|
||||||
} else {
|
} else {
|
||||||
pAction->errCode = (terrno != 0) ? terrno : code;
|
pAction->errCode = (terrno != 0) ? terrno : code;
|
||||||
mError("trans:%d, %s:%d failed to write sdb since %s, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage),
|
mError("trans:%d, %s:%d failed to write sdb since %s, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage),
|
||||||
pAction->id, terrstr(), sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
|
pAction->id, tstrerror(code), sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
|
||||||
mndSetTransLastAction(pTrans, pAction);
|
mndSetTransLastAction(pTrans, pAction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1528,7 +1528,13 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr
|
||||||
}
|
}
|
||||||
mndSetTransLastAction(pTrans, pAction);
|
mndSetTransLastAction(pTrans, pAction);
|
||||||
|
|
||||||
if (mndCannotExecuteTransAction(pMnode, topHalf)) break;
|
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
|
||||||
|
pTrans->lastErrorNo = code;
|
||||||
|
pTrans->code = code;
|
||||||
|
mInfo("trans:%d, %s:%d, topHalf:%d, not execute next action, code:%s", pTrans->id, mndTransStr(pAction->stage),
|
||||||
|
action, topHalf, tstrerror(code));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pTrans->code = 0;
|
pTrans->code = 0;
|
||||||
|
@ -1626,7 +1632,20 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool
|
||||||
code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf);
|
code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
|
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
|
||||||
|
pTrans->lastErrorNo = code;
|
||||||
|
pTrans->code = code;
|
||||||
|
bool continueExec = true;
|
||||||
|
if (code != 0 && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
|
||||||
|
continueExec = true;
|
||||||
|
} else {
|
||||||
|
continueExec = false;
|
||||||
|
}
|
||||||
|
mInfo("trans:%d, cannot execute redo action stage, topHalf:%d, continueExec:%d, code:%s", pTrans->id, topHalf,
|
||||||
|
continueExec, tstrerror(code));
|
||||||
|
|
||||||
|
return continueExec;
|
||||||
|
}
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
|
@ -1843,13 +1862,13 @@ void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
// start trans, pullup, receive rsp, kill
|
// start trans, pullup, receive rsp, kill
|
||||||
void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
||||||
bool topHalf = true;
|
bool topHalf = true;
|
||||||
return mndTransExecuteImp(pMnode, pTrans, topHalf);
|
mndTransExecuteImp(pMnode, pTrans, topHalf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// update trans
|
// update trans
|
||||||
void mndTransRefresh(SMnode *pMnode, STrans *pTrans) {
|
void mndTransRefresh(SMnode *pMnode, STrans *pTrans) {
|
||||||
bool topHalf = false;
|
bool topHalf = false;
|
||||||
return mndTransExecuteImp(pMnode, pTrans, topHalf);
|
mndTransExecuteImp(pMnode, pTrans, topHalf);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessTransTimer(SRpcMsg *pReq) {
|
static int32_t mndProcessTransTimer(SRpcMsg *pReq) {
|
||||||
|
|
|
@ -162,13 +162,13 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
|
||||||
pRow->status = pRaw->status;
|
pRow->status = pRaw->status;
|
||||||
sdbPrintOper(pSdb, pRow, "insert");
|
sdbPrintOper(pSdb, pRow, "insert");
|
||||||
|
|
||||||
if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) {
|
int32_t code = 0;
|
||||||
|
if ((code = taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *))) != 0) {
|
||||||
sdbUnLock(pSdb, type);
|
sdbUnLock(pSdb, type);
|
||||||
sdbFreeRow(pSdb, pRow, false);
|
sdbFreeRow(pSdb, pRow, false);
|
||||||
return terrno;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
|
||||||
SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
|
SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
|
||||||
if (insertFp != NULL) {
|
if (insertFp != NULL) {
|
||||||
code = (*insertFp)(pSdb, pRow->pObj);
|
code = (*insertFp)(pSdb, pRow->pObj);
|
||||||
|
|
|
@ -728,6 +728,8 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
|
||||||
sDebug("vgId:%d, get response info, seqNum:%" PRId64 ", num:%d", pNode->vgId, cbMeta.seqNum, num);
|
sDebug("vgId:%d, get response info, seqNum:%" PRId64 ", num:%d", pNode->vgId, cbMeta.seqNum, num);
|
||||||
code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
|
code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
|
||||||
retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
|
retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
|
||||||
|
sDebug("vgId:%d, fsm execute, index:%" PRId64 ", term:%" PRId64 ", type:%s, code:%d, retry:%d", pNode->vgId,
|
||||||
|
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry);
|
||||||
if (retry) {
|
if (retry) {
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index);
|
sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index);
|
||||||
|
|
Loading…
Reference in New Issue