From af8a5c0ada376acca68166450901d8fa5c843098 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 12 Dec 2023 19:22:34 +0800 Subject: [PATCH] enh: protect stop of snap sender and receiver with the mutex of its buffer --- source/libs/sync/src/syncSnapshot.c | 55 ++++++++++++++++------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 98cabca521..d7b19a75cd 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -24,7 +24,6 @@ #include "syncUtil.h" static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { - taosThreadMutexLock(&pBuf->mutex); for (int64_t i = pBuf->start; i < pBuf->end; ++i) { if (pBuf->entryDeleteCb) { pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]); @@ -34,7 +33,6 @@ static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1; pBuf->end = pBuf->start; pBuf->cursor = pBuf->start - 1; - taosThreadMutexUnlock(&pBuf->mutex); } static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) { @@ -198,20 +196,23 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // update flag int8_t stopped = !atomic_val_compare_exchange_8(&pSender->start, true, false); if (stopped) return; + taosThreadMutexLock(&pSender->pSndBuf->mutex); + { + pSender->finish = finish; + pSender->waitTime = -1; - pSender->finish = finish; - pSender->waitTime = -1; + // close reader + if (pSender->pReader != NULL) { + pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); + pSender->pReader = NULL; + } - // close reader - if (pSender->pReader != NULL) { - pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); - pSender->pReader = NULL; + syncSnapBufferReset(pSender->pSndBuf); + + SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; + sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish); } - - syncSnapBufferReset(pSender->pSndBuf); - - SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish); + taosThreadMutexUnlock(&pSender->pSndBuf->mutex); } int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) { @@ -324,6 +325,9 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { SSyncSnapBuffer *pSndBuf = pSender->pSndBuf; int32_t code = -1; taosThreadMutexLock(&pSndBuf->mutex); + if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) { + goto _out; + } for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) { SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size]; @@ -520,19 +524,22 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false); if (stopped) return; - - if (pReceiver->pWriter != NULL) { - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, - &pReceiver->snapshot); - if (ret != 0) { - sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr()); + taosThreadMutexLock(&pReceiver->pRcvBuf->mutex); + { + if (pReceiver->pWriter != NULL) { + int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, + false, &pReceiver->snapshot); + if (ret != 0) { + sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr()); + } + pReceiver->pWriter = NULL; + } else { + sRInfo(pReceiver, "snapshot receiver stop, writer is null"); } - pReceiver->pWriter = NULL; - } else { - sRInfo(pReceiver, "snapshot receiver stop, writer is null"); - } - syncSnapBufferReset(pReceiver->pRcvBuf); + syncSnapBufferReset(pReceiver->pRcvBuf); + } + taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex); } static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {