diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 974a8f968e..5277e7818f 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -57,7 +57,6 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender); bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); int32_t snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish); -int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { @@ -82,7 +81,6 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); // on message int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5b9fb7c59c..91a05e78b0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1247,7 +1247,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) { #if 0 if (pSyncNode->pNewNodeReceiver != NULL) { if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { - snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); + snapshotReceiverStop(pSyncNode->pNewNodeReceiver); } sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId, @@ -1270,7 +1270,7 @@ void syncNodePreClose(SSyncNode* pSyncNode) { void syncNodePostClose(SSyncNode* pSyncNode) { if (pSyncNode->pNewNodeReceiver != NULL) { if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { - snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); + snapshotReceiverStop(pSyncNode->pNewNodeReceiver); } sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId, @@ -1325,7 +1325,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { if (pSyncNode->pNewNodeReceiver != NULL) { if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { - snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); + snapshotReceiverStop(pSyncNode->pNewNodeReceiver); } sDebug("vgId:%d, snapshot receiver destroy while close, data:%p", pSyncNode->vgId, pSyncNode->pNewNodeReceiver); @@ -1855,7 +1855,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // close receiver if (pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL && snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { - snapshotReceiverForceStop(pSyncNode->pNewNodeReceiver); + snapshotReceiverStop(pSyncNode->pNewNodeReceiver); } // stop elect timer diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index b68b735f46..defb7402f4 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -150,7 +150,7 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // when sender receive ack, call this function to send msg from seq // seq = ack + 1, already updated -int32_t snapshotSend(SSyncSnapshotSender *pSender) { +static int32_t snapshotSend(SSyncSnapshotSender *pSender) { // free memory last time (current seq - 1) if (pSender->pCurrentBlock != NULL) { taosMemoryFree(pSender->pCurrentBlock); @@ -342,23 +342,6 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } -// force stop -void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { - sRInfo(pReceiver, "snapshot receiver force stop, writer:%p"); - - // force close, abandon incomplete data - if (pReceiver->pWriter != NULL) { - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, - &pReceiver->snapshot); - if (ret != 0) { - sRInfo(pReceiver, "snapshot receiver force stop failed since %s", terrstr()); - } - pReceiver->pWriter = NULL; - } - - pReceiver->start = false; -} - static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { if (pReceiver->pWriter != NULL) { sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null"); @@ -590,7 +573,7 @@ _START_RECEIVER: if (snapshotReceiverIsStart(pReceiver)) { sRInfo(pReceiver, "snapshot receiver already start and force stop pre one"); - snapshotReceiverForceStop(pReceiver); + snapshotReceiverStop(pReceiver); } snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender @@ -842,7 +825,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { // force close, no response syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop"); - snapshotReceiverForceStop(pReceiver); + snapshotReceiverStop(pReceiver); } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data"); code = syncNodeOnSnapshotReceive(pSyncNode, pMsg); @@ -989,6 +972,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { return syncNodeOnSnapshotPreRsp(pSyncNode, pSender, pMsg); } + if (pSender->pReader == NULL || pSender->finish) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid"); + sSError(pSender, "snapshot sender invalid, pReader:%p finish:%d", pMsg->code, pSender->pReader, pSender->finish); + terrno = pMsg->code; + goto _ERROR; + } + if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin"); if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {