diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 25b620854e..1e3614857e 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -121,6 +121,11 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { if (pSender->pSndBuf) { syncSnapBufferDestroy(&pSender->pSndBuf); } + + if (pSender->snapshotParam.data) { + taosMemoryFree(pSender->snapshotParam.data); + pSender->snapshotParam.data = NULL; + } // free sender taosMemoryFree(pSender); } @@ -344,9 +349,6 @@ _out:; return code; } -// return 0, start ok -// return 1, last snapshot finish ok -// return -1, error int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); if (pSender == NULL) { @@ -377,6 +379,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 0; } +// receiver SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) { bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) && (pSyncNode->pFsm->FpSnapshotDoWrite != NULL); @@ -506,8 +509,6 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId)); } -// just set start = false -// FpSnapshotStopWrite should not be called void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter); @@ -528,7 +529,6 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { syncSnapBufferReset(pReceiver->pRcvBuf); } -// when recv last snapshot block, apply data into snapshot static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { int32_t code = 0; if (pReceiver->pWriter != NULL) { @@ -587,8 +587,6 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap return 0; } -// apply data block -// update progress static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { if (pMsg->seq != pReceiver->ack + 1) { sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq); @@ -641,8 +639,8 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { return snapStart; } -static int32_t syncNodeExchangeSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, - SSnapshot *pInfo) { +static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver, + SyncSnapshotSend *pMsg, SSnapshot *pInfo) { ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT); int32_t code = 0; @@ -734,7 +732,7 @@ _SEND_REPLY:; SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY}; int32_t dataLen = 0; if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) { - if (syncNodeExchangeSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo) != 0) { + if (syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo) != 0) { goto _out; } SSyncTLV *datHead = snapInfo.data; @@ -949,26 +947,6 @@ _SEND_REPLY:; return code; } -// receiver on message -// -// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP -// if receiver already start -// if sender.start-time > receiver.start-time, restart receiver(reply snapshot start) -// if sender.start-time = receiver.start-time, maybe duplicate msg -// if sender.start-time < receiver.start-time, ignore -// else -// waiting for clock match -// start receiver(reply snapshot start) -// -// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN -// a. create writer with -// -// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig) -// -// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close -// -// condition 5, got data, update ack -// int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont; SyncSnapshotSend *pMsg = ppMsg[0]; @@ -1052,6 +1030,32 @@ _out:; return code; } +static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { + ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY); + + SSyncTLV *datHead = (void *)pMsg->data; + if (datHead->typ != pMsg->payloadType) { + sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ); + terrno = TSDB_CODE_INVALID_DATA_FMT; + return -1; + } + int32_t dataLen = sizeof(SSyncTLV) + datHead->len; + + SSnapshotParam *pParam = &pSender->snapshotParam; + void *data = taosMemoryRealloc(pParam->data, dataLen); + if (data == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(data, pMsg->data, dataLen); + + pParam->data = data; + data = NULL; + sSInfo(pSender, "data of snapshot param. len: %d", datHead->len); + return 0; +} + +// sender static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { SSnapshot snapshot = {0}; pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); @@ -1068,14 +1072,9 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend // start reader if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { - SSyncTLV *datHead = (void *)pMsg->data; - if (datHead->typ != pMsg->payloadType) { - sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ); - terrno = TSDB_CODE_INVALID_DATA_FMT; + if (syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg) != 0) { return -1; } - pSender->snapshotParam.data = (void *)pMsg->data; - sSInfo(pSender, "data of snapshot param. len: %d", datHead->len); } int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader); @@ -1160,12 +1159,6 @@ _out: return code; } -// sender on message -// -// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender -// condition 2 sender receives ack, set seq = ack + 1, send msg from seq -// condition 3 sender receives error msg, just print error log -// int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont; SyncSnapshotRsp *pMsg = ppMsg[0];