fix/TS-5805-check-arb
This commit is contained in:
parent
d3ab816838
commit
1bc47199be
|
@ -38,8 +38,8 @@ int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||||
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup);
|
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||||
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup);
|
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||||
|
|
||||||
bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
bool mndCheckArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
||||||
SArbGroup *pNewGroup);
|
SArbGroup *pNewGroup);
|
||||||
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
|
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
|
||||||
bool newIsSync, SArbGroup *pNewGroup, int32_t code);
|
bool newIsSync, SArbGroup *pNewGroup, int32_t code);
|
||||||
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
|
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
|
||||||
|
|
|
@ -34,9 +34,9 @@ static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew);
|
||||||
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
|
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
|
||||||
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
|
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
|
||||||
|
|
||||||
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew);
|
// static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew);
|
||||||
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup);
|
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup);
|
||||||
static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray);
|
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray);
|
||||||
|
|
||||||
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
|
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
|
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
|
||||||
|
@ -728,7 +728,7 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_CHECK_EXIT(mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray));
|
TAOS_CHECK_EXIT(mndBatchUpdateArbGroup(pMnode, pUpdateArray));
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -772,7 +772,7 @@ static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup)
|
||||||
outGroup->updateTimeMs = pGroup->updateTimeMs;
|
outGroup->updateTimeMs = pGroup->updateTimeMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
||||||
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
|
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
|
||||||
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
|
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -805,7 +805,7 @@ _OVER:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray) {
|
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray) {
|
||||||
int32_t ret = -1;
|
int32_t ret = -1;
|
||||||
|
|
||||||
size_t sz = taosArrayGetSize(newGroupArray);
|
size_t sz = taosArrayGetSize(newGroupArray);
|
||||||
|
@ -963,41 +963,41 @@ static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
|
||||||
pGroup->assignedLeader.acked = false;
|
pGroup->assignedLeader.acked = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
|
// static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
|
||||||
int32_t code = -1;
|
// int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
|
// STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
|
||||||
if (pTrans == NULL) {
|
// if (pTrans == NULL) {
|
||||||
mError("failed to update arbgroup in create trans, vgId:%d, since %s", pNew->vgId, terrstr());
|
// mError("failed to update arbgroup in create trans, vgId:%d, since %s", pNew->vgId, terrstr());
|
||||||
if (terrno != 0) code = terrno;
|
// if (terrno != 0) code = terrno;
|
||||||
goto _OVER;
|
// goto _OVER;
|
||||||
}
|
// }
|
||||||
|
|
||||||
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
|
// mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
|
||||||
pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token,
|
// pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token,
|
||||||
pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
|
// pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
|
||||||
pNew->assignedLeader.token, pNew->assignedLeader.acked);
|
// pNew->assignedLeader.token, pNew->assignedLeader.acked);
|
||||||
|
|
||||||
mndTransAddArbGroupId(pTrans, pNew->vgId);
|
// mndTransAddArbGroupId(pTrans, pNew->vgId);
|
||||||
if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) {
|
// if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) {
|
||||||
goto _OVER;
|
// goto _OVER;
|
||||||
}
|
// }
|
||||||
|
|
||||||
if ((code = mndSetCreateArbGroupCommitLogs(pTrans, pNew)) != 0) {
|
// if ((code = mndSetCreateArbGroupCommitLogs(pTrans, pNew)) != 0) {
|
||||||
mError("failed to update arbgroup in set commit log, vgId:%d, since %s", pNew->vgId, tstrerror(code));
|
// mError("failed to update arbgroup in set commit log, vgId:%d, since %s", pNew->vgId, tstrerror(code));
|
||||||
goto _OVER;
|
// goto _OVER;
|
||||||
}
|
// }
|
||||||
|
|
||||||
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
|
// if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
// code = 0;
|
||||||
|
|
||||||
_OVER:
|
// _OVER:
|
||||||
mndTransDrop(pTrans);
|
// mndTransDrop(pTrans);
|
||||||
return code;
|
// return code;
|
||||||
}
|
// }
|
||||||
|
|
||||||
bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
bool mndCheckArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
||||||
SArbGroup *pNewGroup) {
|
SArbGroup *pNewGroup) {
|
||||||
bool updateToken = false;
|
bool updateToken = false;
|
||||||
SArbGroupMember *pMember = NULL;
|
SArbGroupMember *pMember = NULL;
|
||||||
|
|
||||||
|
@ -1052,7 +1052,7 @@ _OVER:
|
||||||
return updateToken;
|
return updateToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
|
static int32_t mndUpdateArbGroupsByHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
|
||||||
int64_t nowMs = taosGetTimestampMs();
|
int64_t nowMs = taosGetTimestampMs();
|
||||||
size_t size = taosArrayGetSize(memberArray);
|
size_t size = taosArrayGetSize(memberArray);
|
||||||
SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));
|
SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));
|
||||||
|
@ -1067,7 +1067,7 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
|
bool updateToken = mndCheckArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
|
||||||
if (updateToken) {
|
if (updateToken) {
|
||||||
if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
|
if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
|
||||||
mError("failed to push newGroup to updateArray, but continue at this hearbear");
|
mError("failed to push newGroup to updateArray, but continue at this hearbear");
|
||||||
|
@ -1077,7 +1077,7 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me
|
||||||
sdbRelease(pMnode->pSdb, pGroup);
|
sdbRelease(pMnode->pSdb, pGroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_CHECK_RETURN(mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray));
|
TAOS_CHECK_RETURN(mndBatchUpdateArbGroup(pMnode, pUpdateArray));
|
||||||
|
|
||||||
taosArrayDestroy(pUpdateArray);
|
taosArrayDestroy(pUpdateArray);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1134,7 +1134,7 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token
|
||||||
bool updateIsSync =
|
bool updateIsSync =
|
||||||
mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup, rsp_code);
|
mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup, rsp_code);
|
||||||
if (updateIsSync) {
|
if (updateIsSync) {
|
||||||
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
if (mndUpdateArbGroup(pMnode, &newGroup) != 0) {
|
||||||
mInfo("failed to pullup update arb sync, vgId:%d, since %s", vgId, terrstr());
|
mInfo("failed to pullup update arb sync, vgId:%d, since %s", vgId, terrstr());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1173,7 +1173,7 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(mndUpdateArbHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndUpdateArbGroupsByHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers), NULL, _OVER);
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -1303,7 +1303,7 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
|
||||||
bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken,
|
bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken,
|
||||||
pRsp->code, &newGroup);
|
pRsp->code, &newGroup);
|
||||||
if (updateAssigned) {
|
if (updateAssigned) {
|
||||||
if ((code = mndPullupArbUpdateGroup(pMnode, &newGroup)) != 0) {
|
if ((code = mndUpdateArbGroup(pMnode, &newGroup)) != 0) {
|
||||||
mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, tstrerror(code));
|
mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, tstrerror(code));
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,7 +127,7 @@ TEST_F(ArbgroupTest, 02_process_heart_beat_rsp) {
|
||||||
int32_t nowMs = group.members[0].state.lastHbMs + 10;
|
int32_t nowMs = group.members[0].state.lastHbMs + 10;
|
||||||
|
|
||||||
SArbGroup newGroup = {0};
|
SArbGroup newGroup = {0};
|
||||||
bool updateToken = mndUpdateArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
bool updateToken = mndCheckArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
||||||
|
|
||||||
ASSERT_EQ(updateToken, false);
|
ASSERT_EQ(updateToken, false);
|
||||||
ASSERT_NE(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
|
ASSERT_NE(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
|
||||||
|
@ -142,7 +142,7 @@ TEST_F(ArbgroupTest, 02_process_heart_beat_rsp) {
|
||||||
int32_t nowMs = group.members[0].state.lastHbMs + 10;
|
int32_t nowMs = group.members[0].state.lastHbMs + 10;
|
||||||
|
|
||||||
SArbGroup newGroup = {0};
|
SArbGroup newGroup = {0};
|
||||||
bool updateToken = mndUpdateArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
bool updateToken = mndCheckArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
||||||
|
|
||||||
ASSERT_EQ(updateToken, false);
|
ASSERT_EQ(updateToken, false);
|
||||||
ASSERT_EQ(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
|
ASSERT_EQ(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
|
||||||
|
@ -157,7 +157,7 @@ TEST_F(ArbgroupTest, 02_process_heart_beat_rsp) {
|
||||||
int32_t nowMs = group.members[0].state.lastHbMs + 10;
|
int32_t nowMs = group.members[0].state.lastHbMs + 10;
|
||||||
|
|
||||||
SArbGroup newGroup = {0};
|
SArbGroup newGroup = {0};
|
||||||
bool updateToken = mndUpdateArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
bool updateToken = mndCheckArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
||||||
|
|
||||||
ASSERT_EQ(updateToken, true);
|
ASSERT_EQ(updateToken, true);
|
||||||
ASSERT_EQ(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
|
ASSERT_EQ(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
|
||||||
|
|
Loading…
Reference in New Issue