Merge pull request #24039 from taosdata/FIX/TD-27843-3.0

enh: resend new snap replication msg on all acked if not finished yet
This commit is contained in:
wade zhang 2023-12-13 09:51:31 +08:00 committed by GitHub
commit 3be9af9765
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 25 deletions

View File

@ -115,7 +115,7 @@ static int32_t tsdbDataFileRAWWriterDoClose(SDataFileRAWWriter *writer) { return
static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFileOpArray *opArr) {
int32_t code = 0;
int32_t lino = 0;
ASSERT(writer->ctx->offset == writer->file.size);
ASSERT(writer->ctx->offset <= writer->file.size);
ASSERT(writer->config->fid == writer->file.fid);
STFileOp op = (STFileOp){

View File

@ -24,7 +24,6 @@
#include "syncUtil.h"
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
taosThreadMutexLock(&pBuf->mutex);
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
if (pBuf->entryDeleteCb) {
pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]);
@ -34,7 +33,6 @@ static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
pBuf->end = pBuf->start;
pBuf->cursor = pBuf->start - 1;
taosThreadMutexUnlock(&pBuf->mutex);
}
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
@ -198,7 +196,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// update flag
int8_t stopped = !atomic_val_compare_exchange_8(&pSender->start, true, false);
if (stopped) return;
taosThreadMutexLock(&pSender->pSndBuf->mutex);
{
pSender->finish = finish;
pSender->waitTime = -1;
@ -213,6 +212,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
}
taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
}
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
int32_t code = -1;
@ -324,6 +325,9 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
int32_t code = -1;
taosThreadMutexLock(&pSndBuf->mutex);
if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
goto _out;
}
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
@ -338,6 +342,12 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
pBlk->sendTimeMs = nowMs;
}
if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
if (snapshotSend(pSender) != 0) {
goto _out;
}
}
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) {
goto _out;
@ -514,10 +524,11 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
if (stopped) return;
taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
{
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&pReceiver->snapshot);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
false, &pReceiver->snapshot);
if (ret != 0) {
sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
}
@ -528,6 +539,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
syncSnapBufferReset(pReceiver->pRcvBuf);
}
taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex);
}
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
int32_t code = 0;