Merge pull request #28720 from taosdata/fix/fix/TD-32838-add-writer-lock-main

fix:[TD-32838]add-writer-lock
This commit is contained in:
Shengliang Guan 2024-11-13 09:15:30 +08:00 committed by GitHub
commit 09285f9710
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 33 additions and 9 deletions

View File

@ -91,6 +91,7 @@ typedef struct SSyncSnapshotReceiver {
// update when begin // update when begin
void *pWriter; void *pWriter;
TdThreadMutex writerMutex;
SSnapshotParam snapshotParam; SSnapshotParam snapshotParam;
SSnapshot snapshot; SSnapshot snapshot;

View File

@ -429,6 +429,12 @@ int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapsh
pReceiver->startTime = 0; pReceiver->startTime = 0;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
code = taosThreadMutexInit(&pReceiver->writerMutex, NULL);
if (code != 0) {
taosMemoryFree(pReceiver);
pReceiver = NULL;
TAOS_RETURN(code);
}
pReceiver->pSyncNode = pSyncNode; pReceiver->pSyncNode = pSyncNode;
pReceiver->fromId = fromId; pReceiver->fromId = fromId;
pReceiver->term = raftStoreGetTerm(pSyncNode); pReceiver->term = raftStoreGetTerm(pSyncNode);
@ -440,6 +446,10 @@ int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapsh
SSyncSnapBuffer *pRcvBuf = NULL; SSyncSnapBuffer *pRcvBuf = NULL;
code = syncSnapBufferCreate(&pRcvBuf); code = syncSnapBufferCreate(&pRcvBuf);
if (pRcvBuf == NULL) { if (pRcvBuf == NULL) {
int32_t ret = taosThreadMutexDestroy(&pReceiver->writerMutex);
if (ret != 0) {
sError("failed to destroy mutex since %s", tstrerror(ret));
}
taosMemoryFree(pReceiver); taosMemoryFree(pReceiver);
pReceiver = NULL; pReceiver = NULL;
TAOS_RETURN(code); TAOS_RETURN(code);
@ -468,6 +478,7 @@ static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) {
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver == NULL) return; if (pReceiver == NULL) return;
(void)taosThreadMutexLock(&pReceiver->writerMutex);
// close writer // close writer
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
@ -478,6 +489,9 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
} }
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
} }
(void)taosThreadMutexUnlock(&pReceiver->writerMutex);
(void)taosThreadMutexDestroy(&pReceiver->writerMutex);
// free snap buf // free snap buf
if (pReceiver->pRcvBuf) { if (pReceiver->pRcvBuf) {
@ -556,7 +570,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false); int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
if (stopped) return; if (stopped) return;
(void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
(void)taosThreadMutexLock(&pReceiver->writerMutex);
{ {
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
@ -568,7 +583,11 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
} else { } else {
sRInfo(pReceiver, "snapshot receiver stop, writer is null"); sRInfo(pReceiver, "snapshot receiver stop, writer is null");
} }
}
(void)taosThreadMutexUnlock(&pReceiver->writerMutex);
(void)taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
{
syncSnapBufferReset(pReceiver->pRcvBuf); syncSnapBufferReset(pReceiver->pRcvBuf);
(void)snapshotReceiverClearInfoData(pReceiver); (void)snapshotReceiverClearInfoData(pReceiver);
@ -600,6 +619,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm); raftStoreSetTerm(pReceiver->pSyncNode, pReceiver->snapshot.lastApplyTerm);
} }
(void)taosThreadMutexLock(&pReceiver->writerMutex);
if (pReceiver->pWriter != NULL) {
// stop writer, apply data // stop writer, apply data
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
&pReceiver->snapshot); &pReceiver->snapshot);
@ -609,6 +630,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
} }
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
sRInfo(pReceiver, "snapshot receiver write stopped"); sRInfo(pReceiver, "snapshot receiver write stopped");
}
(void)taosThreadMutexUnlock(&pReceiver->writerMutex);
// update progress // update progress
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END; pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;