enh: cache snap blocks sent in snapshotSend for resending

This commit is contained in:
Benguang Zhao 2023-10-25 20:25:57 +08:00
parent ec5b1f2ec1
commit fe553352b8
2 changed files with 81 additions and 40 deletions

View File

@ -37,8 +37,21 @@ typedef struct SSyncSnapBuffer {
int64_t end;
int64_t size;
TdThreadMutex mutex;
void (*entryDeleteCb)(void *ptr);
} SSyncSnapBuffer;
typedef struct SyncSnapBlock {
int32_t seq;
int8_t acked;
int64_t sendTimeMs;
int16_t blockType;
void *pBlock;
int32_t blockLen;
} SyncSnapBlock;
void syncSnapBlockDestroy(void *ptr);
typedef struct SSyncSnapshotSender {
int8_t start;
int32_t seq;

View File

@ -26,7 +26,9 @@
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
taosThreadMutexLock(&pBuf->mutex);
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
rpcFreeCont(pBuf->entries[i % pBuf->size]);
if (pBuf->entryDeleteCb) {
pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]);
}
pBuf->entries[i % pBuf->size] = NULL;
}
pBuf->start = 1;
@ -85,17 +87,29 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
pSender->finish = false;
pSender->pSndBuf = syncSnapBufferCreate();
if (pSender->pSndBuf == NULL) {
SSyncSnapBuffer *pSndBuf = syncSnapBufferCreate();
if (pSndBuf == NULL) {
taosMemoryFree(pSender);
pSender = NULL;
return NULL;
}
syncSnapBufferReset(pSender->pSndBuf);
pSndBuf->entryDeleteCb = syncSnapBlockDestroy;
pSender->pSndBuf = pSndBuf;
syncSnapBufferReset(pSender->pSndBuf);
return pSender;
}
void syncSnapBlockDestroy(void *ptr) {
SyncSnapBlock *pBlk = ptr;
if (pBlk->pBlock != NULL) {
taosMemoryFree(pBlk->pBlock);
pBlk->pBlock = NULL;
pBlk->blockLen = 0;
}
taosMemoryFree(pBlk);
}
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
if (pSender == NULL) return;
@ -238,23 +252,26 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
// when sender receive ack, call this function to send msg from seq
// seq = ack + 1, already updated
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// free memory last time (current seq - 1)
if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock);
pSender->pCurrentBlock = NULL;
pSender->blockLen = 0;
}
int32_t code = -1;
SyncSnapBlock *pBlk = NULL;
if (pSender->seq != SYNC_SNAPSHOT_SEQ_END) {
pSender->seq++;
pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
if (pBlk == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OUT;
}
// read data
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
&pSender->pCurrentBlock, &pSender->blockLen);
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock,
&pBlk->blockLen);
if (ret != 0) {
sSError(pSender, "snapshot sender read failed since %s", terrstr());
return -1;
goto _OUT;
}
pBlk->seq = pSender->seq;
if (pSender->blockLen > 0) {
// has read data
@ -263,7 +280,8 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// read finish, update seq to end
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
return 0;
code = 0;
goto _OUT;
}
}
@ -271,7 +289,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
return -1;
goto _OUT;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
@ -286,25 +304,35 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq;
if (pSender->pCurrentBlock != NULL) {
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
}
// event log
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
} else {
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
if (pBlk != NULL && pBlk->pBlock != NULL) {
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
return -1;
goto _OUT;
}
pSender->lastSendTime = taosGetTimestampMs();
return 0;
// put in buffer
int64_t nowMs = taosGetTimestampMs();
if (pBlk) {
ASSERT(pBlk->seq != SYNC_SNAPSHOT_SEQ_END);
pBlk->sendTimeMs = nowMs;
pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk;
pBlk = NULL;
pSender->pSndBuf->end = pSender->seq + 1;
}
pSender->lastSendTime = nowMs;
code = 0;
_OUT:;
if (pBlk != NULL) {
syncSnapBlockDestroy(pBlk);
pBlk = NULL;
}
return code;
}
// send snapshot data from cache
@ -319,7 +347,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
@ -332,9 +360,6 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
}
// event log
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend");
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
@ -401,12 +426,14 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->snapshot.lastApplyTerm = 0;
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
pReceiver->pRcvBuf = syncSnapBufferCreate();
if (pReceiver->pRcvBuf == NULL) {
SSyncSnapBuffer *pRcvBuf = syncSnapBufferCreate();
if (pRcvBuf == NULL) {
taosMemoryFree(pReceiver);
pReceiver = NULL;
return NULL;
}
pRcvBuf->entryDeleteCb = rpcFreeCont;
pReceiver->pRcvBuf = pRcvBuf;
syncSnapBufferReset(pReceiver->pRcvBuf);
return pReceiver;
@ -884,7 +911,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
}
pRcvBuf->start = seq + 1;
syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], code);
rpcFreeCont(pRcvBuf->entries[seq % pRcvBuf->size]);
pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
if (code) goto _out;
}
@ -1148,14 +1175,15 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp
ASSERT(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end);
if (pMsg->ack > pSndBuf->cursor && pSndBuf->entries[pMsg->ack % pSndBuf->size] == NULL) {
pSndBuf->entries[pMsg->ack % pSndBuf->size] = pMsg;
ppMsg[0] = NULL;
pSndBuf->end = TMAX(pMsg->ack + 1, pSndBuf->end);
if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) {
SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size];
ASSERT(pBlk);
pBlk->acked = 1;
}
for (int64_t ack = pSndBuf->cursor + 1; ack < pSndBuf->end; ++ack) {
if (pSndBuf->entries[ack % pSndBuf->size]) {
SyncSnapBlock *pBlk = pSndBuf->entries[ack % pSndBuf->size];
if (pBlk->acked) {
pSndBuf->cursor = ack;
} else {
break;
@ -1163,7 +1191,7 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp
}
for (int64_t ack = pSndBuf->start; ack <= pSndBuf->cursor; ++ack) {
rpcFreeCont(pSndBuf->entries[ack % pSndBuf->size]);
pSndBuf->entryDeleteCb(pSndBuf->entries[ack % pSndBuf->size]);
pSndBuf->entries[ack % pSndBuf->size] = NULL;
pSndBuf->start = ack + 1;
}