fix/TD-32703-add-log

This commit is contained in:
dmchen 2024-10-29 04:11:11 +00:00
parent a11ad2a871
commit ff2f2f2b5f
4 changed files with 38 additions and 11 deletions

View File

@ -95,6 +95,8 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
bool accepted = false;
SSyncRaftEntry* pEntry = NULL;
bool resetElect = false;
const STraceId* trace = &pRpcMsg->info.traceId;
char tbuf[40] = {0};
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
@ -150,7 +152,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
goto _IGNORE;
}
sTrace("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64
sGTrace("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64
", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 " entryterm:%" PRId64,
pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex,
pEntry->term);
@ -179,6 +181,11 @@ _SEND_RESPONSE:
sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);
}
TRACE_SET_MSGID(&(rpcRsp.info.traceId), tGenIdPI64());
trace = &(rpcRsp.info.traceId);
sGTrace("vgId:%d, send append reply matchIndex:%" PRId64 " term:%" PRId64 " lastSendIndex:%" PRId64
" to dest: 0x%016" PRIx64,
ths->vgId, pReply->matchIndex, pReply->term, pReply->lastSendIndex, pReply->destId.addr);
// ack, i.e. send response
TAOS_CHECK_RETURN(syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp));

View File

@ -43,6 +43,8 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t code = 0;
SyncAppendEntriesReply* pMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont;
int32_t ret = 0;
const STraceId* trace = &pRpcMsg->info.traceId;
char tbuf[40] = {0};
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
@ -63,7 +65,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
return TSDB_CODE_SYN_WRONG_TERM;
}
sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
sGTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);
if (pMsg->success) {

View File

@ -1026,6 +1026,14 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
int32_t code = 0;
if (pMgr->restored != false) return TSDB_CODE_SYN_INTERNAL_ERROR;
sTrace("vgId:%d, begin to recover sync log repl. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 ", %" PRId64
", %" PRId64 ") restore:%d, buffer: [%" PRId64 ", %" PRId64 ", %" PRId64 ", %" PRId64
"), msg: {lastSendIndex:%" PRId64 ", matchIndex:%" PRId64 ", fsmState:%d, success:%d, lastMatchTerm:%" PRId64
"}",
pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, pMsg->lastSendIndex, pMsg->matchIndex,
pMsg->fsmState, pMsg->success, pMsg->lastMatchTerm);
if (pMgr->endIndex == 0) {
if (pMgr->startIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
if (pMgr->matchIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR;
@ -1171,6 +1179,11 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde
int64_t nowMs = taosGetMonoTimestampMs();
int32_t code = 0;
sTrace("vgId:%d, begin to probe peer:%" PRIx64 " with msg of index:%" PRId64 ". repl-mgr:[%" PRId64 ", %" PRId64
", %" PRId64 "), restored:%d",
pNode->vgId, pNode->replicasId[pMgr->peerId].addr, index, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pMgr->restored);
if (pMgr->endIndex > pMgr->startIndex &&
nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
return 0;
@ -1206,6 +1219,10 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR;
sTrace("vgId:%d, begin to attempt replicate log entries from end to match. repl-mgr:[%" PRId64 ", %" PRId64
", %" PRId64 "), restore:%d",
pNode->vgId, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored);
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
int32_t code = 0;
@ -1527,10 +1544,11 @@ int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex ind
goto _err;
}
TAOS_CHECK_GOTO(syncNodeSendAppendEntries(pNode, pDestId, &msgOut), &lino, _err);
sTrace("vgId:%d, replicate one msg index:%" PRId64 " term:%" PRId64 " prevterm:%" PRId64 " to dest: 0x%016" PRIx64,
TRACE_SET_MSGID(&(msgOut.info.traceId), tGenIdPI64());
STraceId* trace = &(msgOut.info.traceId);
sGTrace("vgId:%d, replicate one msg index:%" PRId64 " term:%" PRId64 " prevterm:%" PRId64 " to dest: 0x%016" PRIx64,
pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);
TAOS_CHECK_GOTO(syncNodeSendAppendEntries(pNode, pDestId, &msgOut), &lino, _err);
if (!inBuf) {
syncEntryDestroy(pEntry);

View File

@ -152,7 +152,7 @@ static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLe
for (int32_t i = 0; i < pSyncNode->replicaNum; i++) {
SSyncLogReplMgr* pMgr = pSyncNode->logReplMgrs[i];
if (pMgr == NULL) break;
len += tsnprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 " %" PRId64 ", %" PRId64 "]", i, pMgr->restored,
len += tsnprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 ", %" PRId64 ", %" PRId64 "]", i, pMgr->restored,
pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
if (i + 1 < pSyncNode->replicaNum) {
len += tsnprintf(buf + len, bufLen - len, "%s", ", ");