Merge pull request #27991 from taosdata/fix/TD-31891-remove-void-sync1

fix/TD-31891-remove-void-sync1
This commit is contained in:
Hongze Cheng 2024-09-24 08:50:07 +08:00 committed by GitHub
commit 3312eec628
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 228 additions and 123 deletions

View File

@ -497,11 +497,9 @@ int32_t mndInitSync(SMnode *pMnode) {
pNode->nodePort = pMgmt->replicas[i].port; pNode->nodePort = pMgmt->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodeRole = pMgmt->nodeRoles[i]; pNode->nodeRole = pMgmt->nodeRoles[i];
if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort) != true) { bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
mError("failed to open sync, tmsgUpdateDnodeInfo is false"); mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64 ", update:%d", i, pNode->nodeFqdn, pNode->nodePort,
} pNode->nodeId, pNode->clusterId, update);
mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId,
pNode->clusterId);
} }
int32_t code = 0; int32_t code = 0;

View File

@ -175,7 +175,8 @@ _SEND_RESPONSE:
if (accepted && matched) { if (accepted && matched) {
pReply->success = true; pReply->success = true;
// update commit index only after matching // update commit index only after matching
(void)syncNodeUpdateCommitIndex(ths, TMIN(pMsg->commitIndex, pReply->lastSendIndex)); SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, TMIN(pMsg->commitIndex, pReply->lastSendIndex));
sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);
} }
// ack, i.e. send response // ack, i.e. send response

View File

@ -85,10 +85,9 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
int32_t code = 0; int32_t code = 0;
if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) {
SyncIndex commitIndex = indexLikely; SyncIndex commitIndex = indexLikely;
// TODO add return when error SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, commitIndex);
(void)syncNodeUpdateCommitIndex(ths, commitIndex); sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 ", return:%" PRId64, ths->vgId, ths->state,
sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state, raftStoreGetTerm(ths), commitIndex, returnIndex);
raftStoreGetTerm(ths), commitIndex);
} }
return ths->commitIndex; return ths->commitIndex;
} }

View File

@ -201,13 +201,13 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
// TODO check return value // TODO check return value
(void)syncNodeStopHeartbeatTimer(pSyncNode); TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
(void)syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]); TAOS_CHECK_RETURN(syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]));
} }
(void)syncNodeStartHeartbeatTimer(pSyncNode); TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
// syncNodeReplicate(pSyncNode); // syncNodeReplicate(pSyncNode);
} }
@ -410,9 +410,8 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
syncNodeRelease(pNode); syncNodeRelease(pNode);
if (ret == 1) { if (ret == 1) {
sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle); sInfo("send timeout response, seq:%" PRId64 " handle:%p ahandle:%p", seq, rpcMsg.info.handle, rpcMsg.info.ahandle);
// TODO check return value code = rpcSendResponse(&rpcMsg);
(void)rpcSendResponse(&rpcMsg); return code;
return 0;
} else { } else {
sError("no message handle to send timeout response, seq:%" PRId64, seq); sError("no message handle to send timeout response, seq:%" PRId64, seq);
return TSDB_CODE_SYN_INTERNAL_ERROR; return TSDB_CODE_SYN_INTERNAL_ERROR;
@ -933,7 +932,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
int32_t code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId); int32_t code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
if (code != 0) { if (code != 0) {
sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr()); sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
(void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); code = syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
TAOS_RETURN(code); TAOS_RETURN(code);
} }
@ -941,7 +940,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg); code = (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr()); sWarn("vgId:%d, failed to propose msg while enqueue since %s", pSyncNode->vgId, terrstr());
(void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); TAOS_CHECK_RETURN(syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum));
} }
if (seq != NULL) *seq = seqNum; if (seq != NULL) *seq = seqNum;
@ -961,7 +960,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa
} }
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
int32_t ret = 0; int32_t code = 0;
int64_t tsNow = taosGetTimestampMs(); int64_t tsNow = taosGetTimestampMs();
if (syncIsInit()) { if (syncIsInit()) {
SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid); SSyncHbTimerData* pData = syncHbTimerDataAcquire(pSyncTimer->hbDataRid);
@ -980,21 +979,20 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr); sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, pData->rid, pData->destId.addr);
(void)taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid), TAOS_CHECK_RETURN(taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid),
syncEnv()->pTimerManager, &pSyncTimer->pTimer); syncEnv()->pTimerManager, &pSyncTimer->pTimer));
} else { } else {
ret = TSDB_CODE_SYN_INTERNAL_ERROR; code = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
} }
return ret; return code;
} }
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
int32_t ret = 0; int32_t ret = 0;
(void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1); (void)atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
if (!taosTmrStop(pSyncTimer->pTimer)) { bool stop = taosTmrStop(pSyncTimer->pTimer);
return TSDB_CODE_SYN_INTERNAL_ERROR; sDebug("vgId:%d, stop hb timer stop:%d", pSyncNode->vgId, stop);
}
pSyncTimer->pTimer = NULL; pSyncTimer->pTimer = NULL;
syncHbTimerDataRemove(pSyncTimer->hbDataRid); syncHbTimerDataRemove(pSyncTimer->hbDataRid);
pSyncTimer->hbDataRid = -1; pSyncTimer->hbDataRid = -1;
@ -1141,8 +1139,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum; pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum; pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) { for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
if (!syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) { if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; false) {
sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i); sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
goto _error; goto _error;
} }
@ -1308,7 +1306,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
} }
// tools // tools
(void)syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr); // TODO: check return value if ((code = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr)) != 0) {
sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
goto _error;
}
if (pSyncNode->pSyncRespMgr == NULL) { if (pSyncNode->pSyncRespMgr == NULL) {
sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId); sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
goto _error; goto _error;
@ -1471,29 +1472,31 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
#ifdef BUILD_NO_CALL #ifdef BUILD_NO_CALL
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) { int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
// state change // state change
int32_t code = 0;
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs(); pSyncNode->roleTimeMs = taosGetTimestampMs();
// TODO check return value // TODO check return value
(void)syncNodeStopHeartbeatTimer(pSyncNode); TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
// reset elect timer, long enough // reset elect timer, long enough
int32_t electMS = TIMER_MAX_MS; int32_t electMS = TIMER_MAX_MS;
int32_t ret = syncNodeRestartElectTimer(pSyncNode, electMS); code = syncNodeRestartElectTimer(pSyncNode, electMS);
if (ret < 0) { if (code < 0) {
sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr()); sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
return -1; return -1;
} }
ret = syncNodeStartPingTimer(pSyncNode); code = syncNodeStartPingTimer(pSyncNode);
if (ret < 0) { if (code < 0) {
sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr()); sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr());
return -1; return -1;
} }
return ret; return code;
} }
#endif #endif
void syncNodePreClose(SSyncNode* pSyncNode) { void syncNodePreClose(SSyncNode* pSyncNode) {
int32_t code = 0;
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
sError("failed to pre close sync node since sync node is null"); sError("failed to pre close sync node since sync node is null");
return; return;
@ -1508,13 +1511,22 @@ void syncNodePreClose(SSyncNode* pSyncNode) {
} }
// stop elect timer // stop elect timer
(void)syncNodeStopElectTimer(pSyncNode); if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// stop heartbeat timer // stop heartbeat timer
(void)syncNodeStopHeartbeatTimer(pSyncNode); if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// stop ping timer // stop ping timer
(void)syncNodeStopPingTimer(pSyncNode); if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) {
sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// clean rsp // clean rsp
syncRespCleanRsp(pSyncNode->pSyncRespMgr); syncRespCleanRsp(pSyncNode->pSyncRespMgr);
@ -1536,14 +1548,24 @@ void syncNodePostClose(SSyncNode* pSyncNode) {
void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); } void syncHbTimerDataFree(SSyncHbTimerData* pData) { taosMemoryFree(pData); }
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
int32_t code = 0;
if (pSyncNode == NULL) return; if (pSyncNode == NULL) return;
sNInfo(pSyncNode, "sync close, node:%p", pSyncNode); sNInfo(pSyncNode, "sync close, node:%p", pSyncNode);
syncRespCleanRsp(pSyncNode->pSyncRespMgr); syncRespCleanRsp(pSyncNode->pSyncRespMgr);
(void)syncNodeStopPingTimer(pSyncNode); if ((code = syncNodeStopPingTimer(pSyncNode)) != 0) {
(void)syncNodeStopElectTimer(pSyncNode); sError("vgId:%d, failed to stop ping timer since %s", pSyncNode->vgId, tstrerror(code));
(void)syncNodeStopHeartbeatTimer(pSyncNode); return;
}
if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
syncNodeLogReplDestroy(pSyncNode); syncNodeLogReplDestroy(pSyncNode);
syncRespMgrDestroy(pSyncNode->pSyncRespMgr); syncRespMgrDestroy(pSyncNode->pSyncRespMgr);
@ -1599,28 +1621,28 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->raftCfg
// timer control -------------- // timer control --------------
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t code = 0;
if (syncIsInit()) { if (syncIsInit()) {
(void)taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid, TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpPingTimerCB, pSyncNode->pingTimerMS, (void*)pSyncNode->rid,
syncEnv()->pTimerManager, &pSyncNode->pPingTimer); syncEnv()->pTimerManager, &pSyncNode->pPingTimer));
atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser); atomic_store_64(&pSyncNode->pingTimerLogicClock, pSyncNode->pingTimerLogicClockUser);
} else { } else {
sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start ping timer error, sync env is stop", pSyncNode->vgId);
} }
return ret; return code;
} }
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t code = 0;
(void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1); (void)atomic_add_fetch_64(&pSyncNode->pingTimerLogicClockUser, 1);
// TODO check return value bool stop = taosTmrStop(pSyncNode->pPingTimer);
(void)taosTmrStop(pSyncNode->pPingTimer); sDebug("vgId:%d, stop ping timer, stop:%d", pSyncNode->vgId, stop);
pSyncNode->pPingTimer = NULL; pSyncNode->pPingTimer = NULL;
return ret; return code;
} }
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
int32_t ret = 0; int32_t code = 0;
if (syncIsInit()) { if (syncIsInit()) {
pSyncNode->electTimerMS = ms; pSyncNode->electTimerMS = ms;
@ -1630,22 +1652,22 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
pSyncNode->electTimerParam.pSyncNode = pSyncNode; pSyncNode->electTimerParam.pSyncNode = pSyncNode;
pSyncNode->electTimerParam.pData = NULL; pSyncNode->electTimerParam.pData = NULL;
(void)taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid),
syncEnv()->pTimerManager, &pSyncNode->pElectTimer); syncEnv()->pTimerManager, &pSyncNode->pElectTimer));
} else { } else {
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
} }
return ret; return code;
} }
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t code = 0;
(void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1); (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1);
// TODO check return value bool stop = taosTmrStop(pSyncNode->pElectTimer);
(void)taosTmrStop(pSyncNode->pElectTimer); sDebug("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, stop);
pSyncNode->pElectTimer = NULL; pSyncNode->pElectTimer = NULL;
return ret; return code;
} }
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
@ -1666,7 +1688,10 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
} }
// TODO check return value // TODO check return value
(void)syncNodeRestartElectTimer(pSyncNode, electMS); if ((code = syncNodeRestartElectTimer(pSyncNode, electMS)) != 0) {
sError("vgId:%d, failed to restart elect timer since %s", pSyncNode->vgId, terrstr());
return;
};
sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine, sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine,
electMS); electMS);
@ -1674,17 +1699,17 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) {
#ifdef BUILD_NO_CALL #ifdef BUILD_NO_CALL
static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) { static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t code = 0;
if (syncIsInit()) { if (syncIsInit()) {
(void)taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid, TAOS_CHECK_RETURN(taosTmrReset(pSyncNode->FpHeartbeatTimerCB, pSyncNode->heartbeatTimerMS, (void*)pSyncNode->rid,
syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer); syncEnv()->pTimerManager, &pSyncNode->pHeartbeatTimer));
atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); atomic_store_64(&pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
} else { } else {
sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId); sError("vgId:%d, start heartbeat timer error, sync env is stop", pSyncNode->vgId);
} }
sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS); sNTrace(pSyncNode, "start heartbeat timer, ms:%d", pSyncNode->heartbeatTimerMS);
return ret; return code;
} }
#endif #endif
@ -1707,12 +1732,12 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
} }
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t code = 0;
#if 0 #if 0
//TODO check return value TAOS_CHECK_RETURN(atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1));
(void)atomic_add_fetch_64(&pSyncNode->heartbeatTimerLogicClockUser, 1); bool stop = taosTmrStop(pSyncNode->pHeartbeatTimer);
(void)taosTmrStop(pSyncNode->pHeartbeatTimer); sDebug("vgId:%d, stop heartbeat timer, stop:%d", pSyncNode->vgId, stop);
pSyncNode->pHeartbeatTimer = NULL; pSyncNode->pHeartbeatTimer = NULL;
#endif #endif
@ -1723,14 +1748,15 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
} }
} }
return ret; return code;
} }
#ifdef BUILD_NO_CALL #ifdef BUILD_NO_CALL
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
// TODO check return value // TODO check return value
(void)syncNodeStopHeartbeatTimer(pSyncNode); int32_t code = 0;
(void)syncNodeStartHeartbeatTimer(pSyncNode); TAOS_CHECK_RETURN(syncNodeStopHeartbeatTimer(pSyncNode));
TAOS_CHECK_RETURN(syncNodeStartHeartbeatTimer(pSyncNode));
return 0; return 0;
} }
#endif #endif
@ -1806,6 +1832,7 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg
} }
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) { int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
int32_t code = 0;
SSyncCfg oldConfig = pSyncNode->raftCfg.cfg; SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
if (!syncIsConfigChanged(&oldConfig, pNewConfig)) { if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
sInfo("vgId:1, sync not reconfig since not changed"); sInfo("vgId:1, sync not reconfig since not changed");
@ -1873,7 +1900,7 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI
// init internal // init internal
pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex]; pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
(void)syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); if (syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId) == false) return terrno;
// init peersNum, peers, peersId // init peersNum, peers, peersId
pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1; pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
@ -1886,14 +1913,17 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI
} }
} }
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
(void)syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); if (syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]) == false)
return terrno;
} }
// init replicaNum, replicasId // init replicaNum, replicasId
pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum; pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum; pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) { for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
(void)syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); if (syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]) ==
false)
return terrno;
} }
// update quorum first // update quorum first
@ -1939,7 +1969,7 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI
// create new // create new
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
if (pSyncNode->senders[i] == NULL) { if (pSyncNode->senders[i] == NULL) {
(void)snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]); TAOS_CHECK_RETURN(snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]));
if (pSyncNode->senders[i] == NULL) { if (pSyncNode->senders[i] == NULL) {
// will be created later while send snapshot // will be created later while send snapshot
sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig"); sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig");
@ -1961,10 +1991,10 @@ int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncI
} }
// persist cfg // persist cfg
(void)syncWriteCfgFile(pSyncNode); TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
} else { } else {
// persist cfg // persist cfg
(void)syncWriteCfgFile(pSyncNode); TAOS_CHECK_RETURN(syncWriteCfgFile(pSyncNode));
sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum); sNInfo(pSyncNode, "do not config change from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
} }
@ -2015,7 +2045,7 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); } void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); }
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// maybe clear leader cache int32_t code = 0; // maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
pSyncNode->leaderCache = EMPTY_RAFT_ID; pSyncNode->leaderCache = EMPTY_RAFT_ID;
} }
@ -2025,7 +2055,10 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs(); pSyncNode->roleTimeMs = taosGetTimestampMs();
(void)syncNodeStopHeartbeatTimer(pSyncNode); if ((code = syncNodeStopHeartbeatTimer(pSyncNode)) != 0) {
sError("vgId:%d, failed to stop heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// trace log // trace log
sNTrace(pSyncNode, "become follower %s", debugStr); sNTrace(pSyncNode, "become follower %s", debugStr);
@ -2042,7 +2075,10 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// reset log buffer // reset log buffer
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// reset elect timer // reset elect timer
syncNodeResetElectTimer(pSyncNode); syncNodeResetElectTimer(pSyncNode);
@ -2069,7 +2105,11 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// reset log buffer // reset log buffer
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); int32_t code = 0;
if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
return;
};
} }
// TLA+ Spec // TLA+ Spec
@ -2091,6 +2131,7 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>> // /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
// //
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
int32_t code = 0;
pSyncNode->becomeLeaderNum++; pSyncNode->becomeLeaderNum++;
pSyncNode->hbrSlowNum = 0; pSyncNode->hbrSlowNum = 0;
@ -2122,7 +2163,10 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
} }
// init peer mgr // init peer mgr
(void)syncNodePeerStateInit(pSyncNode); if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
return;
}
#if 0 #if 0
// update sender private term // update sender private term
@ -2143,13 +2187,22 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
} }
// stop elect timer // stop elect timer
(void)syncNodeStopElectTimer(pSyncNode); if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// start heartbeat timer // start heartbeat timer
(void)syncNodeStartHeartbeatTimer(pSyncNode); if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// send heartbeat right now // send heartbeat right now
(void)syncNodeHeartbeatPeers(pSyncNode); if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// call back // call back
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) { if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeLeaderCb != NULL) {
@ -2160,13 +2213,17 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// reset log buffer // reset log buffer
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// trace log // trace log
sNInfo(pSyncNode, "become leader %s", debugStr); sNInfo(pSyncNode, "become leader %s", debugStr);
} }
void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) { void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
int32_t code = 0;
pSyncNode->becomeAssignedLeaderNum++; pSyncNode->becomeAssignedLeaderNum++;
pSyncNode->hbrSlowNum = 0; pSyncNode->hbrSlowNum = 0;
@ -2198,7 +2255,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
} }
// init peer mgr // init peer mgr
(void)syncNodePeerStateInit(pSyncNode); if ((code = syncNodePeerStateInit(pSyncNode)) != 0) {
sError("vgId:%d, failed to init peer state since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// close receiver // close receiver
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
@ -2206,13 +2266,22 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
} }
// stop elect timer // stop elect timer
(void)syncNodeStopElectTimer(pSyncNode); if ((code = syncNodeStopElectTimer(pSyncNode)) != 0) {
sError("vgId:%d, failed to stop elect timer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// start heartbeat timer // start heartbeat timer
(void)syncNodeStartHeartbeatTimer(pSyncNode); if ((code = syncNodeStartHeartbeatTimer(pSyncNode)) != 0) {
sError("vgId:%d, failed to start heartbeat timer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// send heartbeat right now // send heartbeat right now
(void)syncNodeHeartbeatPeers(pSyncNode); if ((code = syncNodeHeartbeatPeers(pSyncNode)) != 0) {
sError("vgId:%d, failed to send heartbeat to peers since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// call back // call back
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) { if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpBecomeAssignedLeaderCb != NULL) {
@ -2223,7 +2292,10 @@ void syncNodeBecomeAssignedLeader(SSyncNode* pSyncNode) {
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// reset log buffer // reset log buffer
(void)syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); if ((code = syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode)) != 0) {
sError("vgId:%d, failed to reset log buffer since %s", pSyncNode->vgId, tstrerror(code));
return;
}
// trace log // trace log
sNInfo(pSyncNode, "become assigned leader"); sNInfo(pSyncNode, "become assigned leader");
@ -2513,8 +2585,10 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
} }
_out: _out:
(void)taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, if ((code = taosTmrReset(syncNodeEqPingTimer, pNode->pingTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
&pNode->pPingTimer); &pNode->pPingTimer)) != 0) {
sError("failed to reset ping timer since %s", tstrerror(code));
};
} }
syncNodeRelease(pNode); syncNodeRelease(pNode);
} }
@ -2591,8 +2665,9 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
} }
_out: _out:
(void)taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager, if (taosTmrReset(syncNodeEqHeartbeatTimer, pNode->heartbeatTimerMS, (void*)pNode->rid, syncEnv()->pTimerManager,
&pNode->pHeartbeatTimer); &pNode->pHeartbeatTimer) != 0)
return;
} else { } else {
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64, sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRId64 ", heartbeatTimerLogicClockUser:%" PRId64,
@ -2603,6 +2678,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
#endif #endif
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
int32_t code = 0;
int64_t hbDataRid = (int64_t)param; int64_t hbDataRid = (int64_t)param;
int64_t tsNow = taosGetTimestampMs(); int64_t tsNow = taosGetTimestampMs();
@ -2646,7 +2722,12 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
pData->execTime += pSyncTimer->timerMS; pData->execTime += pSyncTimer->timerMS;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId); if ((code = syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId)) != 0) {
sError("vgId:%d, failed to build heartbeat msg since %s", pSyncNode->vgId, tstrerror(code));
syncNodeRelease(pSyncNode);
syncHbTimerDataRelease(pData);
return;
}
pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
@ -2668,14 +2749,22 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
STraceId* trace = &(rpcMsg.info.traceId); STraceId* trace = &(rpcMsg.info.traceId);
sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId))); sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime); syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime);
(void)syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); int ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
if (ret != 0) {
sError("vgId:%d, failed to send heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
}
} else { } else {
} }
if (syncIsInit()) { if (syncIsInit()) {
// sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId); // sTrace("vgId:%d, reset peer hb timer", pSyncNode->vgId);
(void)taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)hbDataRid, if ((code = taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM,
syncEnv()->pTimerManager, &pSyncTimer->pTimer); (void*)hbDataRid, syncEnv()->pTimerManager, &pSyncTimer->pTimer)) != 0) {
sError("vgId:%d, reset peer hb timer error, %s", pSyncNode->vgId, tstrerror(code));
syncNodeRelease(pSyncNode);
syncHbTimerDataRelease(pData);
return;
}
} else { } else {
sError("sync env is stop, reset peer hb timer error"); sError("sync env is stop, reset peer hb timer error");
} }
@ -2715,6 +2804,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TODO SAlterVnodeReplicaReq name is proper? void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TODO SAlterVnodeReplicaReq name is proper?
cfg->replicaNum = 0; cfg->replicaNum = 0;
cfg->totalReplicaNum = 0; cfg->totalReplicaNum = 0;
int32_t code = 0;
for (int i = 0; i < pReq->replica; ++i) { for (int i = 0; i < pReq->replica; ++i) {
SNodeInfo* pNode = &cfg->nodeInfo[i]; SNodeInfo* pNode = &cfg->nodeInfo[i];
@ -2722,9 +2812,9 @@ void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TO
pNode->nodePort = pReq->replicas[i].port; pNode->nodePort = pReq->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER; pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
pNode->nodeId, pNode->nodeRole); pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
cfg->replicaNum++; cfg->replicaNum++;
} }
if (pReq->selfIndex != -1) { if (pReq->selfIndex != -1) {
@ -2736,9 +2826,9 @@ void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TO
pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port; pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER; pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); bool update = tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d, update:%d", pReq->vgId, i, pNode->nodeFqdn,
pNode->nodeId, pNode->nodeRole); pNode->nodePort, pNode->nodeId, pNode->nodeRole, update);
cfg->totalReplicaNum++; cfg->totalReplicaNum++;
} }
cfg->totalReplicaNum += pReq->replica; cfg->totalReplicaNum += pReq->replica;
@ -2849,9 +2939,9 @@ int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[i], &ths->peersEpset[i]); syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[i], &ths->peersEpset[i]);
if (!syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[i], ths->vgId, &ths->peersId[i])) { if (syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[i], ths->vgId, &ths->peersId[i]) == false) {
sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, i); sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, i);
return -1; return terrno;
} }
i++; i++;
@ -2919,8 +3009,7 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
ths->replicaNum = ths->raftCfg.cfg.replicaNum; ths->replicaNum = ths->raftCfg.cfg.replicaNum;
ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum; ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) { for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
if (!syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i])) if (syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]) == false) return terrno;
return TSDB_CODE_SYN_INTERNAL_ERROR;
} }
// 2.rebuild MatchIndex, remove deleted one // 2.rebuild MatchIndex, remove deleted one
@ -3287,7 +3376,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// append to log buffer // append to log buffer
if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) { if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) {
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
(void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false); int32_t ret = 0;
if ((ret = syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false)) != 0) {
sError("vgId:%d, failed to execute fsm, since %s", ths->vgId, tstrerror(ret));
}
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
pEntry = NULL; pEntry = NULL;
goto _out; goto _out;
@ -3305,7 +3397,7 @@ _out:;
ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex);
if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
(void)syncNodeUpdateAssignedCommitIndex(ths, matchIndex); TAOS_CHECK_RETURN(syncNodeUpdateAssignedCommitIndex(ths, matchIndex));
if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) { syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) {
@ -3320,7 +3412,8 @@ _out:;
} }
// single replica // single replica
(void)syncNodeUpdateCommitIndex(ths, matchIndex); SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex);
sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);
if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
(code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex)) < 0) { (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex)) < 0) {
@ -3442,7 +3535,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId); TAOS_CHECK_RETURN(syncBuildHeartbeatReply(&rpcMsg, ths->vgId));
SyncTerm currentTerm = raftStoreGetTerm(ths); SyncTerm currentTerm = raftStoreGetTerm(ths);
SyncHeartbeatReply* pMsgReply = rpcMsg.pCont; SyncHeartbeatReply* pMsgReply = rpcMsg.pCont;
@ -3470,7 +3563,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) { if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) {
SRpcMsg rpcMsgLocalCmd = {0}; SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = pSyncMsg->cmd =
@ -3494,7 +3587,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->term >= currentTerm && if (pMsg->term >= currentTerm &&
(ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) { (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) {
SRpcMsg rpcMsgLocalCmd = {0}; SRpcMsg rpcMsgLocalCmd = {0};
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId); TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId));
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
@ -3577,7 +3670,8 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
return TSDB_CODE_SYN_INTERNAL_ERROR; return TSDB_CODE_SYN_INTERNAL_ERROR;
} }
if (pMsg->currentTerm == matchTerm) { if (pMsg->currentTerm == matchTerm) {
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
sTrace("vgId:%d, update commit return index %" PRId64 "", ths->vgId, returnIndex);
} }
if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(), sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),

View File

@ -163,7 +163,7 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) {
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
} }
(void)taosCloseFile(&pFile); TAOS_CHECK_EXIT(taosCloseFile(&pFile));
TAOS_CHECK_EXIT(taosRenameFile(file, realfile)); TAOS_CHECK_EXIT(taosRenameFile(file, realfile));
sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d", sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",

View File

@ -134,7 +134,7 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) {
if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(terrno, &lino, _OVER); if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(terrno, &lino, _OVER);
(void)taosCloseFile(&pFile); TAOS_CHECK_GOTO(taosCloseFile(&pFile), &lino, _OVER);
if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(terrno, &lino, _OVER); if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(terrno, &lino, _OVER);
code = 0; code = 0;
@ -168,21 +168,30 @@ bool raftStoreHasVoted(SSyncNode *pNode) {
// Record *pRaftId as this node's vote (raftStore.voteFor) and write the raft store file,
// all while holding raftStore.mutex.
// The function is void: a raftStoreWriteFile() failure is never reported to the caller,
// it is logged through sError at most.
void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) { void raftStoreVote(SSyncNode *pNode, SRaftId *pRaftId) {
(void)taosThreadMutexLock(&pNode->raftStore.mutex); (void)taosThreadMutexLock(&pNode->raftStore.mutex);
pNode->raftStore.voteFor = *pRaftId; pNode->raftStore.voteFor = *pRaftId;
// NOTE(review): voteFor is already changed in memory at this point and is not restored
// when the write below fails - confirm callers tolerate a vote that was not written out.
(void)raftStoreWriteFile(pNode); int32_t code = 0;
if ((code = raftStoreWriteFile(pNode)) != 0) {
sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code));
}
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex); (void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
} }
// Reset this node's vote to EMPTY_RAFT_ID and write the raft store file,
// all while holding raftStore.mutex.
// The function is void: a raftStoreWriteFile() failure is never reported to the caller,
// it is logged through sError at most.
void raftStoreClearVote(SSyncNode *pNode) { void raftStoreClearVote(SSyncNode *pNode) {
(void)taosThreadMutexLock(&pNode->raftStore.mutex); (void)taosThreadMutexLock(&pNode->raftStore.mutex);
pNode->raftStore.voteFor = EMPTY_RAFT_ID; pNode->raftStore.voteFor = EMPTY_RAFT_ID;
// NOTE(review): the in-memory vote stays cleared even when the write below fails - confirm intended.
(void)raftStoreWriteFile(pNode); int32_t code = 0;
if ((code = raftStoreWriteFile(pNode)) != 0) {
sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code));
}
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex); (void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
} }
// Increment raftStore.currentTerm by one and write the raft store file,
// all while holding raftStore.mutex.
// The function is void: a raftStoreWriteFile() failure is never reported to the caller,
// it is logged through sError at most.
void raftStoreNextTerm(SSyncNode *pNode) { void raftStoreNextTerm(SSyncNode *pNode) {
(void)taosThreadMutexLock(&pNode->raftStore.mutex); (void)taosThreadMutexLock(&pNode->raftStore.mutex);
pNode->raftStore.currentTerm++; pNode->raftStore.currentTerm++;
// NOTE(review): currentTerm stays incremented in memory even when the write below fails - confirm intended.
(void)raftStoreWriteFile(pNode); int32_t code = 0;
if ((code = raftStoreWriteFile(pNode)) != 0) {
sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code));
}
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex); (void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
} }
@ -190,7 +199,10 @@ void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term) {
(void)taosThreadMutexLock(&pNode->raftStore.mutex); (void)taosThreadMutexLock(&pNode->raftStore.mutex);
if (pNode->raftStore.currentTerm < term) { if (pNode->raftStore.currentTerm < term) {
pNode->raftStore.currentTerm = term; pNode->raftStore.currentTerm = term;
(void)raftStoreWriteFile(pNode); int32_t code = 0;
if ((code = raftStoreWriteFile(pNode)) != 0) {
sError("vgId:%d, failed to write raft store file since %s", pNode->vgId, tstrerror(code));
}
} }
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex); (void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
} }

View File

@ -118,8 +118,9 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
STraceId* trace = &(rpcMsg.info.traceId); STraceId* trace = &(rpcMsg.info.traceId);
sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId))); sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0, 0); syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0, 0);
if (syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg) != 0) { int32_t ret = syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
sError("vgId:%d, failed to send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId))); if (ret != 0) {
sError("vgId:%d, failed to send sync-heartbeat since %s", pSyncNode->vgId, tstrerror(ret));
} }
} }

View File

@ -137,7 +137,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// trace log // trace log
syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "", "proceed"); syncLogRecvRequestVote(ths, pMsg, pReply->voteGranted, "", "proceed");
syncLogSendRequestVoteReply(ths, pReply, ""); syncLogSendRequestVoteReply(ths, pReply, "");
(void)syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); TAOS_CHECK_RETURN(syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg));
if (resetElect) syncNodeResetElectTimer(ths); if (resetElect) syncNodeResetElectTimer(ths);