enh: protect stop of snap sender and receiver with the mutex of its buffer

This commit is contained in:
Benguang Zhao 2023-12-12 19:22:34 +08:00
parent c5cde7ffe8
commit af8a5c0ada
1 changed files with 31 additions and 24 deletions

View File

@ -24,7 +24,6 @@
#include "syncUtil.h"
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
taosThreadMutexLock(&pBuf->mutex);
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
if (pBuf->entryDeleteCb) {
pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]);
@ -34,7 +33,6 @@ static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
pBuf->end = pBuf->start;
pBuf->cursor = pBuf->start - 1;
taosThreadMutexUnlock(&pBuf->mutex);
}
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
@ -198,20 +196,23 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// update flag
int8_t stopped = !atomic_val_compare_exchange_8(&pSender->start, true, false);
if (stopped) return;
taosThreadMutexLock(&pSender->pSndBuf->mutex);
{
pSender->finish = finish;
pSender->waitTime = -1;
pSender->finish = finish;
pSender->waitTime = -1;
// close reader
if (pSender->pReader != NULL) {
pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
pSender->pReader = NULL;
}
// close reader
if (pSender->pReader != NULL) {
pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
pSender->pReader = NULL;
syncSnapBufferReset(pSender->pSndBuf);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
}
syncSnapBufferReset(pSender->pSndBuf);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
taosThreadMutexUnlock(&pSender->pSndBuf->mutex);
}
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
@ -324,6 +325,9 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
int32_t code = -1;
taosThreadMutexLock(&pSndBuf->mutex);
if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) {
goto _out;
}
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
@ -520,19 +524,22 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
if (stopped) return;
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&pReceiver->snapshot);
if (ret != 0) {
sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
taosThreadMutexLock(&pReceiver->pRcvBuf->mutex);
{
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
false, &pReceiver->snapshot);
if (ret != 0) {
sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
}
pReceiver->pWriter = NULL;
} else {
sRInfo(pReceiver, "snapshot receiver stop, writer is null");
}
pReceiver->pWriter = NULL;
} else {
sRInfo(pReceiver, "snapshot receiver stop, writer is null");
}
syncSnapBufferReset(pReceiver->pRcvBuf);
syncSnapBufferReset(pReceiver->pRcvBuf);
}
taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex);
}
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {