refact: post sem in mnode while sync timeout

This commit is contained in:
Shengliang Guan 2022-12-14 10:30:47 +08:00
parent f623837b60
commit 3b519172a3
2 changed files with 19 additions and 9 deletions

View File

@ -101,6 +101,7 @@ static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) {
} }
static void mndPullupTrans(SMnode *pMnode) { static void mndPullupTrans(SMnode *pMnode) {
mTrace("pullup trans msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
@ -110,6 +111,7 @@ static void mndPullupTrans(SMnode *pMnode) {
} }
static void mndPullupTtl(SMnode *pMnode) { static void mndPullupTtl(SMnode *pMnode) {
mTrace("pullup ttl");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
@ -117,6 +119,7 @@ static void mndPullupTtl(SMnode *pMnode) {
} }
static void mndCalMqRebalance(SMnode *pMnode) { static void mndCalMqRebalance(SMnode *pMnode) {
mTrace("calc mq rebalance");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
@ -143,6 +146,7 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) {
} }
static void mndPullupTelem(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) {
mTrace("pullup telem msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
@ -152,6 +156,7 @@ static void mndPullupTelem(SMnode *pMnode) {
} }
static void mndPullupGrant(SMnode *pMnode) { static void mndPullupGrant(SMnode *pMnode) {
mTrace("pullup grant msg");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
@ -162,6 +167,7 @@ static void mndPullupGrant(SMnode *pMnode) {
} }
static void mndIncreaseUpTime(SMnode *pMnode) { static void mndIncreaseUpTime(SMnode *pMnode) {
mTrace("increate uptime");
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) { if (pReq != NULL) {
@ -213,6 +219,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs)
} }
static void mndCheckDnodeOffline(SMnode *pMnode) { static void mndCheckDnodeOffline(SMnode *pMnode) {
mTrace("check dnode offline");
if (mndAcquireRpc(pMnode) != 0) return; if (mndAcquireRpc(pMnode) != 0) return;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;

View File

@ -78,26 +78,27 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
pMgmt->errCode = pMeta->code;
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
" role:%s raw:%p", " role:%s raw:%p sec:%d seq:%" PRId64,
transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
pRaw); pRaw, pMgmt->transSec, pMgmt->transSeq);
if (pMgmt->errCode == 0) { if (pMeta->code == 0) {
sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
} }
taosThreadMutexLock(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
pMgmt->errCode = pMeta->code;
if (transId <= 0) { if (transId <= 0) {
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
mError("trans:%d, invalid commit msg", transId); mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq);
} else if (transId == pMgmt->transId) { } else if (transId == pMgmt->transId) {
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
} else { } else {
mInfo("trans:%d, is proposed and post sem", transId); mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq);
} }
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
@ -108,7 +109,7 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
STrans *pTrans = mndAcquireTrans(pMnode, transId); STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) { if (pTrans != NULL) {
mInfo("trans:%d, execute in mnode which not leader", transId); mInfo("trans:%d, execute in mnode which not leader or sync timeout", transId);
mndTransExecute(pMnode, pTrans); mndTransExecute(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
// sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); // sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
@ -324,6 +325,7 @@ void mndCleanupSync(SMnode *pMnode) {
} }
void mndSyncCheckTimeout(SMnode *pMnode) { void mndSyncCheckTimeout(SMnode *pMnode) {
mTrace("check sync timeout");
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
taosThreadMutexLock(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
@ -350,7 +352,6 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0;
SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
if (req.contLen <= 0) return -1; if (req.contLen <= 0) return -1;
@ -360,6 +361,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
memcpy(req.pCont, pRaw, req.contLen); memcpy(req.pCont, pRaw, req.contLen);
taosThreadMutexLock(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
pMgmt->errCode = 0;
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
@ -406,7 +409,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
} }
terrno = pMgmt->errCode; terrno = pMgmt->errCode;
return pMgmt->errCode; return terrno;
} }
void mndSyncStart(SMnode *pMnode) { void mndSyncStart(SMnode *pMnode) {