Merge pull request #23501 from taosdata/FIX/TD-26596-3.0

fix: set ack of response properly in syncSnapSendRsp
This commit is contained in:
wade zhang 2023-11-03 11:57:24 +08:00 committed by GitHub
commit 717ddf5107
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 60 additions and 85 deletions

View File

@ -23,6 +23,8 @@
#include "syncReplication.h"
#include "syncUtil.h"
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t len, int32_t typ);
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
taosThreadMutexLock(&pBuf->mutex);
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
@ -160,8 +162,11 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
}
int dataLen = 0;
if (snapInfo.data) {
SSyncTLV *datHead = snapInfo.data;
void *pData = snapInfo.data;
int32_t type = 0;
if (pData) {
type = snapInfo.type;
SSyncTLV *datHead = pData;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
terrno = TSDB_CODE_INVALID_DATA_FMT;
@ -170,37 +175,12 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
dataLen = sizeof(SSyncTLV) + datHead->len;
}
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, dataLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
if (syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type) != 0) {
goto _out;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq;
if (dataLen > 0) {
pMsg->payloadType = snapInfo.type;
memcpy(pMsg->data, snapInfo.data, dataLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
goto _out;
}
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&pMsg->destId));
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));
code = 0;
_out:
if (snapInfo.data) {
@ -232,6 +212,43 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
}
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
int32_t code = -1;
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "failed to build snap replication msg since %s", terrstr());
goto _OUT;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = seq;
if (pBlock != NULL && blockLen > 0) {
memcpy(pMsg->data, pBlock, blockLen);
}
pMsg->payloadType = typ;
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "failed to send snap replication msg since %s. seq:%d", terrstr(), seq);
goto _OUT;
}
code = 0;
_OUT:
return code;
}
// when sender receive ack, call this function to send msg from seq
// seq = ack + 1, already updated
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
@ -273,33 +290,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END);
int32_t blockLen = (pBlk != NULL) ? pBlk->blockLen : 0;
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
goto _OUT;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq;
if (pBlk != NULL && pBlk->pBlock != NULL && pBlk->blockLen > 0) {
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
void *pBlock = (pBlk) ? pBlk->pBlock : NULL;
if (syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0) != 0) {
goto _OUT;
}
@ -336,36 +330,17 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
continue;
}
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, pBlk->blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
goto _out;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pBlk->seq;
if (pBlk->pBlock != NULL && pBlk->blockLen > 0) {
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
if (syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0) != 0) {
goto _out;
}
pBlk->sendTimeMs = nowMs;
}
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) {
goto _out;
}
}
code = 0;
_out:;
taosThreadMutexUnlock(&pSndBuf->mutex);
@ -861,7 +836,7 @@ static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSen
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pMsg->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->ack = pMsg->seq;
pRspMsg->code = code;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
@ -893,13 +868,13 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
ppMsg[0] = NULL;
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
} else {
} else if (pMsg->seq < pRcvBuf->start) {
syncSnapSendRsp(pReceiver, pMsg, code);
goto _out;
}
for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
if (pRcvBuf->entries[seq]) {
if (pRcvBuf->entries[seq % pRcvBuf->size]) {
pRcvBuf->cursor = seq;
} else {
break;