enh: logging buf info for snapshot reader and sender
This commit is contained in:
parent
7d9d1c6877
commit
ec5b1f2ec1
|
@ -56,7 +56,7 @@ typedef struct SSyncSnapshotSender {
|
|||
int64_t lastSendTime;
|
||||
bool finish;
|
||||
|
||||
// buffer
|
||||
// ring buffer for ack
|
||||
SSyncSnapBuffer *pSndBuf;
|
||||
|
||||
// init when create
|
||||
|
|
|
@ -345,20 +345,6 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
|
||||
if (pMsg->ack != pSender->seq) {
|
||||
sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq);
|
||||
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pSender->ack = pMsg->ack;
|
||||
pSender->seq++;
|
||||
|
||||
sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// return 0, start ok
|
||||
// return 1, last snapshot finish ok
|
||||
// return -1, error
|
||||
|
@ -1182,6 +1168,8 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp
|
|||
pSndBuf->start = ack + 1;
|
||||
}
|
||||
|
||||
pSender->ack = pSndBuf->start - 1;
|
||||
|
||||
while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) {
|
||||
if (snapshotSend(pSender) != 0) {
|
||||
code = terrno;
|
||||
|
|
|
@ -266,21 +266,24 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
|
|||
int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
|
||||
va_end(argpointer);
|
||||
|
||||
taosPrintLog(
|
||||
flags, level, dflag,
|
||||
"vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64 " end:%" PRId64
|
||||
" last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
|
||||
", seq:%d ack:%d finish:%d, as:%d, to-dnode:%d}"
|
||||
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
|
||||
", snap:{last-index:%" PRId64 ", term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
|
||||
", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
|
||||
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->term, pSender->startTime,
|
||||
pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish,
|
||||
pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode),
|
||||
pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
|
||||
snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum,
|
||||
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, cfgStr);
|
||||
taosPrintLog(flags, level, dflag,
|
||||
"vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64
|
||||
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
|
||||
", seq:%d, ack:%d, "
|
||||
" buf:[%" PRId64 " %" PRId64 ", %" PRId64
|
||||
"), finish:%d, as:%d, to-dnode:%d}"
|
||||
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
|
||||
", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
|
||||
"}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
|
||||
", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
|
||||
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->term, pSender->startTime,
|
||||
pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack,
|
||||
pSender->pSndBuf->start, pSender->pSndBuf->cursor, pSender->pSndBuf->end, pSender->finish,
|
||||
pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode),
|
||||
pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
|
||||
snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum,
|
||||
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, cfgStr);
|
||||
}
|
||||
|
||||
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
|
||||
|
@ -315,19 +318,21 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
|
|||
taosPrintLog(
|
||||
flags, level, dflag,
|
||||
"vgId:%d, %s, sync:%s,"
|
||||
" snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64
|
||||
" snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d buf:[%" PRId64 " %" PRId64 ", %" PRId64
|
||||
")"
|
||||
" from-dnode:%d, start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
|
||||
"}"
|
||||
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
|
||||
", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
|
||||
", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
|
||||
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->term, pReceiver->startTime, pReceiver->start,
|
||||
pReceiver->ack, pReceiver->term, pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start,
|
||||
pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm,
|
||||
pReceiver->snapshot.lastConfigIndex, raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex,
|
||||
pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy,
|
||||
pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing,
|
||||
pNode->restoreFinish, pNode->quorum, peerStr, cfgStr);
|
||||
pReceiver->ack, pReceiver->pRcvBuf->start, pReceiver->pRcvBuf->cursor, pReceiver->pRcvBuf->end,
|
||||
DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
|
||||
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex,
|
||||
raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
|
||||
snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
|
||||
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr,
|
||||
cfgStr);
|
||||
}
|
||||
|
||||
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
|
||||
|
|
Loading…
Reference in New Issue