refactor(sync): modify wal error log
This commit is contained in:
parent
81eae2a9af
commit
ca4d7329ce
|
@ -466,14 +466,11 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
// print log
|
// print log
|
||||||
char logBuf[128] = {0};
|
syncAppendEntriesLog2("==syncNodeOnAppendEntriesSnapshotCb==", pMsg);
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, vgId:%d, term:%lu", ths->vgId,
|
|
||||||
ths->pRaftStore->currentTerm);
|
|
||||||
syncAppendEntriesLog2(logBuf, pMsg);
|
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
sInfo("recv SyncAppendEntries maybe replica already dropped");
|
syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped");
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -497,7 +494,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
do {
|
do {
|
||||||
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
|
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
|
||||||
if (condition) {
|
if (condition) {
|
||||||
sTrace("recv SyncAppendEntries, candidate to follower");
|
syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower");
|
||||||
|
|
||||||
syncNodeBecomeFollower(ths, "from candidate by append entries");
|
syncNodeBecomeFollower(ths, "from candidate by append entries");
|
||||||
// do not reply?
|
// do not reply?
|
||||||
|
@ -538,11 +535,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
bool condition = condition1 || condition2 || condition3 || condition4;
|
bool condition = condition1 || condition2 || condition3 || condition4;
|
||||||
|
|
||||||
if (condition) {
|
if (condition) {
|
||||||
sTrace(
|
syncNodeEventLog(ths, "recv sync-append-entries, fake match");
|
||||||
"recv SyncAppendEntries, fake match, myLastIndex:%ld, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, "
|
|
||||||
"condition1:%d, condition2:%d, condition3:%d, condition4:%d",
|
|
||||||
myLastIndex, ths->pLogStore->syncLogBeginIndex(ths->pLogStore),
|
|
||||||
ths->pLogStore->syncLogEndIndex(ths->pLogStore), condition1, condition2, condition3, condition4);
|
|
||||||
|
|
||||||
// prepare response msg
|
// prepare response msg
|
||||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
|
@ -576,8 +569,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
||||||
(pMsg->prevLogIndex <= ths->commitIndex);
|
(pMsg->prevLogIndex <= ths->commitIndex);
|
||||||
if (condition) {
|
if (condition) {
|
||||||
sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex,
|
syncNodeEventLog(ths, "recv sync-append-entries, fake match2");
|
||||||
ths->commitIndex);
|
|
||||||
|
|
||||||
SyncIndex matchIndex = ths->commitIndex;
|
SyncIndex matchIndex = ths->commitIndex;
|
||||||
bool hasAppendEntries = pMsg->dataLen > 0;
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
|
@ -650,11 +642,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
bool condition = condition1 || condition2;
|
bool condition = condition1 || condition2;
|
||||||
|
|
||||||
if (condition) {
|
if (condition) {
|
||||||
sTrace(
|
syncNodeEventLog(ths, "recv sync-append-entries, not match");
|
||||||
"recv SyncAppendEntries, not match, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, condition1:%d, "
|
|
||||||
"condition2:%d, logOK:%d",
|
|
||||||
ths->pLogStore->syncLogBeginIndex(ths->pLogStore), ths->pLogStore->syncLogEndIndex(ths->pLogStore),
|
|
||||||
condition1, condition2, logOK);
|
|
||||||
|
|
||||||
// prepare response msg
|
// prepare response msg
|
||||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
|
@ -693,8 +681,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
// has entries in SyncAppendEntries msg
|
// has entries in SyncAppendEntries msg
|
||||||
bool hasAppendEntries = pMsg->dataLen > 0;
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
|
|
||||||
sTrace("recv SyncAppendEntries, match, myLastIndex:%ld, hasExtraEntries:%d, hasAppendEntries:%d", myLastIndex,
|
syncNodeEventLog(ths, "recv sync-append-entries, match");
|
||||||
hasExtraEntries, hasAppendEntries);
|
|
||||||
|
|
||||||
if (hasExtraEntries) {
|
if (hasExtraEntries) {
|
||||||
// make log same, rollback deleted entries
|
// make log same, rollback deleted entries
|
||||||
|
|
|
@ -101,32 +101,27 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
// print log
|
// print log
|
||||||
char logBuf[128] = {0};
|
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplySnapshotCb==", pMsg);
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, vgId:%d, term:%lu", ths->vgId,
|
|
||||||
ths->pRaftStore->currentTerm);
|
|
||||||
syncAppendEntriesReplyLog2(logBuf, pMsg);
|
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
sInfo("recv SyncAppendEntriesReply, maybe replica already dropped");
|
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
|
||||||
return ret;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// drop stale response
|
// drop stale response
|
||||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||||
sTrace("recv SyncAppendEntriesReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term,
|
char logBuf[128];
|
||||||
ths->pRaftStore->currentTerm);
|
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%lu, drop stale response", pMsg->term);
|
||||||
return ret;
|
syncNodeEventLog(ths, logBuf);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pNextIndex:", ths->pNextIndex);
|
|
||||||
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
|
|
||||||
if (gRaftDetailLog) {
|
if (gRaftDetailLog) {
|
||||||
SSnapshot snapshot;
|
syncNodeEventLog(ths, "recv sync-append-entries-reply, before");
|
||||||
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
|
||||||
sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
|
|
||||||
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
|
||||||
}
|
}
|
||||||
|
syncIndexMgrLog2("recv sync-append-entries-reply, before pNextIndex:", ths->pNextIndex);
|
||||||
|
syncIndexMgrLog2("recv sync-append-entries-reply, before pMatchIndex:", ths->pMatchIndex);
|
||||||
|
|
||||||
// no need this code, because if I receive reply.term, then I must have sent for that term.
|
// no need this code, because if I receive reply.term, then I must have sent for that term.
|
||||||
// if (pMsg->term > ths->pRaftStore->currentTerm) {
|
// if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
|
@ -134,12 +129,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
// }
|
// }
|
||||||
|
|
||||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
char logBuf[128] = {0};
|
char logBuf[128];
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntriesReply, error term, receive_term:%lu current_term:%lu",
|
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term);
|
||||||
pMsg->term, ths->pRaftStore->currentTerm);
|
syncNodeErrorLog(ths, logBuf);
|
||||||
syncNodeLog2(logBuf, ths);
|
return -1;
|
||||||
sError("%s", logBuf);
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
||||||
|
@ -228,8 +221,11 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex);
|
if (gRaftDetailLog) {
|
||||||
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
|
syncNodeEventLog(ths, "recv sync-append-entries-reply, after");
|
||||||
|
}
|
||||||
|
syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
|
||||||
|
syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
|
@ -416,7 +416,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sMeta->lastConfigIndex = lastIndex;
|
sMeta->lastConfigIndex = lastIndex;
|
||||||
sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lastConfigIndex:%" PRId64, pSyncNode->vgId, snapshotIndex,
|
sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, snapshotIndex,
|
||||||
sMeta->lastConfigIndex);
|
sMeta->lastConfigIndex);
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
@ -433,8 +433,9 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
|
||||||
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
|
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
sTrace("vgId:%d, sync get snapshot last config index, index:%ld lcindex:%ld", pSyncNode->vgId, snapshotLastApplyIndex,
|
||||||
|
lastIndex);
|
||||||
|
|
||||||
sTrace("sync syncNodeGetSnapshotConfigIndex index:%ld lastConfigIndex:%ld", snapshotLastApplyIndex, lastIndex);
|
|
||||||
return lastIndex;
|
return lastIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,13 +46,13 @@ static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
|
||||||
|
|
||||||
// refactor, log[0 .. n] ==> log[m .. n]
|
// refactor, log[0 .. n] ==> log[m .. n]
|
||||||
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) {
|
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) {
|
||||||
sTrace("raftLogSetBeginIndex beginIndex:%ld", beginIndex);
|
|
||||||
|
|
||||||
// if beginIndex == 0, donot need call this funciton
|
// if beginIndex == 0, donot need call this funciton
|
||||||
ASSERT(beginIndex > 0);
|
ASSERT(beginIndex > 0);
|
||||||
|
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
|
sTrace("vgId:%d, reset wal begin index:%ld", pData->pSyncNode->vgId, beginIndex);
|
||||||
|
|
||||||
pData->beginIndex = beginIndex;
|
pData->beginIndex = beginIndex;
|
||||||
walRestoreFromSnapshot(pWal, beginIndex - 1);
|
walRestoreFromSnapshot(pWal, beginIndex - 1);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -160,8 +160,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
int32_t sysErr = errno;
|
int32_t sysErr = errno;
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
sError("vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
|
|
||||||
|
char logBuf[128];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
pEntry->index, err, err, errStr, sysErr, sysErrStr);
|
pEntry->index, err, err, errStr, sysErr, sysErrStr);
|
||||||
|
syncNodeErrorLog(pData->pSyncNode, logBuf);
|
||||||
|
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,8 +244,17 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
int32_t sysErr = errno;
|
int32_t sysErr = errno;
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
sError("vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, index,
|
|
||||||
err, err, errStr, sysErr, sysErrStr);
|
do {
|
||||||
|
char logBuf[128];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index, err,
|
||||||
|
err, errStr, sysErr, sysErrStr);
|
||||||
|
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||||
|
syncNodeEventLog(pData->pSyncNode, logBuf);
|
||||||
|
} else {
|
||||||
|
syncNodeErrorLog(pData->pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
} while (0);
|
||||||
|
|
||||||
int32_t saveErr = terrno;
|
int32_t saveErr = terrno;
|
||||||
walCloseReadHandle(pWalHandle);
|
walCloseReadHandle(pWalHandle);
|
||||||
|
@ -368,8 +381,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
int32_t sysErr = errno;
|
int32_t sysErr = errno;
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
sError("vgId:%d wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
|
|
||||||
|
char logBuf[128];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
pEntry->index, err, err, errStr, sysErr, sysErrStr);
|
pEntry->index, err, err, errStr, sysErr, sysErrStr);
|
||||||
|
syncNodeErrorLog(pData->pSyncNode, logBuf);
|
||||||
|
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -398,12 +414,20 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
int32_t sysErr = errno;
|
int32_t sysErr = errno;
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
sError("vgId:%d wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId,
|
|
||||||
index, err, err, errStr, sysErr, sysErrStr);
|
do {
|
||||||
|
char logBuf[128];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "wal read error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", index,
|
||||||
|
err, err, errStr, sysErr, sysErrStr);
|
||||||
|
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||||
|
syncNodeEventLog(pData->pSyncNode, logBuf);
|
||||||
|
} else {
|
||||||
|
syncNodeErrorLog(pData->pSyncNode, logBuf);
|
||||||
|
}
|
||||||
|
} while (0);
|
||||||
|
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
// ASSERT(walReadWithHandle(pWalHandle, index) == 0);
|
|
||||||
|
|
||||||
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
|
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
|
||||||
ASSERT(pEntry != NULL);
|
ASSERT(pEntry != NULL);
|
||||||
|
|
Loading…
Reference in New Issue