Merge pull request #22199 from taosdata/fix/TS-3708
fix/TS-3708: check term for role time
This commit is contained in:
commit
cc3cc73cfc
|
@ -1180,6 +1180,8 @@ typedef struct {
|
|||
typedef struct {
|
||||
int8_t syncState;
|
||||
int8_t syncRestore;
|
||||
int64_t syncTerm;
|
||||
int64_t roleTimeMs;
|
||||
} SMnodeLoad;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -239,29 +239,31 @@ typedef struct SSyncState {
|
|||
ESyncState state;
|
||||
bool restored;
|
||||
bool canRead;
|
||||
SyncTerm term;
|
||||
int64_t roleTimeMs;
|
||||
} SSyncState;
|
||||
|
||||
int32_t syncInit();
|
||||
void syncCleanUp();
|
||||
int64_t syncOpen(SSyncInfo* pSyncInfo);
|
||||
int32_t syncStart(int64_t rid);
|
||||
void syncStop(int64_t rid);
|
||||
void syncPreStop(int64_t rid);
|
||||
void syncPostStop(int64_t rid);
|
||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
|
||||
int32_t syncIsCatchUp(int64_t rid);
|
||||
int32_t syncInit();
|
||||
void syncCleanUp();
|
||||
int64_t syncOpen(SSyncInfo* pSyncInfo);
|
||||
int32_t syncStart(int64_t rid);
|
||||
void syncStop(int64_t rid);
|
||||
void syncPreStop(int64_t rid);
|
||||
void syncPostStop(int64_t rid);
|
||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
|
||||
int32_t syncIsCatchUp(int64_t rid);
|
||||
ESyncRole syncGetRole(int64_t rid);
|
||||
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
|
||||
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
|
||||
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
|
||||
int32_t syncEndSnapshot(int64_t rid);
|
||||
int32_t syncLeaderTransfer(int64_t rid);
|
||||
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
|
||||
bool syncIsReadyForRead(int64_t rid);
|
||||
bool syncSnapshotSending(int64_t rid);
|
||||
bool syncSnapshotRecving(int64_t rid);
|
||||
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
|
||||
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg);
|
||||
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
|
||||
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
|
||||
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
|
||||
int32_t syncEndSnapshot(int64_t rid);
|
||||
int32_t syncLeaderTransfer(int64_t rid);
|
||||
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
|
||||
bool syncIsReadyForRead(int64_t rid);
|
||||
bool syncSnapshotSending(int64_t rid);
|
||||
bool syncSnapshotRecving(int64_t rid);
|
||||
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
|
||||
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg);
|
||||
|
||||
SSyncState syncGetState(int64_t rid);
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
|
|
|
@ -33,7 +33,7 @@ static const SSysDbTableSchema dnodesSchema[] = {
|
|||
{.name = "support_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
{.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
#ifdef TD_ENTERPRISE
|
||||
{.name = "active_code", .bytes = TSDB_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
|
@ -47,7 +47,7 @@ static const SSysDbTableSchema mnodesSchema[] = {
|
|||
{.name = "role", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
{.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
};
|
||||
|
||||
static const SSysDbTableSchema modulesSchema[] = {
|
||||
|
@ -73,7 +73,7 @@ static const SSysDbTableSchema clusterSchema[] = {
|
|||
{.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
{.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
|
||||
};
|
||||
|
||||
|
|
|
@ -1101,6 +1101,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1;
|
||||
|
||||
if (tEncodeI32(&encoder, pReq->statusSeq) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->mload.syncTerm) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->mload.roleTimeMs) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -1183,6 +1185,13 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1;
|
||||
|
||||
if (tDecodeI32(&decoder, &pReq->statusSeq) < 0) return -1;
|
||||
|
||||
pReq->mload.syncTerm = -1;
|
||||
pReq->mload.roleTimeMs = 0;
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
if (tDecodeI64(&decoder, &pReq->mload.syncTerm) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->mload.roleTimeMs) < 0) return -1;
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
|
|
|
@ -216,8 +216,9 @@ typedef struct {
|
|||
int64_t createdTime;
|
||||
int64_t updateTime;
|
||||
ESyncState syncState;
|
||||
SyncTerm syncTerm;
|
||||
bool syncRestore;
|
||||
int64_t stateStartTime;
|
||||
int64_t roleTimeMs;
|
||||
SDnodeObj* pDnode;
|
||||
int32_t role;
|
||||
SyncIndex lastIndex;
|
||||
|
|
|
@ -524,13 +524,23 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
|||
|
||||
SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
|
||||
if (pObj != NULL) {
|
||||
if (pObj->syncState != statusReq.mload.syncState || pObj->syncRestore != statusReq.mload.syncRestore) {
|
||||
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d", pObj->id, syncStr(pObj->syncState),
|
||||
syncStr(statusReq.mload.syncState), pObj->syncRestore, statusReq.mload.syncRestore);
|
||||
bool roleChanged = pObj->syncState != statusReq.mload.syncState ||
|
||||
(statusReq.mload.syncTerm != -1 && pObj->syncTerm != statusReq.mload.syncTerm);
|
||||
bool restoreChanged = pObj->syncRestore != statusReq.mload.syncRestore;
|
||||
if (roleChanged || restoreChanged) {
|
||||
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64
|
||||
" to %" PRId64,
|
||||
pObj->id, syncStr(pObj->syncState), syncStr(statusReq.mload.syncState), pObj->syncRestore,
|
||||
statusReq.mload.syncRestore, pObj->syncTerm, statusReq.mload.syncTerm);
|
||||
pObj->syncState = statusReq.mload.syncState;
|
||||
pObj->syncRestore = statusReq.mload.syncRestore;
|
||||
pObj->stateStartTime = taosGetTimestampMs();
|
||||
pObj->syncTerm = statusReq.mload.syncTerm;
|
||||
}
|
||||
|
||||
if (roleChanged) {
|
||||
pObj->roleTimeMs = (statusReq.mload.roleTimeMs != 0) ? statusReq.mload.roleTimeMs : taosGetTimestampMs();
|
||||
}
|
||||
|
||||
mndReleaseMnode(pMnode, pObj);
|
||||
}
|
||||
|
||||
|
|
|
@ -890,7 +890,10 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
|
|||
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
|
||||
pLoad->syncState = state.state;
|
||||
pLoad->syncRestore = state.restored;
|
||||
mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);
|
||||
pLoad->syncTerm = state.term;
|
||||
pLoad->roleTimeMs = state.roleTimeMs;
|
||||
mTrace("mnode current syncState is %s, syncRestore:%d, syncTerm:%" PRId64 " ,roleTimeMs:%" PRId64,
|
||||
syncStr(pLoad->syncState), pLoad->syncRestore, pLoad->syncTerm, pLoad->roleTimeMs);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -319,7 +319,7 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans,
|
||||
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans,
|
||||
SDAlterMnodeTypeReq *pAlterMnodeTypeReq, SEpSet *pAlterMnodeTypeEpSet) {
|
||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
|
||||
void *pReq = taosMemoryMalloc(contLen);
|
||||
|
@ -803,9 +803,17 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SMnodeObj *pObj = NULL;
|
||||
SMnodeObj *pSelfObj = NULL;
|
||||
ESdbStatus objStatus = 0;
|
||||
char *pWrite;
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
int64_t dummyTimeMs = 0;
|
||||
|
||||
pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId);
|
||||
if (pSelfObj == NULL) {
|
||||
mError("mnode:%d, failed to acquire self %s", pMnode->selfDnodeId, terrstr());
|
||||
goto _out;
|
||||
}
|
||||
|
||||
while (numOfRows < rows) {
|
||||
pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus, true);
|
||||
|
@ -825,7 +833,8 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
if (pObj->id == pMnode->selfDnodeId) {
|
||||
snprintf(role, sizeof(role), "%s%s", syncStr(TAOS_SYNC_STATE_LEADER), pMnode->restored ? "" : "*");
|
||||
}
|
||||
if (mndIsDnodeOnline(pObj->pDnode, curMs)) {
|
||||
bool isDnodeOnline = mndIsDnodeOnline(pObj->pDnode, curMs);
|
||||
if (isDnodeOnline) {
|
||||
tstrncpy(role, syncStr(pObj->syncState), sizeof(role));
|
||||
if (pObj->syncState == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) {
|
||||
tstrncpy(role, syncStr(TAOS_SYNC_STATE_ERROR), sizeof(role));
|
||||
|
@ -840,7 +849,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
const char *status = "ready";
|
||||
if (objStatus == SDB_STATUS_CREATING) status = "creating";
|
||||
if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
|
||||
if (!mndIsDnodeOnline(pObj->pDnode, curMs)) status = "offline";
|
||||
if (!isDnodeOnline) status = "offline";
|
||||
char b3[9 + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
|
@ -850,7 +859,15 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->stateStartTime, false);
|
||||
if (pObj->syncTerm != pSelfObj->syncTerm || !isDnodeOnline) {
|
||||
// state of old term / no status report => use dummyTimeMs
|
||||
if (pObj->syncTerm > pSelfObj->syncTerm) {
|
||||
mError("mnode:%d has a newer term:%" PRId64 " than me:%" PRId64, pObj->id, pObj->syncTerm, pSelfObj->syncTerm);
|
||||
}
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&dummyTimeMs, false);
|
||||
} else {
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->roleTimeMs, false);
|
||||
}
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pObj);
|
||||
|
@ -858,6 +875,8 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
|
||||
pShow->numOfRows += numOfRows;
|
||||
|
||||
_out:
|
||||
sdbRelease(pSdb, pSelfObj);
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
|
@ -999,12 +1018,12 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
if (pMnode->syncMgmt.sync > 0) {
|
||||
mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d",
|
||||
mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d",
|
||||
cfg.totalReplicaNum, cfg.replicaNum, cfg.myIndex);
|
||||
|
||||
for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) {
|
||||
SNodeInfo *pNode = &cfg.nodeInfo[i];
|
||||
mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort,
|
||||
mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort,
|
||||
pNode->nodeId, pNode->clusterId, pNode->nodeRole);
|
||||
}
|
||||
|
||||
|
|
|
@ -213,7 +213,7 @@ typedef struct SSyncNode {
|
|||
int64_t minMatchIndex;
|
||||
|
||||
int64_t startTime;
|
||||
int64_t leaderTime;
|
||||
int64_t roleTimeMs;
|
||||
int64_t lastReplicateTime;
|
||||
|
||||
int32_t electNum;
|
||||
|
|
|
@ -508,12 +508,14 @@ SSyncState syncGetState(int64_t rid) {
|
|||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode != NULL) {
|
||||
state.state = pSyncNode->state;
|
||||
state.roleTimeMs = pSyncNode->roleTimeMs;
|
||||
state.restored = pSyncNode->restoreFinish;
|
||||
if (pSyncNode->vgId != 1) {
|
||||
state.canRead = syncNodeIsReadyForRead(pSyncNode);
|
||||
} else {
|
||||
state.canRead = state.restored;
|
||||
}
|
||||
state.term = raftStoreGetTerm(pSyncNode);
|
||||
syncNodeRelease(pSyncNode);
|
||||
}
|
||||
|
||||
|
@ -898,6 +900,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
|
||||
// init TLA+ server vars
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
if (raftStoreOpen(pSyncNode) != 0) {
|
||||
sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
|
||||
goto _error;
|
||||
|
@ -1035,7 +1038,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
|
||||
int64_t timeNow = taosGetTimestampMs();
|
||||
pSyncNode->startTime = timeNow;
|
||||
pSyncNode->leaderTime = timeNow;
|
||||
pSyncNode->lastReplicateTime = timeNow;
|
||||
|
||||
// snapshotting
|
||||
|
@ -1131,6 +1133,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
|
|||
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||
// state change
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
// reset elect timer, long enough
|
||||
|
@ -1667,6 +1670,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
|
||||
// state change
|
||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
// trace log
|
||||
|
@ -1695,6 +1699,7 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
|
||||
// state change
|
||||
pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
|
||||
// trace log
|
||||
sNTrace(pSyncNode, "become learner %s", debugStr);
|
||||
|
@ -1730,8 +1735,6 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
|
||||
//
|
||||
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
||||
pSyncNode->leaderTime = taosGetTimestampMs();
|
||||
|
||||
pSyncNode->becomeLeaderNum++;
|
||||
pSyncNode->hbrSlowNum = 0;
|
||||
|
||||
|
@ -1740,6 +1743,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
|
||||
// state change
|
||||
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
|
||||
// set leader cache
|
||||
pSyncNode->leaderCache = pSyncNode->myRaftId;
|
||||
|
@ -1839,6 +1843,7 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
|
|||
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
|
||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
|
||||
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
|
||||
pSyncNode->roleTimeMs = taosGetTimestampMs();
|
||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||
sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
|
||||
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
|
||||
|
|
Loading…
Reference in New Issue