From fe553352b85812fc4c24934f50fd440e6007f44e Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 25 Oct 2023 20:25:57 +0800 Subject: [PATCH] enh: cache snap blocks sent in snapshotSend for resending --- source/libs/sync/inc/syncSnapshot.h | 13 ++++ source/libs/sync/src/syncSnapshot.c | 108 +++++++++++++++++----------- 2 files changed, 81 insertions(+), 40 deletions(-) diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 93c62544f2..e68702568a 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -37,8 +37,21 @@ typedef struct SSyncSnapBuffer { int64_t end; int64_t size; TdThreadMutex mutex; + void (*entryDeleteCb)(void *ptr); } SSyncSnapBuffer; +typedef struct SyncSnapBlock { + int32_t seq; + int8_t acked; + int64_t sendTimeMs; + + int16_t blockType; + void *pBlock; + int32_t blockLen; +} SyncSnapBlock; + +void syncSnapBlockDestroy(void *ptr); + typedef struct SSyncSnapshotSender { int8_t start; int32_t seq; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 751f777b18..eee0ab2cc9 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -26,7 +26,9 @@ static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { taosThreadMutexLock(&pBuf->mutex); for (int64_t i = pBuf->start; i < pBuf->end; ++i) { - rpcFreeCont(pBuf->entries[i % pBuf->size]); + if (pBuf->entryDeleteCb) { + pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]); + } pBuf->entries[i % pBuf->size] = NULL; } pBuf->start = 1; @@ -85,17 +87,29 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->finish = false; - pSender->pSndBuf = syncSnapBufferCreate(); - if (pSender->pSndBuf == NULL) { + SSyncSnapBuffer *pSndBuf = syncSnapBufferCreate(); + if (pSndBuf == NULL) { taosMemoryFree(pSender); pSender = NULL; return NULL; } - syncSnapBufferReset(pSender->pSndBuf); + pSndBuf->entryDeleteCb = syncSnapBlockDestroy; + pSender->pSndBuf = pSndBuf; + syncSnapBufferReset(pSender->pSndBuf); return pSender; } +void syncSnapBlockDestroy(void *ptr) { + SyncSnapBlock *pBlk = ptr; + if (pBlk->pBlock != NULL) { + taosMemoryFree(pBlk->pBlock); + pBlk->pBlock = NULL; + pBlk->blockLen = 0; + } + taosMemoryFree(pBlk); +} + void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { if (pSender == NULL) return; @@ -238,23 +252,26 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // when sender receive ack, call this function to send msg from seq // seq = ack + 1, already updated static int32_t snapshotSend(SSyncSnapshotSender *pSender) { - // free memory last time (current seq - 1) - if (pSender->pCurrentBlock != NULL) { - taosMemoryFree(pSender->pCurrentBlock); - pSender->pCurrentBlock = NULL; - pSender->blockLen = 0; - } + int32_t code = -1; + SyncSnapBlock *pBlk = NULL; if (pSender->seq != SYNC_SNAPSHOT_SEQ_END) { pSender->seq++; + pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock)); + if (pBlk == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OUT; + } + // read data - int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, - &pSender->pCurrentBlock, &pSender->blockLen); + int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock, + &pBlk->blockLen); if (ret != 0) { sSError(pSender, "snapshot sender read failed since %s", terrstr()); - return -1; + goto _OUT; } + pBlk->seq = pSender->seq; if (pSender->blockLen > 0) { // has read data @@ -263,7 +280,8 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { // read finish, update seq to end pSender->seq = SYNC_SNAPSHOT_SEQ_END; sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq); - return 0; + code = 0; + goto _OUT; } } @@ -271,7 +289,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { SRpcMsg rpcMsg = {0}; if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) { sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr()); - return -1; + goto _OUT; } SyncSnapshotSend *pMsg = rpcMsg.pCont; @@ -286,25 +304,35 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { pMsg->startTime = pSender->startTime; pMsg->seq = pSender->seq; - if (pSender->pCurrentBlock != NULL) { - memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); - } - - // event log - if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { - syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish"); - } else { - syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending"); + if (pBlk != NULL && pBlk->pBlock != NULL) { + memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen); } // send msg if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); - return -1; + goto _OUT; } - pSender->lastSendTime = taosGetTimestampMs(); - return 0; + // put in buffer + int64_t nowMs = taosGetTimestampMs(); + if (pBlk) { + ASSERT(pBlk->seq != SYNC_SNAPSHOT_SEQ_END); + pBlk->sendTimeMs = nowMs; + pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk; + pBlk = NULL; + pSender->pSndBuf->end = pSender->seq + 1; + } + + pSender->lastSendTime = nowMs; + code = 0; + +_OUT:; + if (pBlk != NULL) { + syncSnapBlockDestroy(pBlk); + pBlk = NULL; + } + return code; } // send snapshot data from cache @@ -319,7 +347,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { SyncSnapshotSend *pMsg = rpcMsg.pCont; pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = raftStoreGetTerm(pSender->pSyncNode); + pMsg->term = pSender->term; pMsg->beginIndex = pSender->snapshotParam.start; pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastTerm = pSender->snapshot.lastApplyTerm; @@ -332,9 +360,6 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); } - // event log - syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend"); - // send msg if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { sSError(pSender, "snapshot sender resend msg failed since %s", terrstr()); @@ -401,12 +426,14 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from pReceiver->snapshot.lastApplyTerm = 0; pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; - pReceiver->pRcvBuf = syncSnapBufferCreate(); - if (pReceiver->pRcvBuf == NULL) { + SSyncSnapBuffer *pRcvBuf = syncSnapBufferCreate(); + if (pRcvBuf == NULL) { taosMemoryFree(pReceiver); pReceiver = NULL; return NULL; } + pRcvBuf->entryDeleteCb = rpcFreeCont; + pReceiver->pRcvBuf = pRcvBuf; syncSnapBufferReset(pReceiver->pRcvBuf); return pReceiver; @@ -884,7 +911,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot } pRcvBuf->start = seq + 1; syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], code); - rpcFreeCont(pRcvBuf->entries[seq % pRcvBuf->size]); + pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]); pRcvBuf->entries[seq % pRcvBuf->size] = NULL; if (code) goto _out; } @@ -1148,14 +1175,15 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp ASSERT(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end); - if (pMsg->ack > pSndBuf->cursor && pSndBuf->entries[pMsg->ack % pSndBuf->size] == NULL) { - pSndBuf->entries[pMsg->ack % pSndBuf->size] = pMsg; - ppMsg[0] = NULL; - pSndBuf->end = TMAX(pMsg->ack + 1, pSndBuf->end); + if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) { + SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size]; + ASSERT(pBlk); + pBlk->acked = 1; } for (int64_t ack = pSndBuf->cursor + 1; ack < pSndBuf->end; ++ack) { - if (pSndBuf->entries[ack % pSndBuf->size]) { + SyncSnapBlock *pBlk = pSndBuf->entries[ack % pSndBuf->size]; + if (pBlk->acked) { pSndBuf->cursor = ack; } else { break; @@ -1163,7 +1191,7 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp } for (int64_t ack = pSndBuf->start; ack <= pSndBuf->cursor; ++ack) { - rpcFreeCont(pSndBuf->entries[ack % pSndBuf->size]); + pSndBuf->entryDeleteCb(pSndBuf->entries[ack % pSndBuf->size]); pSndBuf->entries[ack % pSndBuf->size] = NULL; pSndBuf->start = ack + 1; }