diff --git a/include/util/tdef.h b/include/util/tdef.h index 7f8fe22340..9ea6240b1f 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -294,6 +294,8 @@ typedef enum ELogicConditionType { #define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 #define TSDB_SYNC_NEGOTIATION_WIN 512 +#define TSDB_SYNC_SNAP_BUFFER_SIZE 2048 + #define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index cec1a12024..637c18e97d 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -249,8 +249,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg); -int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, const SRpcMsg* pMsg); +int32_t syncNodeOnSnapshot(SSyncNode* ths, SRpcMsg* pMsg); +int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, SRpcMsg* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 95382132b5..0332204769 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -30,6 +30,15 @@ extern "C" { #define SYNC_SNAPSHOT_RETRY_MS 5000 +typedef struct SSyncSnapBuffer { + void *entries[TSDB_SYNC_SNAP_BUFFER_SIZE]; + int64_t start; + int64_t cursor; + int64_t end; + int64_t size; + TdThreadMutex mutex; +} SSyncSnapBuffer; + typedef struct SSyncSnapshotSender { int8_t start; int32_t seq; @@ -47,6 +56,9 @@ typedef struct SSyncSnapshotSender { int64_t lastSendTime; bool finish; + // buffer + SSyncSnapBuffer *pSndBuf; + // init when create SSyncNode *pSyncNode; int32_t replicaIndex; @@ -72,6 +84,9 @@ typedef struct SSyncSnapshotReceiver { SSnapshotParam snapshotParam; SSnapshot snapshot; + // buffer + SSyncSnapBuffer *pRcvBuf; + // init when create SSyncNode *pSyncNode; } SSyncSnapshotReceiver; @@ -83,8 +98,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); // on message -int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); -int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg); +// int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); +// int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg); SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex snapshotLastApplyIndex); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 924813eb98..43726307e6 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -23,6 +23,42 @@ #include "syncReplication.h" #include "syncUtil.h" +static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { + taosThreadMutexLock(&pBuf->mutex); + for (int64_t i = pBuf->start; i < pBuf->end; ++i) { + rpcFreeCont(pBuf->entries[i % pBuf->size]); + pBuf->entries[i % pBuf->size] = NULL; + } + pBuf->start = 1; + pBuf->end = 1; + pBuf->cursor = 0; + taosThreadMutexUnlock(&pBuf->mutex); +} + +static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) { + if (ppBuf == NULL || ppBuf[0] == NULL) return; + SSyncSnapBuffer *pBuf = ppBuf[0]; + + syncSnapBufferReset(pBuf); + + taosThreadMutexDestroy(&pBuf->mutex); + taosMemoryFree(ppBuf[0]); + ppBuf[0] = NULL; + return; +} + +static SSyncSnapBuffer *syncSnapBufferCreate() { + SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer)); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pBuf->size = sizeof(pBuf->entries) / sizeof(void *); + ASSERT(pBuf->size == TSDB_SYNC_SNAP_BUFFER_SIZE); + taosThreadMutexInit(&pBuf->mutex, NULL); + return pBuf; +} + SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) && (pSyncNode->pFsm->FpSnapshotDoRead != NULL); @@ -49,6 +85,14 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->finish = false; + pSender->pSndBuf = syncSnapBufferCreate(); + if (pSender->pSndBuf == NULL) { + taosMemoryFree(pSender); + pSender = NULL; + return NULL; + } + syncSnapBufferReset(pSender->pSndBuf); + return pSender; } @@ -67,6 +111,10 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { pSender->pReader = NULL; } + // free snap buffer + if (pSender->pSndBuf) { + syncSnapBufferDestroy(&pSender->pSndBuf); + } // free sender taosMemoryFree(pSender); } @@ -181,6 +229,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { pSender->pCurrentBlock = NULL; pSender->blockLen = 0; } + + syncSnapBufferReset(pSender->pSndBuf); } // when sender receive ack, call this function to send msg from seq @@ -193,6 +243,8 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { pSender->blockLen = 0; } + pSender->seq++; + // read data int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pSender->pCurrentBlock, &pSender->blockLen); @@ -362,6 +414,14 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from pReceiver->snapshot.lastApplyTerm = 0; pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; + pReceiver->pRcvBuf = syncSnapBufferCreate(); + if (pReceiver->pRcvBuf == NULL) { + taosMemoryFree(pReceiver); + pReceiver = NULL; + return NULL; + } + + syncSnapBufferReset(pReceiver->pRcvBuf); return pReceiver; } @@ -389,6 +449,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { pReceiver->snapshot.data = NULL; } + // free snap buf + if (pReceiver->pRcvBuf) { + syncSnapBufferDestroy(&pReceiver->pRcvBuf); + } + // free receiver taosMemoryFree(pReceiver); } @@ -472,6 +537,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { } else { sRInfo(pReceiver, "snapshot receiver stop, writer is null"); } + + syncSnapBufferReset(pReceiver->pRcvBuf); } // when recv last snapshot block, apply data into snapshot @@ -765,29 +832,8 @@ _SEND_REPLY: return code; } -static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { - // condition 4 - // transfering - SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; - int64_t timeNow = taosGetTimestampMs(); - int32_t code = 0; - - if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) { - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr()); - code = terrno; - goto _SEND_REPLY; - } - - if (snapshotReceiverGotData(pReceiver, pMsg) != 0) { - code = terrno; - if (code >= SYNC_SNAPSHOT_SEQ_INVALID) { - code = TSDB_CODE_SYN_INTERNAL_ERROR; - } - } - -_SEND_REPLY:; - +static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, int32_t code) { + SSyncNode *pSyncNode = pReceiver->pSyncNode; // build msg SRpcMsg rpcMsg = {0}; if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) { @@ -811,10 +857,76 @@ _SEND_REPLY:; sRError(pReceiver, "failed to send snapshot receiver resp since %s", terrstr()); return -1; } + return 0; +} +static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) { + int32_t code = 0; + SSyncSnapBuffer *pRcvBuf = pReceiver->pRcvBuf; + SyncSnapshotSend *pMsg = ppMsg[0]; + terrno = TSDB_CODE_SUCCESS; + + taosThreadMutexLock(&pRcvBuf->mutex); + + if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) { + terrno = TSDB_CODE_SYN_BUFFER_FULL; + code = terrno; + goto _out; + } + + ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end); + + if (pMsg->seq > pRcvBuf->cursor) { + pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg; + ppMsg[0] = NULL; + pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end); + } + + for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) { + if (pRcvBuf->entries[seq]) { + pRcvBuf->cursor = seq; + } else { + break; + } + } + + for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) { + if (snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size]) != 0) { + code = terrno; + if (code >= SYNC_SNAPSHOT_SEQ_INVALID) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; + } + } + pRcvBuf->start = seq + 1; + syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], code); + rpcFreeCont(pRcvBuf->entries[seq % pRcvBuf->size]); + pRcvBuf->entries[seq % pRcvBuf->size] = NULL; + if (code) goto _out; + } + +_out: + taosThreadMutexUnlock(&pRcvBuf->mutex); return code; } +static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) { + // condition 4 + // transfering + SyncSnapshotSend *pMsg = ppMsg[0]; + ASSERT(pMsg); + SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; + int64_t timeNow = taosGetTimestampMs(); + int32_t code = 0; + + if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) { + terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr()); + return syncSnapSendRsp(pReceiver, pMsg, terrno); + } + + return syncSnapBufferRecv(pReceiver, ppMsg); +} + static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // condition 2 // end, finish FSM @@ -885,8 +997,10 @@ _SEND_REPLY:; // // condition 5, got data, update ack // -int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { - SyncSnapshotSend *pMsg = pRpcMsg->pCont; +int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { + SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont; + SyncSnapshotSend *pMsg = ppMsg[0]; + ASSERT(pMsg); SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; // if already drop replica, do not process @@ -935,9 +1049,9 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { // force close, no response syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop"); snapshotReceiverStop(pReceiver); - } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { + } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq <= SYNC_SNAPSHOT_SEQ_END) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data"); - code = syncNodeOnSnapshotReceive(pSyncNode, pMsg); + code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg); } else { // error log sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack); @@ -1038,14 +1152,62 @@ static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnap return 0; } +static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) { + int32_t code = 0; + SSyncSnapBuffer *pSndBuf = pSender->pSndBuf; + SyncSnapshotRsp *pMsg = ppMsg[0]; + + taosThreadMutexLock(&pSndBuf->mutex); + + if (pMsg->ack - pSndBuf->start >= pSndBuf->size) { + terrno = TSDB_CODE_SYN_BUFFER_FULL; + code = terrno; + goto _out; + } + + ASSERT(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end); + + if (pMsg->ack > pSndBuf->cursor && pSndBuf->entries[pMsg->ack % pSndBuf->size] == NULL) { + pSndBuf->entries[pMsg->ack % pSndBuf->size] = pMsg; + ppMsg[0] = NULL; + pSndBuf->end = TMAX(pMsg->ack + 1, pSndBuf->end); + } + + for (int64_t ack = pSndBuf->cursor + 1; ack < pSndBuf->end; ++ack) { + if (pSndBuf->entries[ack % pSndBuf->size]) { + pSndBuf->cursor = ack; + } else { + break; + } + } + + for (int64_t ack = pSndBuf->start; ack < pSndBuf->cursor; ++ack) { + rpcFreeCont(pSndBuf->entries[ack % pSndBuf->size]); + pSndBuf->entries[ack % pSndBuf->size] = NULL; + pSndBuf->start = ack + 1; + } + + while (pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) { + if (snapshotSend(pSender) != 0) { + code = terrno; + goto _out; + } + } + +_out: + taosThreadMutexUnlock(&pSndBuf->mutex); + return code; +} + // sender on message // // condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender // condition 2 sender receives ack, set seq = ack + 1, send msg from seq // condition 3 sender receives error msg, just print error log // -int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { - SyncSnapshotRsp *pMsg = pRpcMsg->pCont; +int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { + SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont; + SyncSnapshotRsp *pMsg = ppMsg[0]; // if already drop replica, do not process if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) { @@ -1123,12 +1285,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin"); - if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { - return -1; - } - if (snapshotSend(pSender) != 0) { - return -1; + goto _ERROR; } return 0; } @@ -1142,30 +1300,9 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { } // send next msg - if (pMsg->ack == pSender->seq) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data"); - // update sender ack - if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) { - return -1; - } - if (snapshotSend(pSender) != 0) { - return -1; - } - } else if (pMsg->ack == pSender->seq - 1) { - // maybe resend - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend"); - if (snapshotReSend(pSender) != 0) { - return -1; - } - } else { - // error log - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack"); - sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq); - snapshotSenderStop(pSender, true); - syncNodeReplicateReset(pSyncNode, &pMsg->srcId); - return -1; + if (syncSnapBufferSend(pSender, ppMsg) != 0) { + goto _ERROR; } - return 0; _ERROR: