Merge pull request #24128 from taosdata/FEAT/TD-27125-3.0

enh: protect processing of snap preparation rsp with sender buf mutex
This commit is contained in:
wade zhang 2023-12-25 10:08:25 +08:00 committed by GitHub
commit 1fc7a1a72e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 11 additions and 4 deletions

View File

@ -1087,7 +1087,10 @@ static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSe
// sender
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
int32_t code = -1;
SSnapshot snapshot = {0};
taosThreadMutexLock(&pSender->pSndBuf->mutex);
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
// prepare <begin, end>
@ -1103,20 +1106,24 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
// start reader
if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
if (syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg) != 0) {
return -1;
goto _out;
}
}
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
if (code != 0) {
sSError(pSender, "prepare snapshot failed since %s", terrstr());
return -1;
goto _out;
}
// update next index
syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);
return snapshotSend(pSender);
code = snapshotSend(pSender);
_out:
taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
return code;
}
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {