enh: send the END snap msg at last

This commit is contained in:
Benguang Zhao 2023-10-24 14:16:46 +08:00
parent f444cd7a6d
commit 1c60e67a83
1 changed files with 32 additions and 20 deletions

View File

@ -247,25 +247,28 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pSender->blockLen = 0; pSender->blockLen = 0;
} }
pSender->seq++; if (pSender->seq != SYNC_SNAPSHOT_SEQ_END) {
pSender->seq++;
// read data // read data
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
&pSender->pCurrentBlock, &pSender->blockLen); &pSender->pCurrentBlock, &pSender->blockLen);
if (ret != 0) { if (ret != 0) {
sSError(pSender, "snapshot sender read failed since %s", terrstr()); sSError(pSender, "snapshot sender read failed since %s", terrstr());
return -1; return -1;
} }
if (pSender->blockLen > 0) { if (pSender->blockLen > 0) {
// has read data // has read data
sSDebug(pSender, "vgId:%d, snapshot sender continue to read, blockLen:%d seq:%d", pSender->pSyncNode->vgId, sSDebug(pSender, "vgId:%d, snapshot sender continue to read, blockLen:%d seq:%d", pSender->pSyncNode->vgId,
pSender->blockLen, pSender->seq); pSender->blockLen, pSender->seq);
} else { } else {
// read finish, update seq to end // read finish, update seq to end
pSender->seq = SYNC_SNAPSHOT_SEQ_END; pSender->seq = SYNC_SNAPSHOT_SEQ_END;
sSInfo(pSender, "vgId:%d, snapshot sender read to the end, blockLen:%d seq:%d", pSender->pSyncNode->vgId, sSInfo(pSender, "vgId:%d, snapshot sender read to the end, blockLen:%d seq:%d", pSender->pSyncNode->vgId,
pSender->blockLen, pSender->seq); pSender->blockLen, pSender->seq);
return 0;
}
} }
// build msg // build msg
@ -1188,19 +1191,28 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp
} }
} }
for (int64_t ack = pSndBuf->start; ack < pSndBuf->cursor; ++ack) { for (int64_t ack = pSndBuf->start; ack <= pSndBuf->cursor; ++ack) {
rpcFreeCont(pSndBuf->entries[ack % pSndBuf->size]); rpcFreeCont(pSndBuf->entries[ack % pSndBuf->size]);
pSndBuf->entries[ack % pSndBuf->size] = NULL; pSndBuf->entries[ack % pSndBuf->size] = NULL;
pSndBuf->start = ack + 1; pSndBuf->start = ack + 1;
} }
while (pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) { while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) {
if (snapshotSend(pSender) != 0) {
code = terrno;
goto _out;
}
if (pSender->seq != SYNC_SNAPSHOT_SEQ_END) {
pSndBuf->end = TMAX(pSender->seq + 1, pSndBuf->end);
}
}
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
if (snapshotSend(pSender) != 0) { if (snapshotSend(pSender) != 0) {
code = terrno; code = terrno;
goto _out; goto _out;
} }
} }
_out: _out:
taosThreadMutexUnlock(&pSndBuf->mutex); taosThreadMutexUnlock(&pSndBuf->mutex);
return code; return code;