diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 540255c200..839b87d500 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -91,6 +91,7 @@ typedef struct SSyncSnapshotReceiver { // update when begin void *pWriter; + TdThreadMutex writerMutex; SSnapshotParam snapshotParam; SSnapshot snapshot; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 78fda6b093..cf4c99b989 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -429,6 +429,7 @@ int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapsh pReceiver->startTime = 0; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; pReceiver->pWriter = NULL; + (void)taosThreadMutexInit(&pReceiver->writerMutex, NULL); pReceiver->pSyncNode = pSyncNode; pReceiver->fromId = fromId; pReceiver->term = raftStoreGetTerm(pSyncNode); @@ -468,6 +469,7 @@ static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) { void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { if (pReceiver == NULL) return; + (void)taosThreadMutexLock(&pReceiver->writerMutex); // close writer if (pReceiver->pWriter != NULL) { int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, @@ -478,6 +480,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { } pReceiver->pWriter = NULL; } + (void)taosThreadMutexUnlock(&pReceiver->writerMutex); + + (void)taosThreadMutexDestroy(&pReceiver->writerMutex); // free snap buf if (pReceiver->pRcvBuf) { @@ -556,7 +561,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false); if (stopped) return; - (void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex); + + (void)taosThreadMutexLock(&pReceiver->writerMutex); { if (pReceiver->pWriter != NULL) { int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, @@ -568,7 +574,11 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { } else { sRInfo(pReceiver, "snapshot receiver stop, writer is null"); } + } + (void)taosThreadMutexUnlock(&pReceiver->writerMutex); + (void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex); + { syncSnapBufferReset(pReceiver->pRcvBuf); (void)snapshotReceiverClearInfoData(pReceiver); @@ -600,15 +610,19 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm); } - // stop writer, apply data - code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, - &pReceiver->snapshot); - if (code != 0) { - sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code)); - TAOS_RETURN(code); + (void)taosThreadMutexLock(&pReceiver->writerMutex); + if (pReceiver->pWriter != NULL) { + // stop writer, apply data + code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, + &pReceiver->snapshot); + if (code != 0) { + sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code)); + TAOS_RETURN(code); + } + pReceiver->pWriter = NULL; + sRInfo(pReceiver, "snapshot receiver write stopped"); } - pReceiver->pWriter = NULL; - sRInfo(pReceiver, "snapshot receiver write stopped"); + (void)taosThreadMutexUnlock(&pReceiver->writerMutex); // update progress pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;