fix/TD-30989
This commit is contained in:
parent
3f2239ef03
commit
698cd2a5e2
|
@ -668,8 +668,14 @@ int32_t taosGetErrSize();
|
||||||
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916)
|
#define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916)
|
||||||
#define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917)
|
#define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917)
|
||||||
#define TSDB_CODE_SYN_NEGOTIATION_WIN_FULL TAOS_DEF_ERROR_CODE(0, 0x0918)
|
#define TSDB_CODE_SYN_NEGOTIATION_WIN_FULL TAOS_DEF_ERROR_CODE(0, 0x0918)
|
||||||
|
#define TSDB_CODE_SYN_WRONG_TERM TAOS_DEF_ERROR_CODE(0, 0x0919)
|
||||||
|
#define TSDB_CODE_SYN_WRONG_FSM_STATE TAOS_DEF_ERROR_CODE(0, 0x091A)
|
||||||
|
#define TSDB_CODE_SYN_WRONG_SYNC_STATE TAOS_DEF_ERROR_CODE(0, 0x091B)
|
||||||
|
#define TSDB_CODE_SYN_WRONG_REF TAOS_DEF_ERROR_CODE(0, 0x091C)
|
||||||
|
#define TSDB_CODE_SYN_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x091D)
|
||||||
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
||||||
|
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
#define TSDB_CODE_TQ_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0A00)
|
#define TSDB_CODE_TQ_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0A00)
|
||||||
#define TSDB_CODE_TQ_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0A01)
|
#define TSDB_CODE_TQ_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0A01)
|
||||||
|
|
|
@ -40,6 +40,7 @@
|
||||||
//
|
//
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
|
int32_t code = 0;
|
||||||
SyncAppendEntriesReply* pMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont;
|
SyncAppendEntriesReply* pMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont;
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
@ -59,7 +60,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
if (pMsg->term > raftStoreGetTerm(ths)) {
|
if (pMsg->term > raftStoreGetTerm(ths)) {
|
||||||
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
|
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
|
||||||
syncNodeStepDown(ths, pMsg->term);
|
syncNodeStepDown(ths, pMsg->term);
|
||||||
return -1;
|
return TSDB_CODE_SYN_WRONG_TERM;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pMsg->term == raftStoreGetTerm(ths));
|
ASSERT(pMsg->term == raftStoreGetTerm(ths));
|
||||||
|
@ -81,17 +82,19 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
syncNodeStepDown(ths, pMsg->term);
|
syncNodeStepDown(ths, pMsg->term);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
(void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex);
|
TAOS_CHECK_RETURN(syncLogBufferCommit(ths->pLogBuf, ths, commitIndex));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// replicate log
|
// replicate log
|
||||||
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
|
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId);
|
||||||
if (pMgr == NULL) {
|
if (pMgr == NULL) {
|
||||||
|
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||||
|
if (terrno != 0) code = terrno;
|
||||||
sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
|
sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr);
|
||||||
return -1;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
(void)syncLogReplProcessReply(pMgr, ths, pMsg);
|
TAOS_CHECK_RETURN(syncLogReplProcessReply(pMgr, ths, pMsg));
|
||||||
}
|
}
|
||||||
return 0;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,17 +72,25 @@ bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
|
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
|
||||||
|
int32_t code = 0;
|
||||||
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
||||||
commitIndex = TMAX(commitIndex, ths->commitIndex);
|
commitIndex = TMAX(commitIndex, ths->commitIndex);
|
||||||
ths->commitIndex = TMIN(commitIndex, lastVer);
|
ths->commitIndex = TMIN(commitIndex, lastVer);
|
||||||
ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex);
|
if ((code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->commitIndex)) != 0) {
|
||||||
|
// TODO add return when error
|
||||||
|
sError("failed to update commit index since %s", tstrerror(code));
|
||||||
|
}
|
||||||
return ths->commitIndex;
|
return ths->commitIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
|
int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
|
||||||
|
int32_t code = 0;
|
||||||
if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) {
|
if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) {
|
||||||
SyncIndex commitIndex = indexLikely;
|
SyncIndex commitIndex = indexLikely;
|
||||||
syncNodeUpdateCommitIndex(ths, commitIndex);
|
if ((code = syncNodeUpdateCommitIndex(ths, commitIndex)) != 0) {
|
||||||
|
// TODO add return when error
|
||||||
|
sError("failed to update commit index since %s", tstrerror(code));
|
||||||
|
}
|
||||||
sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state,
|
sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state,
|
||||||
raftStoreGetTerm(ths), commitIndex);
|
raftStoreGetTerm(ths), commitIndex);
|
||||||
}
|
}
|
||||||
|
@ -90,9 +98,13 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t syncNodeUpdateAssignedCommitIndex(SSyncNode* ths, SyncIndex assignedCommitIndex) {
|
int64_t syncNodeUpdateAssignedCommitIndex(SSyncNode* ths, SyncIndex assignedCommitIndex) {
|
||||||
|
int32_t code = 0;
|
||||||
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
||||||
assignedCommitIndex = TMAX(assignedCommitIndex, ths->assignedCommitIndex);
|
assignedCommitIndex = TMAX(assignedCommitIndex, ths->assignedCommitIndex);
|
||||||
ths->assignedCommitIndex = TMIN(assignedCommitIndex, lastVer);
|
ths->assignedCommitIndex = TMIN(assignedCommitIndex, lastVer);
|
||||||
ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->assignedCommitIndex);
|
if ((code = ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, ths->assignedCommitIndex)) != 0) {
|
||||||
|
// TODO add return when error
|
||||||
|
sError("failed to update commit index since %s", tstrerror(code));
|
||||||
|
}
|
||||||
return ths->commitIndex;
|
return ths->commitIndex;
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,7 +73,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
|
||||||
int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||||
if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
|
if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
|
||||||
sNError(pSyncNode, "skip leader election due to incomplete fsm state");
|
sNError(pSyncNode, "skip leader election due to incomplete fsm state");
|
||||||
return -1;
|
return TSDB_CODE_SYN_WRONG_FSM_STATE;
|
||||||
}
|
}
|
||||||
|
|
||||||
sNInfo(pSyncNode, "begin election");
|
sNInfo(pSyncNode, "begin election");
|
||||||
|
@ -86,7 +86,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
|
if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
|
||||||
sNError(pSyncNode, "not candidate, can not elect");
|
sNError(pSyncNode, "not candidate, can not elect");
|
||||||
return -1;
|
return TSDB_CODE_SYN_WRONG_SYNC_STATE;
|
||||||
}
|
}
|
||||||
|
|
||||||
// start election
|
// start election
|
||||||
|
|
|
@ -48,14 +48,14 @@ int32_t syncInit() {
|
||||||
if (gNodeRefId < 0) {
|
if (gNodeRefId < 0) {
|
||||||
sError("failed to init node ref");
|
sError("failed to init node ref");
|
||||||
syncCleanUp();
|
syncCleanUp();
|
||||||
return -1;
|
return TSDB_CODE_SYN_WRONG_REF;
|
||||||
}
|
}
|
||||||
|
|
||||||
gHbDataRefId = taosOpenRef(200, (RefFp)syncHbTimerDataFree);
|
gHbDataRefId = taosOpenRef(200, (RefFp)syncHbTimerDataFree);
|
||||||
if (gHbDataRefId < 0) {
|
if (gHbDataRefId < 0) {
|
||||||
sError("failed to init hb-data ref");
|
sError("failed to init hb-data ref");
|
||||||
syncCleanUp();
|
syncCleanUp();
|
||||||
return -1;
|
return TSDB_CODE_SYN_WRONG_REF;
|
||||||
}
|
}
|
||||||
|
|
||||||
sDebug("sync rsetId:%d is open", gNodeRefId);
|
sDebug("sync rsetId:%d is open", gNodeRefId);
|
||||||
|
@ -106,7 +106,7 @@ void syncNodeRelease(SSyncNode *pNode) {
|
||||||
|
|
||||||
int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
|
int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
|
||||||
pData->rid = taosAddRef(gHbDataRefId, pData);
|
pData->rid = taosAddRef(gHbDataRefId, pData);
|
||||||
if (pData->rid < 0) return -1;
|
if (pData->rid < 0) return TSDB_CODE_SYN_WRONG_REF;
|
||||||
return pData->rid;
|
return pData->rid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -98,6 +98,7 @@ SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SYN_INVALID_ID;
|
||||||
sError("vgId:%d, indexmgr get replmgr from dnode:%d cluster:%d failed", pNode->vgId, DID(pRaftId), CID(pRaftId));
|
sError("vgId:%d, indexmgr get replmgr from dnode:%d cluster:%d failed", pNode->vgId, DID(pRaftId), CID(pRaftId));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -110,6 +111,7 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SYN_INVALID_ID;
|
||||||
sError("vgId:%d, indexmgr get index from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
|
sError("vgId:%d, indexmgr get index from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
|
||||||
CID(pRaftId));
|
CID(pRaftId));
|
||||||
return SYNC_INDEX_INVALID;
|
return SYNC_INDEX_INVALID;
|
||||||
|
@ -137,7 +139,8 @@ int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftI
|
||||||
|
|
||||||
sError("vgId:%d, indexmgr get start-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
|
sError("vgId:%d, indexmgr get start-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
|
||||||
CID(pRaftId));
|
CID(pRaftId));
|
||||||
return -1;
|
return TSDB_CODE_SYN_INVALID_ID;
|
||||||
|
;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t recvTime) {
|
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t recvTime) {
|
||||||
|
@ -162,7 +165,7 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId
|
||||||
|
|
||||||
sError("vgId:%d, indexmgr get recv-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
|
sError("vgId:%d, indexmgr get recv-time from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
|
||||||
CID(pRaftId));
|
CID(pRaftId));
|
||||||
return -1;
|
return TSDB_CODE_SYN_INVALID_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
|
void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
|
||||||
|
@ -187,5 +190,5 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
|
||||||
|
|
||||||
sError("vgId:%d, indexmgr get term from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
|
sError("vgId:%d, indexmgr get term from dnode:%d cluster:%d failed", pIndexMgr->pNode->vgId, DID(pRaftId),
|
||||||
CID(pRaftId));
|
CID(pRaftId));
|
||||||
return -1;
|
return TSDB_CODE_SYN_INVALID_ID;
|
||||||
}
|
}
|
||||||
|
|
|
@ -527,6 +527,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGOTIATION_WIN_FULL, "Sync negotiation win is full")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGOTIATION_WIN_FULL, "Sync negotiation win is full")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_TERM, "Sync got a wrong term")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_FSM_STATE, "Sync got a wrong fsm state")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_SYNC_STATE, "Sync got a wrong state")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_REF, "Sync got a wrong ref")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_ID, "Sync invalid id")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
|
||||||
|
|
||||||
//tq
|
//tq
|
||||||
|
|
Loading…
Reference in New Issue