enh: add signature info in logging msgs of snap replication
This commit is contained in:
parent
811f1bbbea
commit
4d61e87c0f
|
@ -195,8 +195,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
|
|
||||||
sSInfo(pSender, "snapshot sender start to dnode:%d. signature:(%" PRId64 ", %" PRId64 ")", DID(&pMsg->destId),
|
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&pMsg->destId));
|
||||||
pSender->term, pSender->startTime);
|
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
_out:
|
_out:
|
||||||
|
@ -233,8 +232,7 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
|
||||||
syncSnapBufferReset(pSender->pSndBuf);
|
syncSnapBufferReset(pSender->pSndBuf);
|
||||||
|
|
||||||
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||||
sSInfo(pSender, "snapshot sender stop to dnode:%d. signature:(%" PRId64 ", %" PRId64 "), finish:%d", DID(&destId),
|
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
|
||||||
pSender->term, pSender->startTime, finish);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// when sender receive ack, call this function to send msg from seq
|
// when sender receive ack, call this function to send msg from seq
|
||||||
|
@ -522,8 +520,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
|
||||||
pReceiver->startTime = pPreMsg->startTime;
|
pReceiver->startTime = pPreMsg->startTime;
|
||||||
ASSERT(pReceiver->startTime);
|
ASSERT(pReceiver->startTime);
|
||||||
|
|
||||||
sRInfo(pReceiver, "snapshot receiver start from dnode:%d. signature:(%" PRId64 ", %" PRId64 ")",
|
sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
|
||||||
DID(&pReceiver->fromId), pReceiver->term, pReceiver->startTime);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// just set start = false
|
// just set start = false
|
||||||
|
@ -547,8 +544,7 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
||||||
|
|
||||||
syncSnapBufferReset(pReceiver->pRcvBuf);
|
syncSnapBufferReset(pReceiver->pRcvBuf);
|
||||||
|
|
||||||
sRInfo(pReceiver, "snapshot receiver stop from dnode:%d. signature:(%" PRId64 ", %" PRId64 ")",
|
sRInfo(pReceiver, "snapshot receiver stop, from dnode:%d.", DID(&pReceiver->fromId));
|
||||||
DID(&pReceiver->fromId), pReceiver->term, pReceiver->startTime);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// when recv last snapshot block, apply data into snapshot
|
// when recv last snapshot block, apply data into snapshot
|
||||||
|
@ -676,22 +672,22 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM
|
||||||
int32_t order = 0;
|
int32_t order = 0;
|
||||||
if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) {
|
if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) {
|
||||||
sRInfo(pReceiver,
|
sRInfo(pReceiver,
|
||||||
"received a new snapshot preparation. restart receiver"
|
"received a new snapshot preparation. restart receiver."
|
||||||
"receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
|
" msg signature:(%" PRId64 ", %" PRId64 ")",
|
||||||
pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime);
|
pMsg->term, pMsg->startTime);
|
||||||
goto _START_RECEIVER;
|
goto _START_RECEIVER;
|
||||||
} else if (order == 0) {
|
} else if (order == 0) {
|
||||||
sRInfo(pReceiver,
|
sRInfo(pReceiver,
|
||||||
"received a duplicate snapshot preparation. send reply"
|
"received a duplicate snapshot preparation. send reply."
|
||||||
"receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
|
" msg signature:(%" PRId64 ", %" PRId64 ")",
|
||||||
pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime);
|
pMsg->term, pMsg->startTime);
|
||||||
goto _SEND_REPLY;
|
goto _SEND_REPLY;
|
||||||
} else {
|
} else {
|
||||||
// ignore
|
// ignore
|
||||||
sRError(pReceiver,
|
sRError(pReceiver,
|
||||||
"received a stale snapshot preparation. ignore"
|
"received a stale snapshot preparation. ignore."
|
||||||
"receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
|
" msg signature:(%" PRId64 ", %" PRId64 ")",
|
||||||
pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime);
|
pMsg->term, pMsg->startTime);
|
||||||
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
|
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _SEND_REPLY;
|
goto _SEND_REPLY;
|
||||||
|
@ -809,6 +805,8 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
|
||||||
goto _SEND_REPLY;
|
goto _SEND_REPLY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sRInfo(pReceiver, "snapshot begin");
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
_SEND_REPLY:
|
_SEND_REPLY:
|
||||||
if (code != 0 && terrno != 0) {
|
if (code != 0 && terrno != 0) {
|
||||||
|
@ -1009,8 +1007,7 @@ _SEND_REPLY:;
|
||||||
//
|
//
|
||||||
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
||||||
SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
|
SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
|
||||||
SyncSnapshotSend *pMsg = ppMsg[0];
|
SyncSnapshotSend *pMsg = ppMsg[0];
|
||||||
ASSERT(pMsg);
|
|
||||||
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
|
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
|
@ -1040,16 +1037,16 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER) {
|
||||||
if (pMsg->term == raftStoreGetTerm(pSyncNode)) {
|
if (pMsg->term == raftStoreGetTerm(pSyncNode)) {
|
||||||
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
|
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
|
||||||
sInfo("vgId:%d, receive pre-snapshot msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")",
|
sInfo("vgId:%d, receive prepare msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")",
|
||||||
pSyncNode->vgId, pMsg->term, pMsg->startTime);
|
pSyncNode->vgId, pMsg->term, pMsg->startTime);
|
||||||
code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
|
code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
|
||||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
|
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
|
||||||
sInfo("vgId:%d, receive begin msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")",
|
sInfo("vgId:%d, receive begin msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")",
|
||||||
pSyncNode->vgId, pMsg->term, pMsg->startTime);
|
pSyncNode->vgId, pMsg->term, pMsg->startTime);
|
||||||
code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
|
code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
|
||||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
||||||
sInfo("vgId:%d, receive end msg of snapshot replication. signature: (%" PRId64 ", %" PRId64 ")",
|
sInfo("vgId:%d, receive end msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId,
|
||||||
pSyncNode->vgId, pMsg->term, pMsg->startTime);
|
pMsg->term, pMsg->startTime);
|
||||||
code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
|
code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
|
||||||
if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
|
if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
|
||||||
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
|
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
|
||||||
|
@ -1059,7 +1056,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
||||||
// force close, no response
|
// force close, no response
|
||||||
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
|
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
|
||||||
snapshotReceiverStop(pReceiver);
|
snapshotReceiverStop(pReceiver);
|
||||||
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq <= SYNC_SNAPSHOT_SEQ_END) {
|
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
|
||||||
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
|
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
|
||||||
code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
|
code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1139,10 +1136,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
|
||||||
pSendMsg->startTime = pSender->startTime;
|
pSendMsg->startTime = pSender->startTime;
|
||||||
pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
|
pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
|
||||||
|
|
||||||
ASSERT(pSendMsg->startTime);
|
sSInfo(pSender, "begin snapshot replication to dnode %d." PRId64, DID(&pSendMsg->destId));
|
||||||
|
|
||||||
sSInfo(pSender, "begin snapshot replication to dnode %d. startTime:%" PRId64, DID(&pSendMsg->destId),
|
|
||||||
pSendMsg->startTime);
|
|
||||||
|
|
||||||
// send msg
|
// send msg
|
||||||
syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
|
syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
|
||||||
|
@ -1252,10 +1246,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
|
||||||
// check signature
|
// check signature
|
||||||
int32_t order = 0;
|
int32_t order = 0;
|
||||||
if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
|
if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
|
||||||
sSError(pSender,
|
sSError(pSender, "received a stale snapshot rsp, msg signature:(%" PRId64 ", %" PRId64 "), ignore it.", pMsg->term,
|
||||||
"received a stale snapshot rsp. ignore it"
|
pMsg->startTime);
|
||||||
"sender signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
|
|
||||||
pSender->term, pSender->startTime, pMsg->term, pMsg->startTime);
|
|
||||||
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
|
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
|
||||||
return -1;
|
return -1;
|
||||||
} else if (order < 0) {
|
} else if (order < 0) {
|
||||||
|
|
|
@ -266,22 +266,21 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
|
||||||
int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
|
int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
|
||||||
va_end(argpointer);
|
va_end(argpointer);
|
||||||
|
|
||||||
taosPrintLog(flags, level, dflag,
|
taosPrintLog(
|
||||||
"vgId:%d, %s, sync:%s, snap-sender:{%p start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64
|
flags, level, dflag,
|
||||||
" last-term:%" PRIu64 " last-cfg:%" PRId64
|
"vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64 " end:%" PRId64
|
||||||
", seq:%d ack:%d finish:%d, as:%d dnode:%d}"
|
" last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
|
||||||
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
|
", seq:%d ack:%d finish:%d, as:%d, to-dnode:%d}"
|
||||||
", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
|
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
|
||||||
"}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
|
", snap:{last-index:%" PRId64 ", term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
|
||||||
", chging:%d, restore:%d, quorum:%d, lc-timer:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s",
|
", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
|
||||||
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start,
|
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->term, pSender->startTime,
|
||||||
pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
|
pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
|
||||||
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
|
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish,
|
||||||
DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), pNode->commitIndex,
|
pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode),
|
||||||
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
|
pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
|
||||||
pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex,
|
snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum,
|
||||||
pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock,
|
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, cfgStr);
|
||||||
pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
|
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
|
||||||
|
@ -316,19 +315,19 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
|
||||||
taosPrintLog(
|
taosPrintLog(
|
||||||
flags, level, dflag,
|
flags, level, dflag,
|
||||||
"vgId:%d, %s, sync:%s,"
|
"vgId:%d, %s, sync:%s,"
|
||||||
" snap-receiver:{%p started:%d acked:%d term:%" PRIu64 " start-time:%" PRId64 " from-dnode:%d, start:%" PRId64
|
" snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64
|
||||||
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
|
" from-dnode:%d, start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
|
||||||
"}"
|
"}"
|
||||||
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
|
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
|
||||||
", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
|
", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
|
||||||
", chging:%d, restore:%d, quorum:%d, lc-timers:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s",
|
", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
|
||||||
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term,
|
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->term, pReceiver->startTime, pReceiver->start,
|
||||||
pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
|
pReceiver->ack, pReceiver->term, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start,
|
||||||
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex,
|
pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm,
|
||||||
raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
|
pReceiver->snapshot.lastConfigIndex, raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex,
|
||||||
snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
|
pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy,
|
||||||
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
|
pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing,
|
||||||
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
|
pNode->restoreFinish, pNode->quorum, peerStr, cfgStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
|
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
|
||||||
|
|
Loading…
Reference in New Issue