fix: resend acked msgs on exceeding maximum retryWaitMs
This commit is contained in:
parent
ed43aeaa9b
commit
4718f4d1b6
|
@ -31,6 +31,10 @@ static bool syncIsMsgBlock(tmsg_t type) {
|
||||||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
|
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
FORCE_INLINE static int64_t syncGetRetryMaxWaitMs() {
|
||||||
|
return SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
|
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
|
||||||
taosThreadMutexLock(&pBuf->mutex);
|
taosThreadMutexLock(&pBuf->mutex);
|
||||||
int64_t index = pBuf->endIndex;
|
int64_t index = pBuf->endIndex;
|
||||||
|
@ -627,7 +631,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMgr->states[pos].acked) {
|
if (pMgr->states[pos].acked && nowMs < pMgr->states[pos].timeMs + syncGetRetryMaxWaitMs()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -791,7 +795,7 @@ int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
|
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
|
||||||
ASSERT(!pMgr->restored);
|
ASSERT(!pMgr->restored);
|
||||||
ASSERT(pMgr->startIndex >= 0);
|
ASSERT(pMgr->startIndex >= 0);
|
||||||
int64_t retryMaxWaitMs = SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
|
int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
|
||||||
int64_t nowMs = taosGetMonoTimestampMs();
|
int64_t nowMs = taosGetMonoTimestampMs();
|
||||||
|
|
||||||
if (pMgr->endIndex > pMgr->startIndex &&
|
if (pMgr->endIndex > pMgr->startIndex &&
|
||||||
|
@ -834,9 +838,11 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
|
||||||
int32_t count = 0;
|
int32_t count = 0;
|
||||||
int64_t nowMs = taosGetMonoTimestampMs();
|
int64_t nowMs = taosGetMonoTimestampMs();
|
||||||
int64_t limit = pMgr->size >> 1;
|
int64_t limit = pMgr->size >> 1;
|
||||||
|
SyncTerm term = -1;
|
||||||
|
SyncIndex firstIndex = -1;
|
||||||
|
|
||||||
for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
|
for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
|
||||||
if (batchSize < count++ || limit <= index - pMgr->startIndex) {
|
if (batchSize < count || limit <= index - pMgr->startIndex) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
|
if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
|
||||||
|
@ -845,7 +851,6 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
|
||||||
int64_t pos = index % pMgr->size;
|
int64_t pos = index % pMgr->size;
|
||||||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||||
bool barrier = false;
|
bool barrier = false;
|
||||||
SyncTerm term = -1;
|
|
||||||
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||||
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
||||||
terrstr(), index, pDestId->addr);
|
terrstr(), index, pDestId->addr);
|
||||||
|
@ -856,6 +861,9 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
|
||||||
pMgr->states[pos].term = term;
|
pMgr->states[pos].term = term;
|
||||||
pMgr->states[pos].acked = false;
|
pMgr->states[pos].acked = false;
|
||||||
|
|
||||||
|
if (firstIndex == -1) firstIndex = index;
|
||||||
|
count++;
|
||||||
|
|
||||||
pMgr->endIndex = index + 1;
|
pMgr->endIndex = index + 1;
|
||||||
if (barrier) {
|
if (barrier) {
|
||||||
sInfo("vgId:%d, replicated sync barrier to dest: %" PRIx64 ". index: %" PRId64 ", term: %" PRId64
|
sInfo("vgId:%d, replicated sync barrier to dest: %" PRIx64 ". index: %" PRId64 ", term: %" PRId64
|
||||||
|
@ -869,10 +877,11 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
|
||||||
syncLogReplMgrRetryOnNeed(pMgr, pNode);
|
syncLogReplMgrRetryOnNeed(pMgr, pNode);
|
||||||
|
|
||||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||||
sTrace("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
|
sTrace("vgId:%d, replicated %d msgs to peer: %" PRId64 ". indexes: %" PRId64 "..., terms: ...%" PRId64
|
||||||
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
", mgr: (rs:%d) [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
|
||||||
pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
|
")",
|
||||||
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex,
|
||||||
|
pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue