feat: support pipelining of snap replication
This commit is contained in:
parent
9f9fae3b99
commit
e34da43e38
|
@ -294,6 +294,8 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
|
||||
#define TSDB_SYNC_NEGOTIATION_WIN 512
|
||||
|
||||
#define TSDB_SYNC_SNAP_BUFFER_SIZE 2048
|
||||
|
||||
#define TSDB_TBNAME_COLUMN_INDEX (-1)
|
||||
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
|
||||
|
||||
|
|
|
@ -249,8 +249,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
|
|||
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnSnapshot(SSyncNode* ths, SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
|
|
|
@ -30,6 +30,15 @@ extern "C" {
|
|||
|
||||
#define SYNC_SNAPSHOT_RETRY_MS 5000
|
||||
|
||||
typedef struct SSyncSnapBuffer {
|
||||
void *entries[TSDB_SYNC_SNAP_BUFFER_SIZE];
|
||||
int64_t start;
|
||||
int64_t cursor;
|
||||
int64_t end;
|
||||
int64_t size;
|
||||
TdThreadMutex mutex;
|
||||
} SSyncSnapBuffer;
|
||||
|
||||
typedef struct SSyncSnapshotSender {
|
||||
int8_t start;
|
||||
int32_t seq;
|
||||
|
@ -47,6 +56,9 @@ typedef struct SSyncSnapshotSender {
|
|||
int64_t lastSendTime;
|
||||
bool finish;
|
||||
|
||||
// buffer
|
||||
SSyncSnapBuffer *pSndBuf;
|
||||
|
||||
// init when create
|
||||
SSyncNode *pSyncNode;
|
||||
int32_t replicaIndex;
|
||||
|
@ -72,6 +84,9 @@ typedef struct SSyncSnapshotReceiver {
|
|||
SSnapshotParam snapshotParam;
|
||||
SSnapshot snapshot;
|
||||
|
||||
// buffer
|
||||
SSyncSnapBuffer *pRcvBuf;
|
||||
|
||||
// init when create
|
||||
SSyncNode *pSyncNode;
|
||||
} SSyncSnapshotReceiver;
|
||||
|
@ -83,8 +98,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
|
|||
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
|
||||
|
||||
// on message
|
||||
int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);
|
||||
int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg);
|
||||
// int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);
|
||||
// int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg);
|
||||
|
||||
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex snapshotLastApplyIndex);
|
||||
|
||||
|
|
|
@ -23,6 +23,42 @@
|
|||
#include "syncReplication.h"
|
||||
#include "syncUtil.h"
|
||||
|
||||
// Release every message held in [start, end) and return the buffer to its
// initial state (start = 1, end = 1, cursor = 0). Runs under the buffer mutex.
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
  taosThreadMutexLock(&pBuf->mutex);

  int64_t seq = pBuf->start;
  while (seq < pBuf->end) {
    void **ppSlot = &pBuf->entries[seq % pBuf->size];
    rpcFreeCont(*ppSlot);
    *ppSlot = NULL;
    seq++;
  }

  pBuf->start = 1;
  pBuf->end = 1;
  pBuf->cursor = 0;

  taosThreadMutexUnlock(&pBuf->mutex);
}
|
||||
|
||||
// Free a snap buffer together with any messages it still holds, then clear the
// caller's pointer. A NULL `ppBuf` or a NULL `*ppBuf` is a no-op.
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
  SSyncSnapBuffer *pTarget = (ppBuf != NULL) ? *ppBuf : NULL;
  if (pTarget == NULL) {
    return;
  }

  // drop buffered messages first; reset takes and releases the mutex itself
  syncSnapBufferReset(pTarget);

  taosThreadMutexDestroy(&pTarget->mutex);
  taosMemoryFree(pTarget);
  *ppBuf = NULL;
}
|
||||
|
||||
static SSyncSnapBuffer *syncSnapBufferCreate() {
|
||||
SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer));
|
||||
if (pBuf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
|
||||
ASSERT(pBuf->size == TSDB_SYNC_SNAP_BUFFER_SIZE);
|
||||
taosThreadMutexInit(&pBuf->mutex, NULL);
|
||||
return pBuf;
|
||||
}
|
||||
|
||||
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
|
||||
bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
|
||||
(pSyncNode->pFsm->FpSnapshotDoRead != NULL);
|
||||
|
@ -49,6 +85,14 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
|
|||
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
|
||||
pSender->finish = false;
|
||||
|
||||
pSender->pSndBuf = syncSnapBufferCreate();
|
||||
if (pSender->pSndBuf == NULL) {
|
||||
taosMemoryFree(pSender);
|
||||
pSender = NULL;
|
||||
return NULL;
|
||||
}
|
||||
syncSnapBufferReset(pSender->pSndBuf);
|
||||
|
||||
return pSender;
|
||||
}
|
||||
|
||||
|
@ -67,6 +111,10 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
|
|||
pSender->pReader = NULL;
|
||||
}
|
||||
|
||||
// free snap buffer
|
||||
if (pSender->pSndBuf) {
|
||||
syncSnapBufferDestroy(&pSender->pSndBuf);
|
||||
}
|
||||
// free sender
|
||||
taosMemoryFree(pSender);
|
||||
}
|
||||
|
@ -181,6 +229,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
|||
pSender->pCurrentBlock = NULL;
|
||||
pSender->blockLen = 0;
|
||||
}
|
||||
|
||||
syncSnapBufferReset(pSender->pSndBuf);
|
||||
}
|
||||
|
||||
// when sender receive ack, call this function to send msg from seq
|
||||
|
@ -193,6 +243,8 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
|||
pSender->blockLen = 0;
|
||||
}
|
||||
|
||||
pSender->seq++;
|
||||
|
||||
// read data
|
||||
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
|
||||
&pSender->pCurrentBlock, &pSender->blockLen);
|
||||
|
@ -362,6 +414,14 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
|
|||
pReceiver->snapshot.lastApplyTerm = 0;
|
||||
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
pReceiver->pRcvBuf = syncSnapBufferCreate();
|
||||
if (pReceiver->pRcvBuf == NULL) {
|
||||
taosMemoryFree(pReceiver);
|
||||
pReceiver = NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
syncSnapBufferReset(pReceiver->pRcvBuf);
|
||||
return pReceiver;
|
||||
}
|
||||
|
||||
|
@ -389,6 +449,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
|
|||
pReceiver->snapshot.data = NULL;
|
||||
}
|
||||
|
||||
// free snap buf
|
||||
if (pReceiver->pRcvBuf) {
|
||||
syncSnapBufferDestroy(&pReceiver->pRcvBuf);
|
||||
}
|
||||
|
||||
// free receiver
|
||||
taosMemoryFree(pReceiver);
|
||||
}
|
||||
|
@ -472,6 +537,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
|||
} else {
|
||||
sRInfo(pReceiver, "snapshot receiver stop, writer is null");
|
||||
}
|
||||
|
||||
syncSnapBufferReset(pReceiver->pRcvBuf);
|
||||
}
|
||||
|
||||
// when recv last snapshot block, apply data into snapshot
|
||||
|
@ -765,29 +832,8 @@ _SEND_REPLY:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||
// condition 4
|
||||
// transfering
|
||||
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
|
||||
int64_t timeNow = taosGetTimestampMs();
|
||||
int32_t code = 0;
|
||||
|
||||
if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
|
||||
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
|
||||
sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr());
|
||||
code = terrno;
|
||||
goto _SEND_REPLY;
|
||||
}
|
||||
|
||||
if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
|
||||
code = terrno;
|
||||
if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
|
||||
code = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
_SEND_REPLY:;
|
||||
|
||||
static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, int32_t code) {
|
||||
SSyncNode *pSyncNode = pReceiver->pSyncNode;
|
||||
// build msg
|
||||
SRpcMsg rpcMsg = {0};
|
||||
if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) {
|
||||
|
@ -811,10 +857,76 @@ _SEND_REPLY:;
|
|||
sRError(pReceiver, "failed to send snapshot receiver resp since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Buffer one incoming snapshot data message on the receiver and consume every
// message that has become contiguous.
//
// pReceiver: receiver whose pRcvBuf holds out-of-order messages.
// ppMsg:     in/out. When the message is stored in the buffer, ppMsg[0] is set
//            to NULL and the buffer releases it later with rpcFreeCont().
//            Otherwise ppMsg[0] is left untouched (presumably the caller still
//            owns it -- confirm against the caller).
//
// Returns 0 on success, TSDB_CODE_SYN_BUFFER_FULL when the sequence number lies
// a full buffer or more ahead of `start`, or the error produced while applying
// a buffered message.
static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) {
  int32_t           code = 0;
  SSyncSnapBuffer  *pRcvBuf = pReceiver->pRcvBuf;
  SyncSnapshotSend *pMsg = ppMsg[0];
  terrno = TSDB_CODE_SUCCESS;

  taosThreadMutexLock(&pRcvBuf->mutex);

  // reject messages that would wrap onto a slot still in use
  if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) {
    terrno = TSDB_CODE_SYN_BUFFER_FULL;
    code = terrno;
    goto _out;
  }

  ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end);

  // Store only messages that are new and whose slot is empty. A second copy of
  // a message that is already buffered is not stored, so the buffered one is
  // not overwritten and leaked (same guard as the sender-side buffer).
  if (pMsg->seq > pRcvBuf->cursor && pRcvBuf->entries[pMsg->seq % pRcvBuf->size] == NULL) {
    pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
    ppMsg[0] = NULL;
    pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
  }

  // advance the cursor over the contiguous run of buffered messages;
  // slots are addressed modulo the buffer size, like every other access
  for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
    if (pRcvBuf->entries[seq % pRcvBuf->size]) {
      pRcvBuf->cursor = seq;
    } else {
      break;
    }
  }

  // apply, acknowledge and release everything up to the cursor, in order
  for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) {
    if (snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size]) != 0) {
      code = terrno;
      if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
        code = TSDB_CODE_SYN_INTERNAL_ERROR;
      }
    }
    pRcvBuf->start = seq + 1;
    syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], code);
    rpcFreeCont(pRcvBuf->entries[seq % pRcvBuf->size]);
    pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
    // stop at the first failure; later messages stay buffered
    if (code) goto _out;
  }

_out:
  taosThreadMutexUnlock(&pRcvBuf->mutex);
  return code;
}
|
||||
|
||||
// Handle a snapshot data message (condition 4: transferring).
//
// pSyncNode: node whose pNewNodeReceiver processes the message.
// ppMsg:     in/out; ppMsg[0] must be non-NULL on entry and is passed on to
//            syncSnapBufferRecv().
//
// When the message signature does not match the receiver, a reply carrying
// TSDB_CODE_SYN_MISMATCHED_SIGNATURE is sent and the result of sending it is
// returned. Otherwise the result of syncSnapBufferRecv() is returned.
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) {
  SyncSnapshotSend *pMsg = ppMsg[0];
  ASSERT(pMsg);
  SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;

  if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
    // keep the code in a local so the reply does not depend on terrno
    // still holding it after the logging call
    int32_t code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
    terrno = code;
    sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr());
    return syncSnapSendRsp(pReceiver, pMsg, code);
  }

  return syncSnapBufferRecv(pReceiver, ppMsg);
}
|
||||
|
||||
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||
// condition 2
|
||||
// end, finish FSM
|
||||
|
@ -885,8 +997,10 @@ _SEND_REPLY:;
|
|||
//
|
||||
// condition 5, got data, update ack
|
||||
//
|
||||
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
||||
SyncSnapshotSend *pMsg = pRpcMsg->pCont;
|
||||
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
||||
SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
|
||||
SyncSnapshotSend *pMsg = ppMsg[0];
|
||||
ASSERT(pMsg);
|
||||
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
|
||||
|
||||
// if already drop replica, do not process
|
||||
|
@ -935,9 +1049,9 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
// force close, no response
|
||||
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
|
||||
snapshotReceiverStop(pReceiver);
|
||||
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
|
||||
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq <= SYNC_SNAPSHOT_SEQ_END) {
|
||||
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
|
||||
code = syncNodeOnSnapshotReceive(pSyncNode, pMsg);
|
||||
code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
|
||||
} else {
|
||||
// error log
|
||||
sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
|
||||
|
@ -1038,14 +1152,62 @@ static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnap
|
|||
return 0;
|
||||
}
|
||||
|
||||
// Record one snapshot response (ack) in the sender's buffer, release the
// responses that precede the contiguous cursor, and keep calling
// snapshotSend() until pSender->seq is a quarter of the buffer size ahead of
// `start`.
//
// ppMsg: in/out. When the response is stored in the buffer, *ppMsg is set to
//        NULL; otherwise it is left untouched.
//
// Returns 0 on success, TSDB_CODE_SYN_BUFFER_FULL when the ack lies a full
// buffer or more ahead of `start`, or terrno when snapshotSend() fails.
static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) {
  SSyncSnapBuffer *pBuf = pSender->pSndBuf;
  SyncSnapshotRsp *pRsp = *ppMsg;
  int32_t          ret = 0;

  taosThreadMutexLock(&pBuf->mutex);

  if (pRsp->ack - pBuf->start >= pBuf->size) {
    terrno = TSDB_CODE_SYN_BUFFER_FULL;
    ret = terrno;
    goto _out;
  }

  ASSERT(pBuf->start <= pBuf->cursor + 1 && pBuf->cursor < pBuf->end);

  // keep the response only if it is new and its slot is still empty
  if (pRsp->ack > pBuf->cursor) {
    if (pBuf->entries[pRsp->ack % pBuf->size] == NULL) {
      pBuf->entries[pRsp->ack % pBuf->size] = pRsp;
      *ppMsg = NULL;
      if (pRsp->ack + 1 > pBuf->end) {
        pBuf->end = pRsp->ack + 1;
      }
    }
  }

  // move the cursor forward across the contiguous run of stored responses
  while (pBuf->cursor + 1 < pBuf->end && pBuf->entries[(pBuf->cursor + 1) % pBuf->size] != NULL) {
    pBuf->cursor++;
  }

  // release everything strictly below the cursor; the cursor's own slot is kept
  while (pBuf->start < pBuf->cursor) {
    int64_t idx = pBuf->start % pBuf->size;
    rpcFreeCont(pBuf->entries[idx]);
    pBuf->entries[idx] = NULL;
    pBuf->start++;
  }

  // send more blocks while the sender is less than size / 4 ahead of `start`
  while (pSender->seq - pBuf->start < (pBuf->size >> 2)) {
    if (snapshotSend(pSender) != 0) {
      ret = terrno;
      break;
    }
  }

_out:
  taosThreadMutexUnlock(&pBuf->mutex);
  return ret;
}
|
||||
|
||||
// sender on message
|
||||
//
|
||||
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
|
||||
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
|
||||
// condition 3 sender receives error msg, just print error log
|
||||
//
|
||||
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
||||
SyncSnapshotRsp *pMsg = pRpcMsg->pCont;
|
||||
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
||||
SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
|
||||
SyncSnapshotRsp *pMsg = ppMsg[0];
|
||||
|
||||
// if already drop replica, do not process
|
||||
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
|
||||
|
@ -1123,12 +1285,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
|
||||
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
|
||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
|
||||
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (snapshotSend(pSender) != 0) {
|
||||
return -1;
|
||||
goto _ERROR;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -1142,30 +1300,9 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
}
|
||||
|
||||
// send next msg
|
||||
if (pMsg->ack == pSender->seq) {
|
||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data");
|
||||
// update sender ack
|
||||
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
|
||||
return -1;
|
||||
}
|
||||
if (snapshotSend(pSender) != 0) {
|
||||
return -1;
|
||||
}
|
||||
} else if (pMsg->ack == pSender->seq - 1) {
|
||||
// maybe resend
|
||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
|
||||
if (snapshotReSend(pSender) != 0) {
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
// error log
|
||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
|
||||
sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
|
||||
snapshotSenderStop(pSender, true);
|
||||
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
|
||||
return -1;
|
||||
if (syncSnapBufferSend(pSender, ppMsg) != 0) {
|
||||
goto _ERROR;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
_ERROR:
|
||||
|
|
Loading…
Reference in New Issue