enh: keep the index span of sent-but-unconfirmed logs below half the size of the log ring buffer
This commit is contained in:
parent
ff286e1f1c
commit
d5ae1ca18a
|
@ -41,7 +41,7 @@ extern "C" {
|
||||||
#define SNAPSHOT_WAIT_MS 1000 * 30
|
#define SNAPSHOT_WAIT_MS 1000 * 30
|
||||||
|
|
||||||
#define SYNC_MAX_RETRY_BACKOFF 5
|
#define SYNC_MAX_RETRY_BACKOFF 5
|
||||||
#define SYNC_LOG_REPL_RETRY_WAIT_MS 50
|
#define SYNC_LOG_REPL_RETRY_WAIT_MS 100
|
||||||
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
|
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
|
||||||
|
|
||||||
#define SYNC_MAX_BATCH_SIZE 1
|
#define SYNC_MAX_BATCH_SIZE 1
|
||||||
|
|
|
@ -566,33 +566,42 @@ int32_t syncLogReplMgrReset(SSyncLogReplMgr* pMgr) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
_Atomic int64_t tsRetryCnt = 0;
|
|
||||||
|
|
||||||
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
if (pMgr->endIndex <= pMgr->startIndex) {
|
if (pMgr->endIndex <= pMgr->startIndex) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
||||||
|
if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) {
|
||||||
|
syncLogReplMgrReset(pMgr);
|
||||||
|
sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer: %" PRIx64, pNode->vgId,
|
||||||
|
pDestId->addr);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ret = -1;
|
int32_t ret = -1;
|
||||||
bool retried = false;
|
bool retried = false;
|
||||||
int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr);
|
int64_t retryWaitMs = syncLogGetRetryBackoffTimeMs(pMgr);
|
||||||
|
int64_t nowMs = taosGetMonoTimestampMs();
|
||||||
|
int count = 0;
|
||||||
|
int64_t firstIndex = -1;
|
||||||
|
SyncTerm term = -1;
|
||||||
|
|
||||||
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
|
for (SyncIndex index = pMgr->startIndex; index < pMgr->endIndex; index++) {
|
||||||
int64_t pos = index % pMgr->size;
|
int64_t pos = index % pMgr->size;
|
||||||
ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex));
|
ASSERT(!pMgr->states[pos].barrier || (index == pMgr->startIndex || index + 1 == pMgr->endIndex));
|
||||||
if (pMgr->states[pos].acked) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
int64_t nowMs = taosGetMonoTimestampMs();
|
|
||||||
if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) {
|
if (nowMs < pMgr->states[pos].timeMs + retryWaitMs) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
|
if (pMgr->states[pos].acked) {
|
||||||
bool barrier = false;
|
continue;
|
||||||
SyncTerm term = -1;
|
}
|
||||||
|
|
||||||
|
bool barrier = false;
|
||||||
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
|
||||||
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
|
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: %" PRIx64 "", pNode->vgId,
|
||||||
terrstr(), index, pDestId->addr);
|
terrstr(), index, pDestId->addr);
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
@ -601,13 +610,19 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
|
||||||
pMgr->states[pos].term = term;
|
pMgr->states[pos].term = term;
|
||||||
pMgr->states[pos].acked = false;
|
pMgr->states[pos].acked = false;
|
||||||
retried = true;
|
retried = true;
|
||||||
tsRetryCnt++;
|
if (firstIndex == -1) firstIndex = index;
|
||||||
|
count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = 0;
|
ret = 0;
|
||||||
_out:
|
_out:
|
||||||
if (retried) {
|
if (retried) {
|
||||||
pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr);
|
pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr);
|
||||||
|
sInfo("vgId:%d, resend %d raft log entries. dest: %" PRIx64 ", for indexes: %" PRId64
|
||||||
|
" etc., maybe of term: %" PRId64 ", retryWaitMs: %" PRId64 ", repl mgr: [%" PRId64 " %" PRId64 ", %" PRId64
|
||||||
|
")",
|
||||||
|
pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
|
||||||
|
pMgr->endIndex);
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -771,9 +786,10 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
|
||||||
int32_t batchSize = TMAX(1, pMgr->size / 20);
|
int32_t batchSize = TMAX(1, pMgr->size / 20);
|
||||||
int32_t count = 0;
|
int32_t count = 0;
|
||||||
int64_t nowMs = taosGetMonoTimestampMs();
|
int64_t nowMs = taosGetMonoTimestampMs();
|
||||||
|
int64_t limit = pMgr->size >> 1;
|
||||||
|
|
||||||
for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
|
for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
|
||||||
if (batchSize < count++ || pMgr->startIndex + pMgr->size <= index) {
|
if (batchSize < count++ || limit <= index - pMgr->startIndex) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
|
if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
|
||||||
|
@ -800,12 +816,13 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
syncLogReplMgrRetryOnNeed(pMgr, pNode);
|
||||||
|
|
||||||
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
SSyncLogBuffer* pBuf = pNode->pLogBuf;
|
||||||
sTrace("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
|
sTrace("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
|
||||||
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
|
||||||
pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
|
pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
|
||||||
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
syncLogReplMgrRetryOnNeed(pMgr, pNode);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue