enh: protect processing of snap preparation rsp with sender buf mutex
This commit is contained in:
parent
ecab6655a6
commit
4c13dee721
|
@ -1087,7 +1087,10 @@ static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSe
|
||||||
|
|
||||||
// sender
|
// sender
|
||||||
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
|
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
|
||||||
|
int32_t code = -1;
|
||||||
SSnapshot snapshot = {0};
|
SSnapshot snapshot = {0};
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pSender->pSndBuf->mutex);
|
||||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
|
||||||
// prepare <begin, end>
|
// prepare <begin, end>
|
||||||
|
@ -1103,20 +1106,24 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
|
||||||
// start reader
|
// start reader
|
||||||
if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
||||||
if (syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg) != 0) {
|
if (syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg) != 0) {
|
||||||
return -1;
|
goto _out;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
|
code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sSError(pSender, "prepare snapshot failed since %s", terrstr());
|
sSError(pSender, "prepare snapshot failed since %s", terrstr());
|
||||||
return -1;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update next index
|
// update next index
|
||||||
syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);
|
syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);
|
||||||
|
|
||||||
return snapshotSend(pSender);
|
code = snapshotSend(pSender);
|
||||||
|
|
||||||
|
_out:
|
||||||
|
taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
|
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
|
||||||
|
|
Loading…
Reference in New Issue