diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index f060e9da13..2277b70c8f 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -1087,7 +1087,10 @@ static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSe // sender static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { + int32_t code = -1; SSnapshot snapshot = {0}; + + taosThreadMutexLock(&pSender->pSndBuf->mutex); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); // prepare @@ -1103,20 +1106,24 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend // start reader if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { if (syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg) != 0) { - return -1; + goto _out; } } - int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader); + code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader); if (code != 0) { sSError(pSender, "prepare snapshot failed since %s", terrstr()); - return -1; + goto _out; } // update next index syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1); - return snapshotSend(pSender); + code = snapshotSend(pSender); + +_out: + taosThreadMutexUnlock(&pSender->pSndBuf->mutex); + return code; } static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {