Merge pull request #19389 from taosdata/fix/TD-21746

fix: crash if reader is already dropped
This commit is contained in:
Shengliang Guan 2023-01-06 08:59:14 +08:00 committed by GitHub
commit 7b78a15824
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 14 additions and 26 deletions

View File

@ -57,7 +57,6 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver {
@ -82,7 +81,6 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver)
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
// on message
int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);

View File

@ -1247,7 +1247,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
#if 0
if (pSyncNode->pNewNodeReceiver != NULL) {
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
}
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
@ -1270,7 +1270,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
void syncNodePostClose(SSyncNode* pSyncNode) {
if (pSyncNode->pNewNodeReceiver != NULL) {
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
}
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
@ -1325,7 +1325,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
if (pSyncNode->pNewNodeReceiver != NULL) {
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
}
sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver);
@ -1855,7 +1855,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// close receiver
if (pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL &&
snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver);
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
}
// stop elect timer

View File

@ -150,7 +150,7 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// when the sender receives an ack, call this function to send the msg at seq
// seq = ack + 1, and has already been updated
int32_t snapshotSend(SSyncSnapshotSender *pSender) {
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// free memory last time (current seq - 1)
if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock);
@ -342,23 +342,6 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
/** Report the receiver's `start` flag; pReceiver is dereferenced unconditionally. */
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
  const bool started = pReceiver->start;
  return started;
}
/**
 * Force-stop the snapshot receiver.
 *
 * If a writer is open, it is closed through the FSM's FpSnapshotStopWrite
 * callback (called with `false` as its third argument) and the writer handle
 * is cleared. A non-zero result from the callback is only logged; it does not
 * prevent the stop. The receiver's `start` flag is cleared on every path.
 *
 * @param pReceiver receiver to stop; dereferenced unconditionally, so it must
 *                  not be NULL.
 */
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
  // The format consumes one pointer argument; supply the writer explicitly
  // (the conversion previously had no matching argument).
  sRInfo(pReceiver, "snapshot receiver force stop, writer:%p", pReceiver->pWriter);

  // force close, abandon incomplete data
  if (pReceiver->pWriter != NULL) {
    int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
                                                                  &pReceiver->snapshot);
    if (ret != 0) {
      sRInfo(pReceiver, "snapshot receiver force stop failed since %s", terrstr());
    }
    // Drop the handle whether or not the stop succeeded.
    pReceiver->pWriter = NULL;
  }

  pReceiver->start = false;
}
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
if (pReceiver->pWriter != NULL) {
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null");
@ -590,7 +573,7 @@ _START_RECEIVER:
if (snapshotReceiverIsStart(pReceiver)) {
sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
snapshotReceiverForceStop(pReceiver);
snapshotReceiverStop(pReceiver);
}
snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender
@ -842,7 +825,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// force close, no response
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
snapshotReceiverForceStop(pReceiver);
snapshotReceiverStop(pReceiver);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
code = syncNodeOnSnapshotReceive(pSyncNode, pMsg);
@ -989,6 +972,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg);
}
if (pSender->pReader == NULL || pSender->finish) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
sSError(pSender, "snapshot sender invalid, pReader:%p finish:%d", pMsg->code, pSender->pReader, pSender->finish);
terrno = pMsg->code;
goto _ERROR;
}
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {