fix: crash if reader is already dropped

This commit is contained in:
Shengliang Guan 2023-01-05 15:55:33 +08:00
parent e34378184d
commit ed56abd40f
1 changed files with 10 additions and 20 deletions

View File

@ -150,7 +150,7 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// when sender receive ack, call this function to send msg from seq // when sender receive ack, call this function to send msg from seq
// seq = ack + 1, already updated // seq = ack + 1, already updated
int32_t snapshotSend(SSyncSnapshotSender *pSender) { static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// free memory last time (current seq - 1) // free memory last time (current seq - 1)
if (pSender->pCurrentBlock != NULL) { if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock); taosMemoryFree(pSender->pCurrentBlock);
@ -342,23 +342,6 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// force stop
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver force stop, writer:%p");
// force close, abandon incomplete data
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&pReceiver->snapshot);
if (ret != 0) {
sRInfo(pReceiver, "snapshot receiver force stop failed since %s", terrstr());
}
pReceiver->pWriter = NULL;
}
pReceiver->start = false;
}
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null"); sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null");
@ -590,7 +573,7 @@ _START_RECEIVER:
if (snapshotReceiverIsStart(pReceiver)) { if (snapshotReceiverIsStart(pReceiver)) {
sRInfo(pReceiver, "snapshot receiver already start and force stop pre one"); sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
snapshotReceiverForceStop(pReceiver); snapshotReceiverStop(pReceiver);
} }
snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender
@ -842,7 +825,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// force close, no response // force close, no response
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
snapshotReceiverForceStop(pReceiver); snapshotReceiverStop(pReceiver);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data"); syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
code = syncNodeOnSnapshotReceive(pSyncNode, pMsg); code = syncNodeOnSnapshotReceive(pSyncNode, pMsg);
@ -989,6 +972,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg); return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg);
} }
if (pSender->pReader == NULL || pSender->finish) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
sSError(pSender, "snapshot sender invalid, pReader:%p finish:%d", pMsg->code, pSender->pReader, pSender->finish);
terrno = pMsg->code;
goto _ERROR;
}
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin"); syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {