enh: save a copy of snapshot info in syncNodeOnSnapshotPrepRsp
This commit is contained in:
parent
030f3db4d6
commit
081c83710e
|
@ -121,6 +121,11 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
|
||||||
if (pSender->pSndBuf) {
|
if (pSender->pSndBuf) {
|
||||||
syncSnapBufferDestroy(&pSender->pSndBuf);
|
syncSnapBufferDestroy(&pSender->pSndBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pSender->snapshotParam.data) {
|
||||||
|
taosMemoryFree(pSender->snapshotParam.data);
|
||||||
|
pSender->snapshotParam.data = NULL;
|
||||||
|
}
|
||||||
// free sender
|
// free sender
|
||||||
taosMemoryFree(pSender);
|
taosMemoryFree(pSender);
|
||||||
}
|
}
|
||||||
|
@ -344,9 +349,6 @@ _out:;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// return 0, start ok
|
|
||||||
// return 1, last snapshot finish ok
|
|
||||||
// return -1, error
|
|
||||||
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
|
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
|
||||||
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
|
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
|
||||||
if (pSender == NULL) {
|
if (pSender == NULL) {
|
||||||
|
@ -377,6 +379,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// receiver
|
||||||
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
|
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
|
||||||
bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
|
bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
|
||||||
(pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
|
(pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
|
||||||
|
@ -506,8 +509,6 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
|
||||||
sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
|
sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
|
||||||
}
|
}
|
||||||
|
|
||||||
// just set start = false
|
|
||||||
// FpSnapshotStopWrite should not be called
|
|
||||||
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
||||||
sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
|
sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
|
||||||
|
|
||||||
|
@ -528,7 +529,6 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
||||||
syncSnapBufferReset(pReceiver->pRcvBuf);
|
syncSnapBufferReset(pReceiver->pRcvBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// when recv last snapshot block, apply data into snapshot
|
|
||||||
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (pReceiver->pWriter != NULL) {
|
if (pReceiver->pWriter != NULL) {
|
||||||
|
@ -587,8 +587,6 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// apply data block
|
|
||||||
// update progress
|
|
||||||
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
||||||
if (pMsg->seq != pReceiver->ack + 1) {
|
if (pMsg->seq != pReceiver->ack + 1) {
|
||||||
sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
|
sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
|
||||||
|
@ -641,8 +639,8 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
|
||||||
return snapStart;
|
return snapStart;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t syncNodeExchangeSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg,
|
static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver,
|
||||||
SSnapshot *pInfo) {
|
SyncSnapshotSend *pMsg, SSnapshot *pInfo) {
|
||||||
ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT);
|
ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -734,7 +732,7 @@ _SEND_REPLY:;
|
||||||
SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
|
SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
|
if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
|
||||||
if (syncNodeExchangeSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo) != 0) {
|
if (syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo) != 0) {
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
SSyncTLV *datHead = snapInfo.data;
|
SSyncTLV *datHead = snapInfo.data;
|
||||||
|
@ -949,26 +947,6 @@ _SEND_REPLY:;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// receiver on message
|
|
||||||
//
|
|
||||||
// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP
|
|
||||||
// if receiver already start
|
|
||||||
// if sender.start-time > receiver.start-time, restart receiver(reply snapshot start)
|
|
||||||
// if sender.start-time = receiver.start-time, maybe duplicate msg
|
|
||||||
// if sender.start-time < receiver.start-time, ignore
|
|
||||||
// else
|
|
||||||
// waiting for clock match
|
|
||||||
// start receiver(reply snapshot start)
|
|
||||||
//
|
|
||||||
// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN
|
|
||||||
// a. create writer with <begin, end>
|
|
||||||
//
|
|
||||||
// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig)
|
|
||||||
//
|
|
||||||
// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
|
|
||||||
//
|
|
||||||
// condition 5, got data, update ack
|
|
||||||
//
|
|
||||||
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
||||||
SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
|
SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
|
||||||
SyncSnapshotSend *pMsg = ppMsg[0];
|
SyncSnapshotSend *pMsg = ppMsg[0];
|
||||||
|
@ -1052,6 +1030,32 @@ _out:;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
|
||||||
|
ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY);
|
||||||
|
|
||||||
|
SSyncTLV *datHead = (void *)pMsg->data;
|
||||||
|
if (datHead->typ != pMsg->payloadType) {
|
||||||
|
sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ);
|
||||||
|
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
int32_t dataLen = sizeof(SSyncTLV) + datHead->len;
|
||||||
|
|
||||||
|
SSnapshotParam *pParam = &pSender->snapshotParam;
|
||||||
|
void *data = taosMemoryRealloc(pParam->data, dataLen);
|
||||||
|
if (data == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
memcpy(data, pMsg->data, dataLen);
|
||||||
|
|
||||||
|
pParam->data = data;
|
||||||
|
data = NULL;
|
||||||
|
sSInfo(pSender, "data of snapshot param. len: %d", datHead->len);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// sender
|
||||||
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
|
static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
|
||||||
SSnapshot snapshot = {0};
|
SSnapshot snapshot = {0};
|
||||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
@ -1068,14 +1072,9 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
|
||||||
|
|
||||||
// start reader
|
// start reader
|
||||||
if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
||||||
SSyncTLV *datHead = (void *)pMsg->data;
|
if (syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg) != 0) {
|
||||||
if (datHead->typ != pMsg->payloadType) {
|
|
||||||
sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ);
|
|
||||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pSender->snapshotParam.data = (void *)pMsg->data;
|
|
||||||
sSInfo(pSender, "data of snapshot param. len: %d", datHead->len);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
|
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
|
||||||
|
@ -1160,12 +1159,6 @@ _out:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sender on message
|
|
||||||
//
|
|
||||||
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
|
|
||||||
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
|
|
||||||
// condition 3 sender receives error msg, just print error log
|
|
||||||
//
|
|
||||||
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
||||||
SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
|
SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
|
||||||
SyncSnapshotRsp *pMsg = ppMsg[0];
|
SyncSnapshotRsp *pMsg = ppMsg[0];
|
||||||
|
|
Loading…
Reference in New Issue