enh: refactor mndProcessArbCheckSyncTimer

This commit is contained in:
Shungang Li 2024-11-01 15:57:55 +08:00
parent ff565f0f39
commit 362aa5f7cb
1 changed files with 117 additions and 76 deletions

View File

@ -583,19 +583,84 @@ static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, i
return code;
}
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
typedef enum {
CHECK_SYNC_NONE = 0,
CHECK_SYNC_SET_ASSIGNED_LEADER = 1,
CHECK_SYNC_CHECK_SYNC = 2,
CHECK_SYNC_UPDATE = 3
} ECheckSyncOp;
static void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup) {
*pOp = CHECK_SYNC_NONE;
int32_t code = 0;
int32_t vgId = pArbGroup->vgId;
bool member0IsTimeout = mndCheckArbMemberHbTimeout(pArbGroup, 0, nowMs);
bool member1IsTimeout = mndCheckArbMemberHbTimeout(pArbGroup, 1, nowMs);
SArbAssignedLeader *pAssignedLeader = &pArbGroup->assignedLeader;
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
// 1. has assigned && no response => send req
if (currentAssignedDnodeId != 0 && pAssignedLeader->acked == false) {
*pOp = CHECK_SYNC_SET_ASSIGNED_LEADER;
return;
}
// 2. both of the two members are timeout => skip
if (member0IsTimeout && member1IsTimeout) {
return;
}
// 3. no member is timeout => check sync
if (member0IsTimeout == false && member1IsTimeout == false) {
// no assigned leader and not sync
if (currentAssignedDnodeId == 0 && !pArbGroup->isSync) {
*pOp = CHECK_SYNC_CHECK_SYNC;
}
return;
}
// 4. one of the members is timeout => set assigned leader
int32_t candidateIndex = member0IsTimeout ? 1 : 0;
SArbGroupMember *pMember = &pArbGroup->members[candidateIndex];
// has assigned leader and dnodeId not match => skip
if (currentAssignedDnodeId != 0 && currentAssignedDnodeId != pMember->info.dnodeId) {
mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, assigned leader has been set to dnodeId:%d", vgId,
pMember->info.dnodeId, currentAssignedDnodeId);
return;
}
// not sync => skip
if (pArbGroup->isSync == false) {
if (currentAssignedDnodeId == pMember->info.dnodeId) {
mDebug("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
pMember->info.dnodeId);
} else {
mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
pMember->info.dnodeId);
}
return;
}
// is sync && no assigned leader => write to sdb
mndArbGroupDupObj(pArbGroup, pNewGroup);
mndArbGroupSetAssignedLeader(pNewGroup, candidateIndex);
*pOp = CHECK_SYNC_UPDATE;
}
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
int32_t code = 0, lino = 0;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SArbGroup *pArbGroup = NULL;
SArbGroup arbGroupDup = {0};
void *pIter = NULL;
SArray *pUpdateArray = NULL;
char arbToken[TSDB_ARB_TOKEN_SIZE];
if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
mError("failed to get arb token for arb-check-sync timer");
TAOS_RETURN(code);
}
TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));
int64_t term = mndGetTerm(pMnode);
if (term < 0) {
mError("arb failed to get term since %s", terrstr());
@ -612,87 +677,63 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
return 0;
}
SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
while (1) {
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
if (pIter == NULL) break;
SArbGroup arbGroupDup = {0};
(void)taosThreadMutexLock(&pArbGroup->mutex);
mndArbGroupDupObj(pArbGroup, &arbGroupDup);
(void)taosThreadMutexUnlock(&pArbGroup->mutex);
int32_t vgId = arbGroupDup.vgId;
bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs);
bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs);
SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader;
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
// 1. has assigned && no response => send req
if (currentAssignedDnodeId != 0 && pAssignedLeader->acked == false) {
(void)mndSendArbSetAssignedLeaderReq(pMnode, currentAssignedDnodeId, vgId, arbToken, term,
pAssignedLeader->token);
mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, currentAssignedDnodeId);
sdbRelease(pSdb, pArbGroup);
continue;
}
// 2. both of the two members are timeout => skip
if (member0IsTimeout && member1IsTimeout) {
sdbRelease(pSdb, pArbGroup);
continue;
}
// 3. no member is timeout => check sync
if (member0IsTimeout == false && member1IsTimeout == false) {
// no assigned leader and not sync
if (currentAssignedDnodeId == 0 && !arbGroupDup.isSync) {
(void)mndSendArbCheckSyncReq(pMnode, arbGroupDup.vgId, arbToken, term, arbGroupDup.members[0].state.token,
arbGroupDup.members[1].state.token);
}
sdbRelease(pSdb, pArbGroup);
continue;
}
// 4. one of the members is timeout => set assigned leader
int32_t candidateIndex = member0IsTimeout ? 1 : 0;
SArbGroupMember *pMember = &arbGroupDup.members[candidateIndex];
// has assigned leader and dnodeId not match => skip
if (currentAssignedDnodeId != 0 && currentAssignedDnodeId != pMember->info.dnodeId) {
mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, assigned leader has been set to dnodeId:%d", vgId,
pMember->info.dnodeId, currentAssignedDnodeId);
sdbRelease(pSdb, pArbGroup);
continue;
}
// not sync => skip
if (arbGroupDup.isSync == false) {
if (currentAssignedDnodeId == pMember->info.dnodeId) {
mDebug("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
pMember->info.dnodeId);
} else {
mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
pMember->info.dnodeId);
}
sdbRelease(pSdb, pArbGroup);
continue;
}
// is sync && no assigned leader => write to sdb
ECheckSyncOp op = CHECK_SYNC_NONE;
SArbGroup newGroup = {0};
mndArbGroupDupObj(&arbGroupDup, &newGroup);
mndArbGroupSetAssignedLeader(&newGroup, candidateIndex);
mndArbCheckSync(&arbGroupDup, nowMs, &op, &newGroup);
int32_t vgId = arbGroupDup.vgId;
SArbAssignedLeader *pAssgndLeader = &arbGroupDup.assignedLeader;
int32_t assgndDnodeId = pAssgndLeader->dnodeId;
switch (op) {
case CHECK_SYNC_NONE:
mTrace("vgId:%d, arb skip to send msg by check sync", vgId);
break;
case CHECK_SYNC_SET_ASSIGNED_LEADER:
(void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token);
mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId);
break;
case CHECK_SYNC_CHECK_SYNC:
(void)mndSendArbCheckSyncReq(pMnode, vgId, arbToken, term, arbGroupDup.members[0].state.token,
arbGroupDup.members[1].state.token);
mInfo("vgId:%d, arb send check sync request", vgId);
break;
case CHECK_SYNC_UPDATE:
if (!pUpdateArray) {
pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
if (!pUpdateArray) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
}
if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
taosArrayDestroy(pUpdateArray);
return terrno;
TAOS_CHECK_EXIT(terrno);
}
break;
default:
mError("vgId:%d, arb unknown check sync op:%d", vgId, op);
break;
}
}
sdbRelease(pSdb, pArbGroup);
}
TAOS_CHECK_EXIT(mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray));
TAOS_CHECK_RETURN(mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray));
_exit:
if (code != 0) {
mError("failed to check sync at line %d since %s", lino, terrstr());
}
taosArrayDestroy(pUpdateArray);
return 0;