fix/TD-32838-add-writer-lock

This commit is contained in:
dmchen 2024-11-11 12:00:09 +08:00
parent 82cf87369a
commit aedeca9ecb
2 changed files with 24 additions and 9 deletions

View File

@ -91,6 +91,7 @@ typedef struct SSyncSnapshotReceiver {
// update when begin
void *pWriter;
TdThreadMutex writerMutex;
SSnapshotParam snapshotParam;
SSnapshot snapshot;

View File

@ -429,6 +429,7 @@ int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapsh
pReceiver->startTime = 0;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->pWriter = NULL;
(void)taosThreadMutexInit(&pReceiver->writerMutex, NULL);
pReceiver->pSyncNode = pSyncNode;
pReceiver->fromId = fromId;
pReceiver->term = raftStoreGetTerm(pSyncNode);
@ -468,6 +469,7 @@ static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver == NULL) return;
(void)taosThreadMutexLock(&pReceiver->writerMutex);
// close writer
if (pReceiver->pWriter != NULL) {
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
@ -478,6 +480,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
}
pReceiver->pWriter = NULL;
}
(void)taosThreadMutexUnlock(&pReceiver->writerMutex);
(void)taosThreadMutexDestroy(&pReceiver->writerMutex);
// free snap buf
if (pReceiver->pRcvBuf) {
@ -556,7 +561,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
if (stopped) return;
(void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
(void)taosThreadMutexLock(&pReceiver->writerMutex);
{
if (pReceiver->pWriter != NULL) {
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
@ -568,7 +574,11 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
} else {
sRInfo(pReceiver, "snapshot receiver stop, writer is null");
}
}
(void)taosThreadMutexUnlock(&pReceiver->writerMutex);
(void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
{
syncSnapBufferReset(pReceiver->pRcvBuf);
(void)snapshotReceiverClearInfoData(pReceiver);
@ -600,15 +610,19 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm);
}
// stop writer, apply data
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
&pReceiver->snapshot);
if (code != 0) {
sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
TAOS_RETURN(code);
(void)taosThreadMutexLock(&pReceiver->writerMutex);
if (pReceiver->pWriter != NULL) {
// stop writer, apply data
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
&pReceiver->snapshot);
if (code != 0) {
sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code));
TAOS_RETURN(code);
}
pReceiver->pWriter = NULL;
sRInfo(pReceiver, "snapshot receiver write stopped");
}
pReceiver->pWriter = NULL;
sRInfo(pReceiver, "snapshot receiver write stopped");
(void)taosThreadMutexUnlock(&pReceiver->writerMutex);
// update progress
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;