From 1c60e67a8380813bb78f1cde4f86dce03d101de8 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 24 Oct 2023 14:16:46 +0800 Subject: [PATCH] enh: send the END snap msg at last --- source/libs/sync/src/syncSnapshot.c | 52 ++++++++++++++++++----------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 3017f4e33c..7e556b28f5 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -247,25 +247,28 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { pSender->blockLen = 0; } - pSender->seq++; + if (pSender->seq != SYNC_SNAPSHOT_SEQ_END) { + pSender->seq++; - // read data - int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, - &pSender->pCurrentBlock, &pSender->blockLen); - if (ret != 0) { - sSError(pSender, "snapshot sender read failed since %s", terrstr()); - return -1; - } + // read data + int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, + &pSender->pCurrentBlock, &pSender->blockLen); + if (ret != 0) { + sSError(pSender, "snapshot sender read failed since %s", terrstr()); + return -1; + } - if (pSender->blockLen > 0) { - // has read data - sSDebug(pSender, "vgId:%d, snapshot sender continue to read, blockLen:%d seq:%d", pSender->pSyncNode->vgId, - pSender->blockLen, pSender->seq); - } else { - // read finish, update seq to end - pSender->seq = SYNC_SNAPSHOT_SEQ_END; - sSInfo(pSender, "vgId:%d, snapshot sender read to the end, blockLen:%d seq:%d", pSender->pSyncNode->vgId, - pSender->blockLen, pSender->seq); + if (pSender->blockLen > 0) { + // has read data + sSDebug(pSender, "vgId:%d, snapshot sender continue to read, blockLen:%d seq:%d", pSender->pSyncNode->vgId, + pSender->blockLen, pSender->seq); + } else { + // read finish, update seq to end + pSender->seq = SYNC_SNAPSHOT_SEQ_END; + sSInfo(pSender, "vgId:%d, snapshot sender read to the end, blockLen:%d seq:%d", pSender->pSyncNode->vgId, + pSender->blockLen, pSender->seq); + return 0; + } } // build msg @@ -1188,19 +1191,28 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp } } - for (int64_t ack = pSndBuf->start; ack < pSndBuf->cursor; ++ack) { + for (int64_t ack = pSndBuf->start; ack <= pSndBuf->cursor; ++ack) { rpcFreeCont(pSndBuf->entries[ack % pSndBuf->size]); pSndBuf->entries[ack % pSndBuf->size] = NULL; pSndBuf->start = ack + 1; } - while (pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) { + while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) { + if (snapshotSend(pSender) != 0) { + code = terrno; + goto _out; + } + if (pSender->seq != SYNC_SNAPSHOT_SEQ_END) { + pSndBuf->end = TMAX(pSender->seq + 1, pSndBuf->end); + } + } + + if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) { if (snapshotSend(pSender) != 0) { code = terrno; goto _out; } } - _out: taosThreadMutexUnlock(&pSndBuf->mutex); return code;