enh: arb check roletime before check sync
This commit is contained in:
parent
230631a535
commit
a8383369ba
|
@ -101,6 +101,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
||||||
*/
|
*/
|
||||||
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
|
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
|
||||||
|
|
||||||
|
int64_t mndGetRoleTimeMs(SMnode *pMnode);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Process the rpc, sync request.
|
* @brief Process the rpc, sync request.
|
||||||
*
|
*
|
||||||
|
|
|
@ -540,6 +540,14 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
|
||||||
|
int64_t nowMs = taosGetTimestampMs();
|
||||||
|
if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) {
|
||||||
|
mInfo("arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs,
|
||||||
|
nowMs);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
|
SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -551,7 +559,6 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
||||||
taosThreadMutexUnlock(&pArbGroup->mutex);
|
taosThreadMutexUnlock(&pArbGroup->mutex);
|
||||||
|
|
||||||
int32_t vgId = arbGroupDup.vgId;
|
int32_t vgId = arbGroupDup.vgId;
|
||||||
int64_t nowMs = taosGetTimestampMs();
|
|
||||||
|
|
||||||
bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs);
|
bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs);
|
||||||
bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs);
|
bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs);
|
||||||
|
|
|
@ -334,6 +334,8 @@ static int32_t minCronTime() {
|
||||||
min = TMIN(min, tsStreamCheckpointInterval);
|
min = TMIN(min, tsStreamCheckpointInterval);
|
||||||
min = TMIN(min, 6); // checkpointRemain
|
min = TMIN(min, 6); // checkpointRemain
|
||||||
min = TMIN(min, tsStreamNodeCheckInterval);
|
min = TMIN(min, tsStreamNodeCheckInterval);
|
||||||
|
min = TMIN(min, tsArbHeartBeatIntervalSec);
|
||||||
|
min = TMIN(min, tsArbCheckSyncIntervalSec);
|
||||||
|
|
||||||
int64_t telemInt = TMIN(60, (tsTelemInterval - 1));
|
int64_t telemInt = TMIN(60, (tsTelemInterval - 1));
|
||||||
min = TMIN(min, telemInt);
|
min = TMIN(min, telemInt);
|
||||||
|
@ -390,6 +392,18 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
|
||||||
if (sec % tsUptimeInterval == 0) {
|
if (sec % tsUptimeInterval == 0) {
|
||||||
mndIncreaseUpTime(pMnode);
|
mndIncreaseUpTime(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (sec % (tsArbHeartBeatIntervalSec) == 0) {
|
||||||
|
if (mndPullupArbHeartbeat(pMnode) != 0) {
|
||||||
|
mError("failed to pullup arb heartbeat, since:%s", terrstr());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sec % (tsArbCheckSyncIntervalSec) == 0) {
|
||||||
|
if (mndPullupArbCheckSync(pMnode) != 0) {
|
||||||
|
mError("failed to pullup arb check sync, since:%s", terrstr());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) {
|
void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) {
|
||||||
if (sec % (tsStatusInterval * 5) == 0) {
|
if (sec % (tsStatusInterval * 5) == 0) {
|
||||||
|
@ -421,18 +435,6 @@ static void *mndThreadFp(void *param) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
mndDoTimerPullupTask(pMnode, sec);
|
mndDoTimerPullupTask(pMnode, sec);
|
||||||
|
|
||||||
if (sec % (tsArbHeartBeatIntervalSec) == 0) {
|
|
||||||
if (mndPullupArbHeartbeat(pMnode) != 0) {
|
|
||||||
mError("failed to pullup arb heartbeat, since:%s", terrstr());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sec % (tsArbCheckSyncIntervalSec) == 0) {
|
|
||||||
if (mndPullupArbCheckSync(pMnode) != 0) {
|
|
||||||
mError("failed to pullup arb check sync, since:%s", terrstr());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1076,6 +1078,11 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t mndGetRoleTimeMs(SMnode *pMnode) {
|
||||||
|
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
|
||||||
|
return state.roleTimeMs;
|
||||||
|
}
|
||||||
|
|
||||||
void mndSetRestored(SMnode *pMnode, bool restored) {
|
void mndSetRestored(SMnode *pMnode, bool restored) {
|
||||||
if (restored) {
|
if (restored) {
|
||||||
taosThreadRwlockWrlock(&pMnode->lock);
|
taosThreadRwlockWrlock(&pMnode->lock);
|
||||||
|
|
Loading…
Reference in New Issue