Merge pull request #14676 from taosdata/feature/3.0_mhli
fix(sync): append entries batch
This commit is contained in:
commit
d94680c3c4
|
@ -163,9 +163,6 @@ typedef struct SSyncLogStore {
|
|||
// return commit index of log
|
||||
SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);
|
||||
|
||||
// refactor, log[0 .. n] ==> log[m .. n]
|
||||
// int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
|
||||
|
||||
SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore);
|
||||
SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore);
|
||||
bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
|
||||
|
|
|
@ -1,72 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_LIBS_SYNC_ON_MESSAGE_H
|
||||
#define _TD_LIBS_SYNC_ON_MESSAGE_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include "taosdef.h"
|
||||
|
||||
// TLA+ Spec
|
||||
// Receive(m) ==
|
||||
// LET i == m.mdest
|
||||
// j == m.msource
|
||||
// IN \* Any RPC with a newer term causes the recipient to advance
|
||||
// \* its term first. Responses with stale terms are ignored.
|
||||
// \/ UpdateTerm(i, j, m)
|
||||
// \/ /\ m.mtype = RequestVoteRequest
|
||||
// /\ HandleRequestVoteRequest(i, j, m)
|
||||
// \/ /\ m.mtype = RequestVoteResponse
|
||||
// /\ \/ DropStaleResponse(i, j, m)
|
||||
// \/ HandleRequestVoteResponse(i, j, m)
|
||||
// \/ /\ m.mtype = AppendEntriesRequest
|
||||
// /\ HandleAppendEntriesRequest(i, j, m)
|
||||
// \/ /\ m.mtype = AppendEntriesResponse
|
||||
// /\ \/ DropStaleResponse(i, j, m)
|
||||
// \/ HandleAppendEntriesResponse(i, j, m)
|
||||
|
||||
// DuplicateMessage(m) ==
|
||||
// /\ Send(m)
|
||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||
|
||||
// DropMessage(m) ==
|
||||
// /\ Discard(m)
|
||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||
|
||||
// Next == /\ \/ \E i \in Server : Restart(i)
|
||||
// \/ \E i \in Server : Timeout(i)
|
||||
// \/ \E i,j \in Server : RequestVote(i, j)
|
||||
// \/ \E i \in Server : BecomeLeader(i)
|
||||
// \/ \E i \in Server, v \in Value : ClientRequest(i, v)
|
||||
// \/ \E i \in Server : AdvanceCommitIndex(i)
|
||||
// \/ \E i,j \in Server : AppendEntries(i, j)
|
||||
// \/ \E m \in DOMAIN messages : Receive(m)
|
||||
// \/ \E m \in DOMAIN messages : DuplicateMessage(m)
|
||||
// \/ \E m \in DOMAIN messages : DropMessage(m)
|
||||
// \* History variable that tracks every log ever:
|
||||
// /\ allLogs' = allLogs \cup {log[i] : i \in Server}
|
||||
//
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_LIBS_SYNC_ON_MESSAGE_H*/
|
|
@ -162,6 +162,17 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
pReply->success = false;
|
||||
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// msg event log
|
||||
do {
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
||||
sDebug(
|
||||
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
|
||||
"match-index:%ld}",
|
||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
||||
} while (0);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||
|
@ -334,270 +345,16 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
pReply->matchIndex = pMsg->prevLogIndex;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||
syncAppendEntriesReplyDestroy(pReply);
|
||||
|
||||
// maybe update commit index from leader
|
||||
if (pMsg->commitIndex > ths->commitIndex) {
|
||||
// has commit entry in local
|
||||
if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||
SyncIndex beginIndex = ths->commitIndex + 1;
|
||||
SyncIndex endIndex = pMsg->commitIndex;
|
||||
|
||||
// update commit index
|
||||
ths->commitIndex = pMsg->commitIndex;
|
||||
|
||||
// call back Wal
|
||||
ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
|
||||
|
||||
int32_t code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||
int32_t ret = 0;
|
||||
|
||||
char logBuf[128] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm);
|
||||
syncAppendEntriesLog2(logBuf, pMsg);
|
||||
|
||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||
syncNodeUpdateTerm(ths, pMsg->term);
|
||||
}
|
||||
ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
|
||||
|
||||
// reset elect timer
|
||||
if (pMsg->term == ths->pRaftStore->currentTerm) {
|
||||
ths->leaderCache = pMsg->srcId;
|
||||
syncNodeResetElectTimer(ths);
|
||||
}
|
||||
ASSERT(pMsg->dataLen >= 0);
|
||||
|
||||
SyncTerm localPreLogTerm = 0;
|
||||
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex);
|
||||
if (pEntry == NULL) {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "getEntry error, index:%ld, since %s", pMsg->prevLogIndex, terrstr());
|
||||
syncNodeErrorLog(ths, logBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
localPreLogTerm = pEntry->term;
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
||||
bool logOK =
|
||||
(pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
|
||||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
|
||||
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));
|
||||
|
||||
// reject request
|
||||
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
|
||||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
|
||||
sTrace(
|
||||
"syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
|
||||
"logOK:%d",
|
||||
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
|
||||
|
||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||
pReply->srcId = ths->myRaftId;
|
||||
pReply->destId = pMsg->srcId;
|
||||
pReply->term = ths->pRaftStore->currentTerm;
|
||||
pReply->success = false;
|
||||
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||
syncAppendEntriesReplyDestroy(pReply);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
// return to follower state
|
||||
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
|
||||
sTrace(
|
||||
"syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
|
||||
"ths->state:%d, logOK:%d",
|
||||
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
|
||||
|
||||
syncNodeBecomeFollower(ths, "from candidate by append entries");
|
||||
|
||||
// ret or reply?
|
||||
return ret;
|
||||
}
|
||||
|
||||
// accept request
|
||||
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
|
||||
// preIndex = -1, or has preIndex entry in local log
|
||||
ASSERT(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
|
||||
|
||||
// has extra entries (> preIndex) in local log
|
||||
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
|
||||
|
||||
// has entries in SyncAppendEntries msg
|
||||
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||
|
||||
sTrace(
|
||||
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
|
||||
"logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
|
||||
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
|
||||
|
||||
if (hasExtraEntries && hasAppendEntries) {
|
||||
// not conflict by default
|
||||
bool conflict = false;
|
||||
|
||||
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
|
||||
SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
|
||||
if (pExtraEntry == NULL) {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "getEntry error2, index:%ld, since %s", extraIndex, terrstr());
|
||||
syncNodeErrorLog(ths, logBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
|
||||
if (pAppendEntry == NULL) {
|
||||
syncNodeErrorLog(ths, "syncEntryDeserialize pAppendEntry error");
|
||||
return -1;
|
||||
}
|
||||
|
||||
// log not match, conflict
|
||||
ASSERT(extraIndex == pAppendEntry->index);
|
||||
if (pExtraEntry->term != pAppendEntry->term) {
|
||||
conflict = true;
|
||||
}
|
||||
|
||||
if (conflict) {
|
||||
// roll back
|
||||
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
|
||||
SyncIndex delEnd = extraIndex;
|
||||
|
||||
sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
|
||||
|
||||
// notice! reverse roll back!
|
||||
for (SyncIndex index = delEnd; index >= delBegin; --index) {
|
||||
if (ths->pFsm->FpRollBackCb != NULL) {
|
||||
SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
|
||||
if (pRollBackEntry == NULL) {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "getEntry error3, index:%ld, since %s", index, terrstr());
|
||||
syncNodeErrorLog(ths, logBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) {
|
||||
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pRollBackEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||
cbMeta.isWeak = pRollBackEntry->isWeak;
|
||||
cbMeta.code = 0;
|
||||
cbMeta.state = ths->state;
|
||||
cbMeta.seqNum = pRollBackEntry->seqNum;
|
||||
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
}
|
||||
|
||||
syncEntryDestory(pRollBackEntry);
|
||||
}
|
||||
}
|
||||
|
||||
// delete confict entries
|
||||
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
|
||||
|
||||
// append new entries
|
||||
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
|
||||
|
||||
// pre commit
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||
if (ths->pFsm != NULL) {
|
||||
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pAppendEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||
cbMeta.isWeak = pAppendEntry->isWeak;
|
||||
cbMeta.code = 2;
|
||||
cbMeta.state = ths->state;
|
||||
cbMeta.seqNum = pAppendEntry->seqNum;
|
||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||
}
|
||||
}
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
}
|
||||
|
||||
// free memory
|
||||
syncEntryDestory(pExtraEntry);
|
||||
syncEntryDestory(pAppendEntry);
|
||||
|
||||
} else if (hasExtraEntries && !hasAppendEntries) {
|
||||
// do nothing
|
||||
|
||||
} else if (!hasExtraEntries && hasAppendEntries) {
|
||||
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
|
||||
if (pAppendEntry == NULL) {
|
||||
syncNodeErrorLog(ths, "syncEntryDeserialize pAppendEntry2 error");
|
||||
return -1;
|
||||
}
|
||||
|
||||
// append new entries
|
||||
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
|
||||
|
||||
// pre commit
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||
if (ths->pFsm != NULL) {
|
||||
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pAppendEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||
cbMeta.isWeak = pAppendEntry->isWeak;
|
||||
cbMeta.code = 3;
|
||||
cbMeta.state = ths->state;
|
||||
cbMeta.seqNum = pAppendEntry->seqNum;
|
||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||
}
|
||||
}
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
|
||||
// free memory
|
||||
syncEntryDestory(pAppendEntry);
|
||||
|
||||
} else if (!hasExtraEntries && !hasAppendEntries) {
|
||||
// do nothing
|
||||
|
||||
} else {
|
||||
syncNodeLog3("", ths);
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||
pReply->srcId = ths->myRaftId;
|
||||
pReply->destId = pMsg->srcId;
|
||||
pReply->term = ths->pRaftStore->currentTerm;
|
||||
pReply->success = true;
|
||||
|
||||
if (hasAppendEntries) {
|
||||
pReply->matchIndex = pMsg->prevLogIndex + 1;
|
||||
} else {
|
||||
pReply->matchIndex = pMsg->prevLogIndex;
|
||||
}
|
||||
// msg event log
|
||||
do {
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
||||
sDebug(
|
||||
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
|
||||
"match-index:%ld}",
|
||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
||||
} while (0);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
|
@ -626,8 +383,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||
int32_t code;
|
||||
|
||||
|
@ -897,6 +652,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
|||
pReply->success = true;
|
||||
pReply->matchIndex = matchIndex;
|
||||
|
||||
// msg event log
|
||||
do {
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
||||
sDebug(
|
||||
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
|
||||
"match-index:%ld}",
|
||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
||||
} while (0);
|
||||
|
||||
// send response
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
|
@ -945,6 +711,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
|||
pReply->success = false;
|
||||
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// msg event log
|
||||
do {
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
||||
sDebug(
|
||||
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
|
||||
"match-index:%ld}",
|
||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
||||
} while (0);
|
||||
|
||||
// send response
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
|
@ -977,7 +754,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
|||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"recv sync-append-entries, match, {pre-index:%ld, pre-term:%lu, datalen:%d, datacount:%d}",
|
||||
"recv sync-append-entries-batch, match, {pre-index:%ld, pre-term:%lu, datalen:%d, datacount:%d}",
|
||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
} while (0);
|
||||
|
@ -1018,6 +795,17 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
|||
pReply->success = true;
|
||||
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex;
|
||||
|
||||
// msg event log
|
||||
do {
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
||||
sDebug(
|
||||
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
|
||||
"match-index:%ld}",
|
||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
||||
} while (0);
|
||||
|
||||
// send response
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
|
@ -1227,6 +1015,17 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
|||
pReply->success = true;
|
||||
pReply->matchIndex = matchIndex;
|
||||
|
||||
// msg event log
|
||||
do {
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
||||
sDebug(
|
||||
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
|
||||
"match-index:%ld}",
|
||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
||||
} while (0);
|
||||
|
||||
// send response
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
|
@ -1272,6 +1071,17 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
|||
pReply->success = false;
|
||||
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
// msg event log
|
||||
do {
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
||||
sDebug(
|
||||
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
|
||||
"match-index:%ld}",
|
||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
||||
} while (0);
|
||||
|
||||
// send response
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
|
@ -1337,6 +1147,17 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
|||
pReply->success = true;
|
||||
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex;
|
||||
|
||||
// msg event log
|
||||
do {
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
|
||||
sDebug(
|
||||
"vgId:%d, send sync-append-entries-reply to %s:%d, {term:%lu, pterm:%lu, success:%d, "
|
||||
"match-index:%ld}",
|
||||
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
|
||||
} while (0);
|
||||
|
||||
// send response
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||
|
|
|
@ -1311,8 +1311,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
|
|||
pMsg->info.noResp = 1;
|
||||
pSyncNode->FpSendMsg(&epSet, pMsg);
|
||||
} else {
|
||||
sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
|
||||
sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1326,7 +1328,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
|
|||
pMsg->info.noResp = 1;
|
||||
pSyncNode->FpSendMsg(&epSet, pMsg);
|
||||
} else {
|
||||
sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
|
||||
sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -1,56 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "syncOnMessage.h"
|
||||
|
||||
// TLA+ Spec
|
||||
// Receive(m) ==
|
||||
// LET i == m.mdest
|
||||
// j == m.msource
|
||||
// IN \* Any RPC with a newer term causes the recipient to advance
|
||||
// \* its term first. Responses with stale terms are ignored.
|
||||
// \/ UpdateTerm(i, j, m)
|
||||
// \/ /\ m.mtype = RequestVoteRequest
|
||||
// /\ HandleRequestVoteRequest(i, j, m)
|
||||
// \/ /\ m.mtype = RequestVoteResponse
|
||||
// /\ \/ DropStaleResponse(i, j, m)
|
||||
// \/ HandleRequestVoteResponse(i, j, m)
|
||||
// \/ /\ m.mtype = AppendEntriesRequest
|
||||
// /\ HandleAppendEntriesRequest(i, j, m)
|
||||
// \/ /\ m.mtype = AppendEntriesResponse
|
||||
// /\ \/ DropStaleResponse(i, j, m)
|
||||
// \/ HandleAppendEntriesResponse(i, j, m)
|
||||
|
||||
// DuplicateMessage(m) ==
|
||||
// /\ Send(m)
|
||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||
|
||||
// DropMessage(m) ==
|
||||
// /\ Discard(m)
|
||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||
|
||||
// Next == /\ \/ \E i \in Server : Restart(i)
|
||||
// \/ \E i \in Server : Timeout(i)
|
||||
// \/ \E i,j \in Server : RequestVote(i, j)
|
||||
// \/ \E i \in Server : BecomeLeader(i)
|
||||
// \/ \E i \in Server, v \in Value : ClientRequest(i, v)
|
||||
// \/ \E i \in Server : AdvanceCommitIndex(i)
|
||||
// \/ \E i,j \in Server : AppendEntries(i, j)
|
||||
// \/ \E m \in DOMAIN messages : Receive(m)
|
||||
// \/ \E m \in DOMAIN messages : DuplicateMessage(m)
|
||||
// \/ \E m \in DOMAIN messages : DropMessage(m)
|
||||
// \* History variable that tracks every log ever:
|
||||
// /\ allLogs' = allLogs \cup {log[i] : i \in Server}
|
||||
//
|
|
@ -326,6 +326,14 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
|
|||
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
// event log
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "wal truncate, from-index:%ld", fromIndex);
|
||||
syncNodeEventLog(pData->pSyncNode, logBuf);
|
||||
} while (0);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -463,6 +471,14 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
|
|||
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
// event log
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "wal truncate, from-index:%ld", fromIndex);
|
||||
syncNodeEventLog(pData->pSyncNode, logBuf);
|
||||
} while (0);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -151,14 +151,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
|
|||
for (int32_t i = 0; i < pSyncNode->pRaftCfg->batchSize; ++i) {
|
||||
SSyncRaftEntry* pEntry = NULL;
|
||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry);
|
||||
|
||||
// event log
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "get index:%d, code:%d, %s", getEntryIndex, code, tstrerror(terrno));
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
} while (0);
|
||||
|
||||
if (code == 0) {
|
||||
ASSERT(pEntry != NULL);
|
||||
entryPArr[i] = pEntry;
|
||||
|
@ -172,8 +164,11 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
|
|||
|
||||
// event log
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "build batch:%d", getCount);
|
||||
char logBuf[128];
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
|
||||
snprintf(logBuf, sizeof(logBuf), "build batch:%d for %s:%d", getCount, host, port);
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
} while (0);
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ add_executable(syncRaftLogTest2 "")
|
|||
add_executable(syncRaftLogTest3 "")
|
||||
add_executable(syncLeaderTransferTest "")
|
||||
add_executable(syncReconfigFinishTest "")
|
||||
add_executable(syncRestoreFromSnapshot "")
|
||||
|
||||
|
||||
target_sources(syncTest
|
||||
|
@ -280,6 +281,10 @@ target_sources(syncReconfigFinishTest
|
|||
PRIVATE
|
||||
"syncReconfigFinishTest.cpp"
|
||||
)
|
||||
target_sources(syncRestoreFromSnapshot
|
||||
PRIVATE
|
||||
"syncRestoreFromSnapshot.cpp"
|
||||
)
|
||||
|
||||
|
||||
target_include_directories(syncTest
|
||||
|
@ -562,6 +567,11 @@ target_include_directories(syncReconfigFinishTest
|
|||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncRestoreFromSnapshot
|
||||
PUBLIC
|
||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
|
||||
target_link_libraries(syncTest
|
||||
|
@ -788,6 +798,10 @@ target_link_libraries(syncReconfigFinishTest
|
|||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncRestoreFromSnapshot
|
||||
sync
|
||||
gtest_main
|
||||
)
|
||||
|
||||
|
||||
enable_testing()
|
||||
|
|
|
@ -85,6 +85,7 @@ void logStoreTest() {
|
|||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
gRaftDetailLog = true;
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <stdio.h>
|
||||
#include "syncEnv.h"
|
||||
#include "syncIO.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncRaftLog.h"
|
||||
#include "syncRaftStore.h"
|
||||
#include "syncUtil.h"
|
||||
#include "wal.h"
|
||||
|
||||
void logTest() {
|
||||
sTrace("--- sync log test: trace");
|
||||
sDebug("--- sync log test: debug");
|
||||
sInfo("--- sync log test: info");
|
||||
sWarn("--- sync log test: warn");
|
||||
sError("--- sync log test: error");
|
||||
sFatal("--- sync log test: fatal");
|
||||
}
|
||||
|
||||
void init() {
|
||||
int code = walInit();
|
||||
assert(code == 0);
|
||||
}
|
||||
|
||||
void cleanup() { walCleanUp(); }
|
||||
|
||||
SWal* createWal(char* path, int32_t vgId) {
|
||||
SWalCfg walCfg;
|
||||
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||
walCfg.vgId = vgId;
|
||||
walCfg.fsyncPeriod = 1000;
|
||||
walCfg.retentionPeriod = 1000;
|
||||
walCfg.rollPeriod = 1000;
|
||||
walCfg.retentionSize = 1000;
|
||||
walCfg.segSize = 1000;
|
||||
walCfg.level = TAOS_WAL_FSYNC;
|
||||
SWal* pWal = walOpen(path, &walCfg);
|
||||
assert(pWal != NULL);
|
||||
return pWal;
|
||||
}
|
||||
|
||||
SSyncNode* createSyncNode(SWal* pWal) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
|
||||
memset(pSyncNode, 0, sizeof(SSyncNode));
|
||||
pSyncNode->pWal = pWal;
|
||||
return pSyncNode;
|
||||
}
|
||||
|
||||
void usage(char* exe) { printf("usage: %s path vgId snapshotIndex \n", exe); }
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
if (argc != 4) {
|
||||
usage(argv[0]);
|
||||
exit(-1);
|
||||
}
|
||||
char* path = argv[1];
|
||||
int32_t vgId = atoi(argv[2]);
|
||||
int64_t snapshotIndex = atoll(argv[3]);
|
||||
|
||||
init();
|
||||
SWal* pWal = createWal(path, vgId);
|
||||
assert(pWal != NULL);
|
||||
SSyncNode* pSyncNode = createSyncNode(pWal);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
SSyncLogStore* pLog = logStoreCreate(pSyncNode);
|
||||
assert(pLog != NULL);
|
||||
|
||||
int32_t code = pLog->syncLogRestoreFromSnapshot(pLog, snapshotIndex);
|
||||
assert(code == 0);
|
||||
|
||||
walClose(pWal);
|
||||
logStoreDestory(pLog);
|
||||
taosMemoryFree(pSyncNode);
|
||||
|
||||
cleanup();
|
||||
return 0;
|
||||
}
|
|
@ -142,20 +142,33 @@ sql create table ct1 using stb tags(1000)
|
|||
|
||||
system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
||||
|
||||
sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
|
||||
sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
|
||||
$N = 100
|
||||
$count = 0
|
||||
while $count < $N
|
||||
$ms = 1591200000000 + $count
|
||||
sql insert into ct1 values( $ms , $count , 2.1, 3.1)
|
||||
$count = $count + 1
|
||||
endw
|
||||
|
||||
|
||||
|
||||
#sql insert into ct1 values(now+0s, 10, 2.0, 3.0)
|
||||
#sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3)
|
||||
|
||||
#sql flush database db;
|
||||
|
||||
#system sh/exec.sh -n dnode4 -s start
|
||||
|
||||
sql insert into ct1 values(now+1s, 81, 8.1, 8.1)(now+2s, -92, -9.2, -9.2)(now+3s, -73, -7.3, -7.3)
|
||||
#sql insert into ct1 values(now+1s, 81, 8.1, 8.1)(now+2s, -92, -9.2, -9.2)(now+3s, -73, -7.3, -7.3)
|
||||
|
||||
|
||||
|
||||
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
#system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
#system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||
sleep 5000
|
||||
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
||||
#system sh/exec.sh -n dnode4 -s stop -x SIGINT
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue