Merge pull request #26782 from taosdata/fix/TD-30989-sync1

fix/TD-30989
This commit is contained in:
Hongze Cheng 2024-07-27 12:07:15 +08:00 committed by GitHub
commit 51b358a6f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 45 additions and 16 deletions

View File

@ -669,8 +669,14 @@ int32_t taosGetErrSize();
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) #define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916)
#define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917) #define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917)
#define TSDB_CODE_SYN_NEGOTIATION_WIN_FULL TAOS_DEF_ERROR_CODE(0, 0x0918) #define TSDB_CODE_SYN_NEGOTIATION_WIN_FULL TAOS_DEF_ERROR_CODE(0, 0x0918)
#define TSDB_CODE_SYN_WRONG_TERM TAOS_DEF_ERROR_CODE(0, 0x0919)
#define TSDB_CODE_SYN_WRONG_FSM_STATE TAOS_DEF_ERROR_CODE(0, 0x091A)
#define TSDB_CODE_SYN_WRONG_SYNC_STATE TAOS_DEF_ERROR_CODE(0, 0x091B)
#define TSDB_CODE_SYN_WRONG_REF TAOS_DEF_ERROR_CODE(0, 0x091C)
#define TSDB_CODE_SYN_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x091D)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq // tq
#define TSDB_CODE_TQ_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0A00) #define TSDB_CODE_TQ_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0A00)
#define TSDB_CODE_TQ_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0A01) #define TSDB_CODE_TQ_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0A01)

View File

@ -40,6 +40,7 @@
// //
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t code = 0;
SyncAppendEntriesReply* pMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont; SyncAppendEntriesReply* pMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont;
int32_t ret = 0; int32_t ret = 0;
@ -59,7 +60,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->term > raftStoreGetTerm(ths)) { if (pMsg->term > raftStoreGetTerm(ths)) {
syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
syncNodeStepDown(ths, pMsg->term); syncNodeStepDown(ths, pMsg->term);
return -1; return TSDB_CODE_SYN_WRONG_TERM;
} }
ASSERT(pMsg->term == raftStoreGetTerm(ths)); ASSERT(pMsg->term == raftStoreGetTerm(ths));
@ -81,17 +82,19 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeStepDown(ths, pMsg->term); syncNodeStepDown(ths, pMsg->term);
} }
} else { } else {
(void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex); TAOS_CHECK_RETURN(syncLogBufferCommit(ths->pLogBuf, ths, commitIndex));
} }
} }
// replicate log // replicate log
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId); SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
if (pMgr == NULL) { if (pMgr == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr); sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
return -1; TAOS_RETURN(code);
} }
(void)syncLogReplProcessReply(pMgr, ths, pMsg); TAOS_CHECK_RETURN(syncLogReplProcessReply(pMgr, ths, pMsg));
} }
return 0; TAOS_RETURN(code);
} }

View File

@ -72,17 +72,25 @@ bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) {
} }
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) { int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
int32_t code = 0;
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore); SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
commitIndex = TMAX(commitIndex, ths->commitIndex); commitIndex = TMAX(commitIndex, ths->commitIndex);
ths->commitIndex = TMIN(commitIndex, lastVer); ths->commitIndex = TMIN(commitIndex, lastVer);
ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex); if ((code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex)) != 0) {
// TODO add return when error
sError("failed to update commit index since %s", tstrerror(code));
}
return ths->commitIndex; return ths->commitIndex;
} }
int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
int32_t code = 0;
if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) {
SyncIndex commitIndex = indexLikely; SyncIndex commitIndex = indexLikely;
syncNodeUpdateCommitIndex(ths, commitIndex); if ((code = syncNodeUpdateCommitIndex(ths, commitIndex)) != 0) {
// TODO add return when error
sError("failed to update commit index since %s", tstrerror(code));
}
sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state, sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state,
raftStoreGetTerm(ths), commitIndex); raftStoreGetTerm(ths), commitIndex);
} }
@ -90,9 +98,13 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
} }
int64_t syncNodeUpdateAssignedCommitIndex(SSyncNode* ths, SyncIndex assignedCommitIndex) { int64_t syncNodeUpdateAssignedCommitIndex(SSyncNode* ths, SyncIndex assignedCommitIndex) {
int32_t code = 0;
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore); SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
assignedCommitIndex = TMAX(assignedCommitIndex, ths->assignedCommitIndex); assignedCommitIndex = TMAX(assignedCommitIndex, ths->assignedCommitIndex);
ths->assignedCommitIndex = TMIN(assignedCommitIndex, lastVer); ths->assignedCommitIndex = TMIN(assignedCommitIndex, lastVer);
ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->assignedCommitIndex); if ((code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->assignedCommitIndex)) != 0) {
// TODO add return when error
sError("failed to update commit index since %s", tstrerror(code));
}
return ths->commitIndex; return ths->commitIndex;
} }

View File

@ -73,7 +73,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
int32_t syncNodeElect(SSyncNode* pSyncNode) { int32_t syncNodeElect(SSyncNode* pSyncNode) {
if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) { if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
sNError(pSyncNode, "skip leader election due to incomplete fsm state"); sNError(pSyncNode, "skip leader election due to incomplete fsm state");
return -1; return TSDB_CODE_SYN_WRONG_FSM_STATE;
} }
sNInfo(pSyncNode, "begin election"); sNInfo(pSyncNode, "begin election");
@ -86,7 +86,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) { if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
sNError(pSyncNode, "not candidate, can not elect"); sNError(pSyncNode, "not candidate, can not elect");
return -1; return TSDB_CODE_SYN_WRONG_SYNC_STATE;
} }
// start election // start election

View File

@ -48,14 +48,14 @@ int32_t syncInit() {
if (gNodeRefId < 0) { if (gNodeRefId < 0) {
sError("failed to init node ref"); sError("failed to init node ref");
syncCleanUp(); syncCleanUp();
return -1; return TSDB_CODE_SYN_WRONG_REF;
} }
gHbDataRefId = taosOpenRef(200, (RefFp)syncHbTimerDataFree); gHbDataRefId = taosOpenRef(200, (RefFp)syncHbTimerDataFree);
if (gHbDataRefId < 0) { if (gHbDataRefId < 0) {
sError("failed to init hb-data ref"); sError("failed to init hb-data ref");
syncCleanUp(); syncCleanUp();
return -1; return TSDB_CODE_SYN_WRONG_REF;
} }
sDebug("sync rsetId:%d is open", gNodeRefId); sDebug("sync rsetId:%d is open", gNodeRefId);
@ -106,7 +106,7 @@ void syncNodeRelease(SSyncNode *pNode) {
int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) { int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
pData->rid = taosAddRef(gHbDataRefId, pData); pData->rid = taosAddRef(gHbDataRefId, pData);
if (pData->rid < 0) return -1; if (pData->rid < 0) return TSDB_CODE_SYN_WRONG_REF;
return pData->rid; return pData->rid;
} }

View File

@ -98,6 +98,7 @@ SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) {
} }
} }
terrno = TSDB_CODE_SYN_INVALID_ID;
sError("vgId:%d, indexmgr get replmgr from dnode:%d cluster:%d failed", pNode->vgId, DID(pRaftId), CID(pRaftId)); sError("vgId:%d, indexmgr get replmgr from dnode:%d cluster:%d failed", pNode->vgId, DID(pRaftId), CID(pRaftId));
return NULL; return NULL;
} }
@ -110,6 +111,7 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId)
} }
} }
terrno = TSDB_CODE_SYN_INVALID_ID;
sError("vgId:%d, indexmgr get index from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId), sError("vgId:%d, indexmgr get index from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
CID(pRaftId)); CID(pRaftId));
return SYNC_INDEX_INVALID; return SYNC_INDEX_INVALID;
@ -137,7 +139,8 @@ int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftI
sError("vgId:%d, indexmgr get start-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId), sError("vgId:%d, indexmgr get start-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
CID(pRaftId)); CID(pRaftId));
return -1; return TSDB_CODE_SYN_INVALID_ID;
;
} }
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t recvTime) { void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t recvTime) {
@ -162,7 +165,7 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId
sError("vgId:%d, indexmgr get recv-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId), sError("vgId:%d, indexmgr get recv-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
CID(pRaftId)); CID(pRaftId));
return -1; return TSDB_CODE_SYN_INVALID_ID;
} }
void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term) { void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
@ -187,5 +190,5 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
sError("vgId:%d, indexmgr get term from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId), sError("vgId:%d, indexmgr get term from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
CID(pRaftId)); CID(pRaftId));
return -1; return TSDB_CODE_SYN_INVALID_ID;
} }

View File

@ -528,6 +528,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGOTIATION_WIN_FULL, "Sync negotiation win is full") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGOTIATION_WIN_FULL, "Sync negotiation win is full")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_TERM, "Sync got a wrong term")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_FSM_STATE, "Sync got a wrong fsm state")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_SYNC_STATE, "Sync got a wrong state")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_REF, "Sync got a wrong ref")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_ID, "Sync invalid id")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
//tq //tq