Merge pull request #14438 from taosdata/feature/3.0_mhli
refactor(sync): add snapshot2 interface
This commit is contained in:
commit
3d03d326a7
|
@ -362,6 +362,11 @@ typedef struct SOffsetAndContLen {
|
||||||
} SOffsetAndContLen;
|
} SOffsetAndContLen;
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
// block1: SOffsetAndContLen
|
||||||
|
// block2: SOffsetAndContLen Array
|
||||||
|
// block3: SRpcMsg Array
|
||||||
|
// block4: SRpcMsg pCont Array
|
||||||
|
|
||||||
typedef struct SyncAppendEntriesBatch {
|
typedef struct SyncAppendEntriesBatch {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
|
|
@ -40,8 +40,8 @@ typedef struct SSyncSnapshotSender {
|
||||||
bool start;
|
bool start;
|
||||||
int32_t seq;
|
int32_t seq;
|
||||||
int32_t ack;
|
int32_t ack;
|
||||||
void * pReader;
|
void *pReader;
|
||||||
void * pCurrentBlock;
|
void *pCurrentBlock;
|
||||||
int32_t blockLen;
|
int32_t blockLen;
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
SSyncCfg lastConfig;
|
SSyncCfg lastConfig;
|
||||||
|
@ -56,20 +56,20 @@ typedef struct SSyncSnapshotSender {
|
||||||
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
|
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
|
||||||
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
|
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
|
||||||
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
|
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
|
||||||
void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader);
|
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader);
|
||||||
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
|
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
|
||||||
int32_t snapshotSend(SSyncSnapshotSender *pSender);
|
int32_t snapshotSend(SSyncSnapshotSender *pSender);
|
||||||
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
|
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
|
||||||
|
|
||||||
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
|
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
|
||||||
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
|
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
|
||||||
char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
|
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
|
||||||
|
|
||||||
//---------------------------------------------------
|
//---------------------------------------------------
|
||||||
typedef struct SSyncSnapshotReceiver {
|
typedef struct SSyncSnapshotReceiver {
|
||||||
bool start;
|
bool start;
|
||||||
int32_t ack;
|
int32_t ack;
|
||||||
void * pWriter;
|
void *pWriter;
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
SyncTerm privateTerm;
|
SyncTerm privateTerm;
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
|
@ -80,13 +80,13 @@ typedef struct SSyncSnapshotReceiver {
|
||||||
|
|
||||||
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
|
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
|
||||||
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
|
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
|
||||||
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg);
|
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg);
|
||||||
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
|
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
|
||||||
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
|
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
|
||||||
|
|
||||||
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
|
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
|
||||||
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
|
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
|
||||||
char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
|
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
|
||||||
|
|
||||||
//---------------------------------------------------
|
//---------------------------------------------------
|
||||||
// on message
|
// on message
|
||||||
|
|
|
@ -628,6 +628,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
static int32_t syncNodeMakeLogSame2(SSyncNode* ths, SyncAppendEntriesBatch* pMsg) { return 0; }
|
||||||
|
|
||||||
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
||||||
|
@ -719,7 +721,282 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg) { return 0; }
|
int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg) {
|
||||||
|
int32_t ret = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// if already drop replica, do not process
|
||||||
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
|
syncNodeEventLog(ths, "recv sync-append-entries-batch, maybe replica already dropped");
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
// maybe update term
|
||||||
|
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
|
syncNodeUpdateTerm(ths, pMsg->term);
|
||||||
|
}
|
||||||
|
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
|
||||||
|
|
||||||
|
// reset elect timer
|
||||||
|
if (pMsg->term == ths->pRaftStore->currentTerm) {
|
||||||
|
ths->leaderCache = pMsg->srcId;
|
||||||
|
syncNodeResetElectTimer(ths);
|
||||||
|
}
|
||||||
|
ASSERT(pMsg->dataLen >= 0);
|
||||||
|
|
||||||
|
// candidate to follower
|
||||||
|
//
|
||||||
|
// operation:
|
||||||
|
// to follower
|
||||||
|
do {
|
||||||
|
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
|
||||||
|
if (condition) {
|
||||||
|
syncNodeEventLog(ths, "recv sync-append-entries-batch, candidate to follower");
|
||||||
|
|
||||||
|
syncNodeBecomeFollower(ths, "from candidate by append entries");
|
||||||
|
// do not reply?
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
// fake match2
|
||||||
|
//
|
||||||
|
// condition1:
|
||||||
|
// preIndex <= my commit index
|
||||||
|
//
|
||||||
|
// operation:
|
||||||
|
// if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry
|
||||||
|
// match my-commit-index or my-commit-index + 1
|
||||||
|
// no operation on log
|
||||||
|
do {
|
||||||
|
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
||||||
|
(pMsg->prevLogIndex <= ths->commitIndex);
|
||||||
|
if (condition) {
|
||||||
|
do {
|
||||||
|
char logBuf[128];
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"recv sync-append-entries-batch, fake match2, pre-index:%ld, pre-term:%lu, datalen:%d",
|
||||||
|
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
||||||
|
syncNodeEventLog(ths, logBuf);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
SyncIndex matchIndex = ths->commitIndex;
|
||||||
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
|
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) {
|
||||||
|
SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE];
|
||||||
|
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
|
||||||
|
int32_t retArrSize = 0;
|
||||||
|
syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, SYNC_MAX_BATCH_SIZE, &retArrSize);
|
||||||
|
|
||||||
|
// make log same
|
||||||
|
do {
|
||||||
|
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
||||||
|
bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex;
|
||||||
|
|
||||||
|
if (hasExtraEntries) {
|
||||||
|
// make log same, rollback deleted entries
|
||||||
|
code = syncNodeMakeLogSame2(ths, pMsg);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
// append entry batch
|
||||||
|
for (int32_t i = 0; i < retArrSize; ++i) {
|
||||||
|
SSyncRaftEntry* pAppendEntry = syncEntryBuild(1234);
|
||||||
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||||
|
if (code != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = syncNodePreCommit(ths, pAppendEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
|
||||||
|
syncEntryDestory(pAppendEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
// fsync once
|
||||||
|
SSyncLogStoreData* pData = ths->pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
walFsync(pWal, true);
|
||||||
|
|
||||||
|
// update match index
|
||||||
|
matchIndex = pMsg->prevLogIndex + retArrSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
// prepare response msg
|
||||||
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
|
pReply->srcId = ths->myRaftId;
|
||||||
|
pReply->destId = pMsg->srcId;
|
||||||
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
|
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
|
||||||
|
pReply->success = true;
|
||||||
|
pReply->matchIndex = matchIndex;
|
||||||
|
|
||||||
|
// send response
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||||
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
|
syncAppendEntriesReplyDestroy(pReply);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
// calculate logOK here, before will coredump, due to fake match
|
||||||
|
// bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);
|
||||||
|
bool logOK = true;
|
||||||
|
|
||||||
|
// not match
|
||||||
|
//
|
||||||
|
// condition1:
|
||||||
|
// term < myTerm
|
||||||
|
//
|
||||||
|
// condition2:
|
||||||
|
// !logOK
|
||||||
|
//
|
||||||
|
// operation:
|
||||||
|
// not match
|
||||||
|
// no operation on log
|
||||||
|
do {
|
||||||
|
bool condition1 = pMsg->term < ths->pRaftStore->currentTerm;
|
||||||
|
bool condition2 =
|
||||||
|
(pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK;
|
||||||
|
bool condition = condition1 || condition2;
|
||||||
|
|
||||||
|
if (condition) {
|
||||||
|
char logBuf[128];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, not match, pre-index:%ld, pre-term:%lu, datalen:%d",
|
||||||
|
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
||||||
|
syncNodeEventLog(ths, logBuf);
|
||||||
|
|
||||||
|
// prepare response msg
|
||||||
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
|
pReply->srcId = ths->myRaftId;
|
||||||
|
pReply->destId = pMsg->srcId;
|
||||||
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
|
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
|
||||||
|
pReply->success = false;
|
||||||
|
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||||
|
|
||||||
|
// send response
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||||
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
|
syncAppendEntriesReplyDestroy(pReply);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
// really match
|
||||||
|
//
|
||||||
|
// condition:
|
||||||
|
// logOK
|
||||||
|
//
|
||||||
|
// operation:
|
||||||
|
// match
|
||||||
|
// make log same
|
||||||
|
do {
|
||||||
|
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && logOK;
|
||||||
|
if (condition) {
|
||||||
|
// has extra entries (> preIndex) in local log
|
||||||
|
SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
|
||||||
|
bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex;
|
||||||
|
|
||||||
|
// has entries in SyncAppendEntries msg
|
||||||
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
|
|
||||||
|
char logBuf[128];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, match, pre-index:%ld, pre-term:%lu, datalen:%d",
|
||||||
|
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
||||||
|
syncNodeEventLog(ths, logBuf);
|
||||||
|
|
||||||
|
if (hasExtraEntries) {
|
||||||
|
// make log same, rollback deleted entries
|
||||||
|
// code = syncNodeMakeLogSame(ths, pMsg);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t retArrSize = 0;
|
||||||
|
if (hasAppendEntries) {
|
||||||
|
SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE];
|
||||||
|
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
|
||||||
|
syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, SYNC_MAX_BATCH_SIZE, &retArrSize);
|
||||||
|
|
||||||
|
// append entry batch
|
||||||
|
for (int32_t i = 0; i < retArrSize; ++i) {
|
||||||
|
SSyncRaftEntry* pAppendEntry = syncEntryBuild(1234);
|
||||||
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||||
|
if (code != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = syncNodePreCommit(ths, pAppendEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
|
||||||
|
syncEntryDestory(pAppendEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
// fsync once
|
||||||
|
SSyncLogStoreData* pData = ths->pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
walFsync(pWal, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
// prepare response msg
|
||||||
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
|
pReply->srcId = ths->myRaftId;
|
||||||
|
pReply->destId = pMsg->srcId;
|
||||||
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
|
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
|
||||||
|
pReply->success = true;
|
||||||
|
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + retArrSize : pMsg->prevLogIndex;
|
||||||
|
|
||||||
|
// send response
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||||
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
|
syncAppendEntriesReplyDestroy(pReply);
|
||||||
|
|
||||||
|
// maybe update commit index, leader notice me
|
||||||
|
if (pMsg->commitIndex > ths->commitIndex) {
|
||||||
|
// has commit entry in local
|
||||||
|
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
|
||||||
|
// advance commit index to sanpshot first
|
||||||
|
SSnapshot snapshot;
|
||||||
|
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
||||||
|
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
|
||||||
|
SyncIndex commitBegin = ths->commitIndex;
|
||||||
|
SyncIndex commitEnd = snapshot.lastApplyIndex;
|
||||||
|
ths->commitIndex = snapshot.lastApplyIndex;
|
||||||
|
|
||||||
|
char eventLog[128];
|
||||||
|
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%ld to index:%ld", commitBegin,
|
||||||
|
commitEnd);
|
||||||
|
syncNodeEventLog(ths, eventLog);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncIndex beginIndex = ths->commitIndex + 1;
|
||||||
|
SyncIndex endIndex = pMsg->commitIndex;
|
||||||
|
|
||||||
|
// update commit index
|
||||||
|
ths->commitIndex = pMsg->commitIndex;
|
||||||
|
|
||||||
|
// call back Wal
|
||||||
|
code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
|
||||||
|
code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
|
@ -80,7 +80,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
|
||||||
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
|
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
|
||||||
|
|
||||||
// begin send snapshot by snapshot, pReader
|
// begin send snapshot by snapshot, pReader
|
||||||
void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader) {
|
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader) {
|
||||||
ASSERT(!snapshotSenderIsStart(pSender));
|
ASSERT(!snapshotSenderIsStart(pSender));
|
||||||
|
|
||||||
// init snapshot and reader
|
// init snapshot and reader
|
||||||
|
@ -181,9 +181,11 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
|
||||||
syncNodeEventLog(pSender->pSyncNode, eventLog);
|
syncNodeEventLog(pSender->pSyncNode, eventLog);
|
||||||
taosMemoryFree(eventLog);
|
taosMemoryFree(eventLog);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
||||||
// close reader
|
// close reader
|
||||||
if (pSender->pReader != NULL) {
|
if (pSender->pReader != NULL) {
|
||||||
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
|
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
|
||||||
|
@ -208,6 +210,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
||||||
syncNodeEventLog(pSender->pSyncNode, eventLog);
|
syncNodeEventLog(pSender->pSyncNode, eventLog);
|
||||||
taosMemoryFree(eventLog);
|
taosMemoryFree(eventLog);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// when sender receive ack, call this function to send msg from seq
|
// when sender receive ack, call this function to send msg from seq
|
||||||
|
@ -349,14 +353,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
|
||||||
|
|
||||||
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
|
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
|
||||||
cJSON *pJson = snapshotSender2Json(pSender);
|
cJSON *pJson = snapshotSender2Json(pSender);
|
||||||
char * serialized = cJSON_Print(pJson);
|
char *serialized = cJSON_Print(pJson);
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
|
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
|
||||||
int32_t len = 256;
|
int32_t len = 256;
|
||||||
char * s = taosMemoryMalloc(len);
|
char *s = taosMemoryMalloc(len);
|
||||||
|
|
||||||
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||||
char host[64];
|
char host[64];
|
||||||
|
@ -471,7 +475,7 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
|
||||||
|
|
||||||
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
|
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
|
||||||
// if already start, force close, start again
|
// if already start, force close, start again
|
||||||
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) {
|
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) {
|
||||||
if (!snapshotReceiverIsStart(pReceiver)) {
|
if (!snapshotReceiverIsStart(pReceiver)) {
|
||||||
// first start
|
// first start
|
||||||
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
||||||
|
@ -486,9 +490,11 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTer
|
||||||
// start again
|
// start again
|
||||||
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
||||||
if (pReceiver->pWriter != NULL) {
|
if (pReceiver->pWriter != NULL) {
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
|
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
|
||||||
|
@ -506,6 +512,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
||||||
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
|
syncNodeEventLog(pReceiver->pSyncNode, eventLog);
|
||||||
taosMemoryFree(eventLog);
|
taosMemoryFree(eventLog);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
|
||||||
|
@ -604,7 +612,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
||||||
cJSON_AddStringToObject(pFromId, "addr", u64buf);
|
cJSON_AddStringToObject(pFromId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pReceiver->fromId.addr;
|
uint64_t u64 = pReceiver->fromId.addr;
|
||||||
cJSON * pTmp = pFromId;
|
cJSON *pTmp = pFromId;
|
||||||
char host[128] = {0};
|
char host[128] = {0};
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
@ -637,14 +645,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
||||||
|
|
||||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
||||||
cJSON *pJson = snapshotReceiver2Json(pReceiver);
|
cJSON *pJson = snapshotReceiver2Json(pReceiver);
|
||||||
char * serialized = cJSON_Print(pJson);
|
char *serialized = cJSON_Print(pJson);
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
|
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
|
||||||
int32_t len = 256;
|
int32_t len = 256;
|
||||||
char * s = taosMemoryMalloc(len);
|
char *s = taosMemoryMalloc(len);
|
||||||
|
|
||||||
SRaftId fromId = pReceiver->fromId;
|
SRaftId fromId = pReceiver->fromId;
|
||||||
char host[128];
|
char host[128];
|
||||||
|
|
Loading…
Reference in New Issue