Merge pull request #28597 from taosdata/fix/TD-32271-main

fix: mndProcessArbCheckSyncTimer no check for sync
This commit is contained in:
Shengliang Guan 2024-11-04 20:15:47 +08:00 committed by GitHub
commit 8c9f80455e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 227 additions and 108 deletions

View File

@ -47,6 +47,15 @@ bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char
int32_t mndGetArbGroupSize(SMnode *pMnode);
// Action selected by mndArbCheckSync() for a single arb group; the check-sync
// timer dispatches on this value.
typedef enum {
// Nothing to do for this group in the current timer round.
CHECK_SYNC_NONE = 0,
// An assigned leader is recorded but not yet acked: (re)send the set-assigned-leader request.
CHECK_SYNC_SET_ASSIGNED_LEADER = 1,
// No member timed out, no assigned leader and the group is not in sync: send a check-sync request.
CHECK_SYNC_CHECK_SYNC = 2,
// The output group passed to mndArbCheckSync() holds an updated copy that should be written back.
CHECK_SYNC_UPDATE = 3
} ECheckSyncOp;
// Inspect pArbGroup at time nowMs and report through *pOp which check-sync
// action applies. pNewGroup is only filled when *pOp is CHECK_SYNC_UPDATE.
void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup);
#ifdef __cplusplus
}
#endif

View File

@ -250,6 +250,12 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
pOld->assignedLeader.acked = pNew->assignedLeader.acked;
pOld->version++;
mInfo(
"arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, as-dnodeid:%d, "
"as-token:%s, as-acked:%d, version:%" PRId64,
pOld->vgId, pOld->members[0].state.token, pOld->members[1].state.token, pOld->isSync,
pOld->assignedLeader.dnodeId, pOld->assignedLeader.token, pOld->assignedLeader.acked, pOld->version);
_OVER:
(void)taosThreadMutexUnlock(&pOld->mutex);
@ -577,19 +583,77 @@ static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, i
return code;
}
// Decide which action the arb check-sync timer should take for one arb group.
//
// This function only makes the decision; it sends no request itself. The
// caller dispatches on *pOp.
//
//   pArbGroup  group to inspect
//   nowMs      current time in milliseconds, used for the heartbeat timeout checks
//   pOp        [out] always written; stays CHECK_SYNC_NONE unless a case below applies
//   pNewGroup  [out] written only when *pOp == CHECK_SYNC_UPDATE: a duplicate of
//              pArbGroup on which mndArbGroupSetAssignedLeader() was applied for
//              the member that has not timed out
void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SArbGroup *pNewGroup) {
  *pOp = CHECK_SYNC_NONE;

  int32_t vgId = pArbGroup->vgId;

  bool member0IsTimeout = mndCheckArbMemberHbTimeout(pArbGroup, 0, nowMs);
  bool member1IsTimeout = mndCheckArbMemberHbTimeout(pArbGroup, 1, nowMs);

  SArbAssignedLeader *pAssignedLeader = &pArbGroup->assignedLeader;
  int32_t             currentAssignedDnodeId = pAssignedLeader->dnodeId;

  // 1. has assigned && no response => send req
  //    (deliberately independent of isSync and of member timeouts)
  if (currentAssignedDnodeId != 0 && pAssignedLeader->acked == false) {
    *pOp = CHECK_SYNC_SET_ASSIGNED_LEADER;
    return;
  }

  // 2. both of the two members are timeout => skip
  if (member0IsTimeout && member1IsTimeout) {
    return;
  }

  // 3. no member is timeout => check sync
  if (member0IsTimeout == false && member1IsTimeout == false) {
    // no assigned leader and not sync
    if (currentAssignedDnodeId == 0 && !pArbGroup->isSync) {
      *pOp = CHECK_SYNC_CHECK_SYNC;
    }
    return;
  }

  // 4. one of the members is timeout => set assigned leader
  //    the candidate is the member that has NOT timed out
  int32_t          candidateIndex = member0IsTimeout ? 1 : 0;
  SArbGroupMember *pMember = &pArbGroup->members[candidateIndex];

  // has assigned leader and dnodeId not match => skip
  if (currentAssignedDnodeId != 0 && currentAssignedDnodeId != pMember->info.dnodeId) {
    mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, assigned leader has been set to dnodeId:%d", vgId,
          pMember->info.dnodeId, currentAssignedDnodeId);
    return;
  }

  // not sync => skip
  if (pArbGroup->isSync == false) {
    // same message in both branches; only the log level differs
    if (currentAssignedDnodeId == pMember->info.dnodeId) {
      mDebug("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
             pMember->info.dnodeId);
    } else {
      mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
            pMember->info.dnodeId);
    }
    return;
  }

  // is sync, and the assigned leader is either unset or already this candidate
  // => hand the updated group back so the caller can write it to sdb
  mndArbGroupDupObj(pArbGroup, pNewGroup);
  mndArbGroupSetAssignedLeader(pNewGroup, candidateIndex);
  *pOp = CHECK_SYNC_UPDATE;
}
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
int32_t code = 0;
int32_t code = 0, lino = 0;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SArbGroup *pArbGroup = NULL;
SArbGroup arbGroupDup = {0};
void *pIter = NULL;
SArray *pUpdateArray = NULL;
char arbToken[TSDB_ARB_TOKEN_SIZE];
if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
mError("failed to get arb token for arb-check-sync timer");
TAOS_RETURN(code);
}
TAOS_CHECK_EXIT(mndGetArbToken(pMnode, arbToken));
int64_t term = mndGetTerm(pMnode);
if (term < 0) {
mError("arb failed to get term since %s", terrstr());
@ -606,87 +670,63 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
return 0;
}
SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
while (1) {
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
if (pIter == NULL) break;
SArbGroup arbGroupDup = {0};
(void)taosThreadMutexLock(&pArbGroup->mutex);
mndArbGroupDupObj(pArbGroup, &arbGroupDup);
(void)taosThreadMutexUnlock(&pArbGroup->mutex);
int32_t vgId = arbGroupDup.vgId;
bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs);
bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs);
SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader;
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
// 1. has assigned && is sync && no response => send req
if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true && pAssignedLeader->acked == false) {
(void)mndSendArbSetAssignedLeaderReq(pMnode, currentAssignedDnodeId, vgId, arbToken, term,
pAssignedLeader->token);
mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, currentAssignedDnodeId);
sdbRelease(pSdb, pArbGroup);
continue;
}
// 2. both of the two members are timeout => skip
if (member0IsTimeout && member1IsTimeout) {
sdbRelease(pSdb, pArbGroup);
continue;
}
// 3. no member is timeout => check sync
if (member0IsTimeout == false && member1IsTimeout == false) {
// no assigned leader and not sync
if (currentAssignedDnodeId == 0 && !arbGroupDup.isSync) {
(void)mndSendArbCheckSyncReq(pMnode, arbGroupDup.vgId, arbToken, term, arbGroupDup.members[0].state.token,
arbGroupDup.members[1].state.token);
}
sdbRelease(pSdb, pArbGroup);
continue;
}
// 4. one of the members is timeout => set assigned leader
int32_t candidateIndex = member0IsTimeout ? 1 : 0;
SArbGroupMember *pMember = &arbGroupDup.members[candidateIndex];
// has assigned leader and dnodeId not match => skip
if (currentAssignedDnodeId != 0 && currentAssignedDnodeId != pMember->info.dnodeId) {
mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, assigned leader has been set to dnodeId:%d", vgId,
pMember->info.dnodeId, currentAssignedDnodeId);
sdbRelease(pSdb, pArbGroup);
continue;
}
// not sync => skip
if (arbGroupDup.isSync == false) {
if (currentAssignedDnodeId == pMember->info.dnodeId) {
mDebug("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
pMember->info.dnodeId);
} else {
mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
pMember->info.dnodeId);
}
sdbRelease(pSdb, pArbGroup);
continue;
}
// is sync && no assigned leader => write to sdb
SArbGroup newGroup = {0};
mndArbGroupDupObj(&arbGroupDup, &newGroup);
mndArbGroupSetAssignedLeader(&newGroup, candidateIndex);
if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
taosArrayDestroy(pUpdateArray);
return terrno;
}
sdbRelease(pSdb, pArbGroup);
ECheckSyncOp op = CHECK_SYNC_NONE;
SArbGroup newGroup = {0};
mndArbCheckSync(&arbGroupDup, nowMs, &op, &newGroup);
int32_t vgId = arbGroupDup.vgId;
SArbAssignedLeader *pAssgndLeader = &arbGroupDup.assignedLeader;
int32_t assgndDnodeId = pAssgndLeader->dnodeId;
switch (op) {
case CHECK_SYNC_NONE:
mTrace("vgId:%d, arb skip to send msg by check sync", vgId);
break;
case CHECK_SYNC_SET_ASSIGNED_LEADER:
(void)mndSendArbSetAssignedLeaderReq(pMnode, assgndDnodeId, vgId, arbToken, term, pAssgndLeader->token);
mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, assgndDnodeId);
break;
case CHECK_SYNC_CHECK_SYNC:
(void)mndSendArbCheckSyncReq(pMnode, vgId, arbToken, term, arbGroupDup.members[0].state.token,
arbGroupDup.members[1].state.token);
mInfo("vgId:%d, arb send check sync request", vgId);
break;
case CHECK_SYNC_UPDATE:
if (!pUpdateArray) {
pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
if (!pUpdateArray) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
}
if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
break;
default:
mError("vgId:%d, arb unknown check sync op:%d", vgId, op);
break;
}
}
TAOS_CHECK_RETURN(mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray));
TAOS_CHECK_EXIT(mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray));
_exit:
if (code != 0) {
mError("failed to check sync at line %d since %s", lino, terrstr());
}
taosArrayDestroy(pUpdateArray);
return 0;

View File

@ -80,17 +80,17 @@ TEST_F(ArbgroupTest, 01_encode_decode_sdb) {
SArbGroup* pNewGroup = (SArbGroup*)sdbGetRowObj(pRow);
EXPECT_EQ(group.vgId, pNewGroup->vgId);
EXPECT_EQ(group.dbUid, pNewGroup->dbUid);
EXPECT_EQ(group.members[0].info.dnodeId, pNewGroup->members[0].info.dnodeId);
EXPECT_EQ(group.members[1].info.dnodeId, pNewGroup->members[1].info.dnodeId);
EXPECT_EQ(group.isSync, pNewGroup->isSync);
EXPECT_EQ(group.assignedLeader.dnodeId, pNewGroup->assignedLeader.dnodeId);
ASSERT_EQ(group.vgId, pNewGroup->vgId);
ASSERT_EQ(group.dbUid, pNewGroup->dbUid);
ASSERT_EQ(group.members[0].info.dnodeId, pNewGroup->members[0].info.dnodeId);
ASSERT_EQ(group.members[1].info.dnodeId, pNewGroup->members[1].info.dnodeId);
ASSERT_EQ(group.isSync, pNewGroup->isSync);
ASSERT_EQ(group.assignedLeader.dnodeId, pNewGroup->assignedLeader.dnodeId);
EXPECT_EQ(std::string(group.members[0].state.token), std::string(pNewGroup->members[0].state.token));
EXPECT_EQ(std::string(group.members[1].state.token), std::string(pNewGroup->members[1].state.token));
EXPECT_EQ(std::string(group.assignedLeader.token), std::string(pNewGroup->assignedLeader.token));
EXPECT_EQ(group.version, pNewGroup->version);
ASSERT_EQ(std::string(group.members[0].state.token), std::string(pNewGroup->members[0].state.token));
ASSERT_EQ(std::string(group.members[1].state.token), std::string(pNewGroup->members[1].state.token));
ASSERT_EQ(std::string(group.assignedLeader.token), std::string(pNewGroup->assignedLeader.token));
ASSERT_EQ(group.version, pNewGroup->version);
taosMemoryFree(pRow);
taosMemoryFree(pRaw);
@ -129,9 +129,9 @@ TEST_F(ArbgroupTest, 02_process_heart_beat_rsp) {
SArbGroup newGroup = {0};
bool updateToken = mndUpdateArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
EXPECT_FALSE(updateToken);
EXPECT_NE(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
EXPECT_NE(group.members[0].state.lastHbMs, nowMs);
ASSERT_EQ(updateToken, false);
ASSERT_NE(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
ASSERT_NE(group.members[0].state.lastHbMs, nowMs);
}
{ // old token
@ -144,9 +144,9 @@ TEST_F(ArbgroupTest, 02_process_heart_beat_rsp) {
SArbGroup newGroup = {0};
bool updateToken = mndUpdateArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
EXPECT_FALSE(updateToken);
EXPECT_EQ(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
EXPECT_EQ(group.members[0].state.lastHbMs, nowMs);
ASSERT_EQ(updateToken, false);
ASSERT_EQ(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
ASSERT_EQ(group.members[0].state.lastHbMs, nowMs);
}
{ // new token
@ -159,14 +159,14 @@ TEST_F(ArbgroupTest, 02_process_heart_beat_rsp) {
SArbGroup newGroup = {0};
bool updateToken = mndUpdateArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
EXPECT_TRUE(updateToken);
EXPECT_EQ(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
EXPECT_EQ(group.members[0].state.lastHbMs, nowMs);
ASSERT_EQ(updateToken, true);
ASSERT_EQ(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
ASSERT_EQ(group.members[0].state.lastHbMs, nowMs);
EXPECT_EQ(std::string(newGroup.members[0].state.token), std::string(rspMember.memberToken));
EXPECT_FALSE(newGroup.isSync);
EXPECT_EQ(newGroup.assignedLeader.dnodeId, 0);
EXPECT_EQ(std::string(newGroup.assignedLeader.token).size(), 0);
ASSERT_EQ(std::string(newGroup.members[0].state.token), std::string(rspMember.memberToken));
ASSERT_EQ(newGroup.isSync, false);
ASSERT_EQ(newGroup.assignedLeader.dnodeId, 0);
ASSERT_EQ(std::string(newGroup.assignedLeader.token).size(), 0);
}
taosThreadMutexDestroy(&group.mutex);
@ -203,7 +203,7 @@ TEST_F(ArbgroupTest, 03_process_check_sync_rsp) {
SArbGroup newGroup = {0};
bool updateIsSync = mndUpdateArbGroupByCheckSync(&group, vgId, member0Token, member1Token, newIsSync, &newGroup);
EXPECT_FALSE(updateIsSync);
ASSERT_EQ(updateIsSync, false);
}
{ // newIsSync
@ -216,8 +216,8 @@ TEST_F(ArbgroupTest, 03_process_check_sync_rsp) {
SArbGroup newGroup = {0};
bool updateIsSync = mndUpdateArbGroupByCheckSync(&group, vgId, member0Token, member1Token, newIsSync, &newGroup);
EXPECT_TRUE(updateIsSync);
EXPECT_TRUE(newGroup.isSync);
ASSERT_EQ(updateIsSync, true);
ASSERT_EQ(newGroup.isSync, true);
}
taosThreadMutexDestroy(&group.mutex);
@ -254,7 +254,7 @@ TEST_F(ArbgroupTest, 04_process_set_assigned_leader){
SArbGroup newGroup = {0};
bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(&group, vgId, memberToken, errcode, &newGroup);
EXPECT_FALSE(updateAssigned);
ASSERT_EQ(updateAssigned, false);
}
{ // errcode != TSDB_CODE_SUCCESS
@ -265,7 +265,7 @@ TEST_F(ArbgroupTest, 04_process_set_assigned_leader){
SArbGroup newGroup = {0};
bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(&group, vgId, memberToken, errcode, &newGroup);
EXPECT_FALSE(updateAssigned);
ASSERT_EQ(updateAssigned, false);
}
{ // errcode == TSDB_CODE_SUCCESS
@ -276,11 +276,81 @@ TEST_F(ArbgroupTest, 04_process_set_assigned_leader){
SArbGroup newGroup = {0};
bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(&group, vgId, memberToken, errcode, &newGroup);
EXPECT_TRUE(updateAssigned);
EXPECT_FALSE(newGroup.isSync);
ASSERT_EQ(updateAssigned, true);
ASSERT_EQ(newGroup.isSync, false);
}
taosThreadMutexDestroy(&group.mutex);
}
// Exercises mndArbCheckSync() directly: for a hand-built arb group, checks which
// ECheckSyncOp is reported for each combination of assigned leader / isSync /
// member heartbeat timeout.
TEST_F(ArbgroupTest, 05_check_sync_timer) {
  const int32_t assgndDnodeId = 1;
  const int32_t vgId = 5;
  const int64_t nowMs = 173044838300;

  SArbGroup group = {0};
  group.vgId = vgId;
  group.dbUid = 1234;
  group.members[0].info.dnodeId = assgndDnodeId;
  group.members[0].state.lastHbMs = nowMs - 10;
  group.members[1].info.dnodeId = 2;
  group.members[1].state.lastHbMs = nowMs - 10;
  group.isSync = 1;
  taosThreadMutexInit(&group.mutex, NULL);

  // template: member 0 is the assigned leader and has not acked yet
  SArbAssignedLeader assgnedLeader = {0};
  assgnedLeader.dnodeId = assgndDnodeId;
  assgnedLeader.acked = false;
  strncpy(assgnedLeader.token, group.members[0].state.token, TSDB_ARB_TOKEN_SIZE);

  // template: no assigned leader
  SArbAssignedLeader noneAsgndLeader = {0};
  noneAsgndLeader.dnodeId = 0;
  noneAsgndLeader.acked = false;

  ECheckSyncOp op = CHECK_SYNC_NONE;
  SArbGroup    newGroup = {0};

  // 1. asgnd,sync,noAck --> send set assigned
  group.assignedLeader = assgnedLeader;
  group.assignedLeader.acked = false;
  group.isSync = true;
  mndArbCheckSync(&group, nowMs, &op, &newGroup);

  ASSERT_EQ(op, CHECK_SYNC_SET_ASSIGNED_LEADER);

  // 2. asgnd,notSync,noAck --> send set assgnd
  newGroup = {0};
  group.assignedLeader = assgnedLeader;
  group.isSync = false;
  group.assignedLeader.acked = false;
  mndArbCheckSync(&group, nowMs, &op, &newGroup);

  ASSERT_EQ(op, CHECK_SYNC_SET_ASSIGNED_LEADER);

  // 3. noAsgnd,notSync,noAck(init) --> check sync
  newGroup = {0};
  group.assignedLeader = noneAsgndLeader;
  group.isSync = false;
  group.assignedLeader.acked = false;
  mndArbCheckSync(&group, nowMs, &op, &newGroup);

  ASSERT_EQ(op, CHECK_SYNC_CHECK_SYNC);

  // 4. noAsgnd,sync,noAck,one timeout--> update arbgroup (asgnd,sync,noAck)
  newGroup = {0};
  group.assignedLeader = noneAsgndLeader;
  group.isSync = true;
  group.assignedLeader.acked = false;
  group.members[1].state.lastHbMs = nowMs - 2 * tsArbSetAssignedTimeoutSec * 1000;  // member1 timeout
  mndArbCheckSync(&group, nowMs, &op, &newGroup);

  ASSERT_EQ(op, CHECK_SYNC_UPDATE);
  ASSERT_EQ(newGroup.assignedLeader.dnodeId, assgndDnodeId);
  ASSERT_EQ(std::string(newGroup.assignedLeader.token), std::string(group.members[0].state.token));
  ASSERT_EQ(newGroup.isSync, true);
  ASSERT_EQ(newGroup.assignedLeader.acked, false);

  // release the mutex initialised above, as the other ArbgroupTest cases do
  taosThreadMutexDestroy(&group.mutex);
}
#pragma GCC diagnostic pop

View File

@ -234,14 +234,14 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s",
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s, arb-token:%s",
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex,
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr,
replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr);
replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr, pNode->arbToken);
}
}