Merge pull request #27324 from taosdata/feat/sync

enh: make some simple adjustments to the sync log
This commit is contained in:
Shengliang Guan 2024-08-20 15:01:41 +08:00 committed by GitHub
commit 7c64afca11
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 160 additions and 200 deletions

1
.gitignore vendored
View File

@ -134,3 +134,4 @@ tags
.clangd
*CMakeCache*
*CMakeFiles*
.history/

View File

@ -156,11 +156,11 @@ typedef struct SSnapshotParam {
typedef struct SSnapshot {
int32_t type;
SSyncTLV* data;
SSyncTLV* data;
ESyncFsmState state;
SyncIndex lastApplyIndex;
SyncTerm lastApplyTerm;
SyncIndex lastConfigIndex;
SyncIndex lastApplyIndex;
SyncTerm lastApplyTerm;
SyncIndex lastConfigIndex;
} SSnapshot;
typedef struct SSnapshotMeta {
@ -263,16 +263,16 @@ typedef struct SSyncState {
int64_t startTimeMs;
} SSyncState;
int32_t syncInit();
void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion);
int32_t syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncCheckMember(int64_t rid);
int32_t syncIsCatchUp(int64_t rid);
int32_t syncInit();
void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion);
int32_t syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncCheckMember(int64_t rid);
int32_t syncIsCatchUp(int64_t rid);
ESyncRole syncGetRole(int64_t rid);
int64_t syncGetTerm(int64_t rid);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
@ -296,7 +296,7 @@ int32_t syncGetAssignedLogSynced(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
const char* syncStr(ESyncState state);
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg *cfg);
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg);
// util
int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size);

View File

@ -29,8 +29,7 @@ typedef void (*RefFp)(void *);
int32_t taosOpenRef(int32_t max, RefFp fp);
// close the reference set, refId is the return value by taosOpenRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
int32_t taosCloseRef(int32_t rsetId);
void taosCloseRef(int32_t rsetId);
// add ref, p is the pointer to resource or pointer ID
// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately

View File

@ -74,15 +74,11 @@ void taos_cleanup(void) {
int32_t id = clientReqRefPool;
clientReqRefPool = -1;
if (TSDB_CODE_SUCCESS != taosCloseRef(id)) {
tscWarn("failed to close clientReqRefPool");
}
taosCloseRef(id);
id = clientConnRefPool;
clientConnRefPool = -1;
if (TSDB_CODE_SUCCESS != taosCloseRef(id)) {
tscWarn("failed to close clientReqRefPool");
}
taosCloseRef(id);
nodesDestroyAllocatorSet();
cleanupAppInfo();

View File

@ -1184,7 +1184,7 @@ void tmqMgmtClose(void) {
}
if (tmqMgmt.rsetId >= 0) {
(void)taosCloseRef(tmqMgmt.rsetId);
taosCloseRef(tmqMgmt.rsetId);
tmqMgmt.rsetId = -1;
}
}

View File

@ -486,7 +486,7 @@ int32_t mndInitSync(SMnode *pMnode) {
int32_t code = 0;
(void)tsem_init(&pMgmt->syncSem, 0, 0);
pMgmt->sync = syncOpen(&syncInfo, true);
pMgmt->sync = syncOpen(&syncInfo, 1); // always check
if (pMgmt->sync <= 0) {
if (terrno != 0) code = terrno;
mError("failed to open sync since %s", tstrerror(code));

View File

@ -69,7 +69,7 @@ int32_t smaInit() {
if (!smaMgmt.refHash || !smaMgmt.tmrHandle) {
code = terrno;
(void)taosCloseRef(smaMgmt.rsetId);
taosCloseRef(smaMgmt.rsetId);
if (smaMgmt.refHash) {
taosHashCleanup(smaMgmt.refHash);
smaMgmt.refHash = NULL;
@ -103,7 +103,7 @@ void smaCleanUp() {
}
if (old == 1) {
(void)taosCloseRef(smaMgmt.rsetId);
taosCloseRef(smaMgmt.rsetId);
taosHashCleanup(smaMgmt.refHash);
smaMgmt.refHash = NULL;
taosTmrCleanUp(smaMgmt.tmrHandle);

View File

@ -31,7 +31,7 @@ int32_t exchangeObjRefPool = -1;
static void cleanupRefPool() {
int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
(void)taosCloseRef(ref);
taosCloseRef(ref);
}
static void initRefPool() {

View File

@ -74,7 +74,7 @@ void indexCleanup() {
// refacto later
taosCleanUpScheduler(indexQhandle);
taosMemoryFreeClear(indexQhandle);
(void)taosCloseRef(indexRefMgt);
taosCloseRef(indexRefMgt);
}
typedef struct SIdxColInfo {

View File

@ -238,7 +238,7 @@ void nodesDestroyAllocatorSet() {
(void)taosRemoveRef(g_allocatorReqRefPool, refId);
pAllocator = taosIterateRef(g_allocatorReqRefPool, refId);
}
(void)taosCloseRef(g_allocatorReqRefPool);
taosCloseRef(g_allocatorReqRefPool);
}
}

View File

@ -564,7 +564,7 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
void qwCloseRef(void) {
taosWLockLatch(&gQwMgmt.lock);
if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) {
(void)taosCloseRef(gQwMgmt.qwRef); // ignore error
taosCloseRef(gQwMgmt.qwRef); // ignore error
gQwMgmt.qwRef = -1;
}
taosWUnLockLatch(&gQwMgmt.lock);

View File

@ -273,7 +273,7 @@ void schCloseJobRef(void) {
}
if (schMgmt.jobRef >= 0) {
(void)taosCloseRef(schMgmt.jobRef);
taosCloseRef(schMgmt.jobRef);
schMgmt.jobRef = -1;
}
}

View File

@ -65,9 +65,9 @@ static void streamMetaEnvInit() {
void streamMetaInit() { (void)taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() {
(void)taosCloseRef(streamBackendId);
(void)taosCloseRef(streamBackendCfWrapperId);
(void)taosCloseRef(streamMetaId);
taosCloseRef(streamBackendId);
taosCloseRef(streamBackendCfWrapperId);
taosCloseRef(streamMetaId);
metaRefMgtCleanup();
streamTimerCleanUp();

View File

@ -23,29 +23,12 @@ extern "C" {
#include "syncInt.h"
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 5000
#define ELECT_TIMER_MS_MIN 2500
#define HEARTBEAT_TIMER_MS 1000
#define HEARTBEAT_TICK_NUM 20
typedef struct SSyncEnv {
uint8_t isStart;
// tick timer
tmr_h pEnvTickTimer;
int32_t envTickTimerMS;
uint64_t envTickTimerLogicClock; // if use queue, should pass logic clock into queue item
uint64_t envTickTimerLogicClockUser;
TAOS_TMR_CALLBACK FpEnvTickTimer; // Timer Fp
uint64_t envTickTimerCounter;
// timer manager
tmr_h pTimerManager;
// other resources shared by SyncNodes
// ...
tmr_h pTimerManager;
} SSyncEnv;
SSyncEnv* syncEnv();

View File

@ -24,7 +24,6 @@ extern "C" {
int32_t syncWriteCfgFile(SSyncNode *pNode);
int32_t syncReadCfgFile(SSyncNode *pNode);
int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex);
#ifdef __cplusplus
}

View File

@ -75,8 +75,6 @@ int32_t syncUtilElectRandomMS(int32_t min, int32_t max);
int32_t syncUtilQuorum(int32_t replicaNum);
const char* syncStr(ESyncState state);
void syncUtilMsgHtoN(void* msg);
bool syncUtilUserPreCommit(tmsg_t msgType);
bool syncUtilUserRollback(tmsg_t msgType);
void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf);
@ -109,7 +107,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s);
void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s);
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);

View File

@ -21,7 +21,6 @@
static SSyncEnv gSyncEnv = {0};
static int32_t gNodeRefId = -1;
static int32_t gHbDataRefId = -1;
static void syncEnvTick(void *param, void *tmrId);
SSyncEnv *syncEnv() { return &gSyncEnv; }
@ -33,67 +32,69 @@ int32_t syncInit() {
uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF);
taosSeedRand(seed);
memset(&gSyncEnv, 0, sizeof(SSyncEnv));
gSyncEnv.envTickTimerCounter = 0;
gSyncEnv.envTickTimerMS = ENV_TICK_TIMER_MS;
gSyncEnv.FpEnvTickTimer = syncEnvTick;
atomic_store_64(&gSyncEnv.envTickTimerLogicClock, 0);
atomic_store_64(&gSyncEnv.envTickTimerLogicClockUser, 0);
// start tmr thread
(void)memset(&gSyncEnv, 0, sizeof(SSyncEnv));
gSyncEnv.pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
atomic_store_8(&gSyncEnv.isStart, 1);
gNodeRefId = taosOpenRef(200, (RefFp)syncNodeClose);
if (gNodeRefId < 0) {
sError("failed to init node ref");
sError("failed to init node rset");
syncCleanUp();
return TSDB_CODE_SYN_WRONG_REF;
}
sDebug("sync node rset is open, rsetId:%d", gNodeRefId);
gHbDataRefId = taosOpenRef(200, (RefFp)syncHbTimerDataFree);
if (gHbDataRefId < 0) {
sError("failed to init hb-data ref");
sError("failed to init hbdata rset");
syncCleanUp();
return TSDB_CODE_SYN_WRONG_REF;
}
sDebug("sync rsetId:%d is open", gNodeRefId);
sDebug("sync hbdata rset is open, rsetId:%d", gHbDataRefId);
atomic_store_8(&gSyncEnv.isStart, 1);
return 0;
}
void syncCleanUp() {
atomic_store_8(&gSyncEnv.isStart, 0);
taosTmrCleanUp(gSyncEnv.pTimerManager);
memset(&gSyncEnv, 0, sizeof(SSyncEnv));
(void)memset(&gSyncEnv, 0, sizeof(SSyncEnv));
if (gNodeRefId != -1) {
sDebug("sync rsetId:%d is closed", gNodeRefId);
(void)taosCloseRef(gNodeRefId);
sDebug("sync node rset is closed, rsetId:%d", gNodeRefId);
taosCloseRef(gNodeRefId);
gNodeRefId = -1;
}
if (gHbDataRefId != -1) {
sDebug("sync rsetId:%d is closed", gHbDataRefId);
(void)taosCloseRef(gHbDataRefId);
sDebug("sync hbdata rset is closed, rsetId:%d", gHbDataRefId);
taosCloseRef(gHbDataRefId);
gHbDataRefId = -1;
}
}
int64_t syncNodeAdd(SSyncNode *pNode) {
pNode->rid = taosAddRef(gNodeRefId, pNode);
if (pNode->rid < 0) return -1;
if (pNode->rid < 0) {
return terrno = TSDB_CODE_SYN_WRONG_REF;
}
sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId);
sDebug("vgId:%d, sync node refId:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId);
return pNode->rid;
}
void syncNodeRemove(int64_t rid) { (void)taosRemoveRef(gNodeRefId, rid); }
void syncNodeRemove(int64_t rid) {
sDebug("sync node refId:%" PRId64 " is removed from rsetId:%d", rid, gNodeRefId);
if (rid > 0) {
(void)taosRemoveRef(gNodeRefId, rid);
}
}
SSyncNode *syncNodeAcquire(int64_t rid) {
SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid);
if (pNode == NULL) {
sError("failed to acquire node from refId:%" PRId64, rid);
sError("failed to acquire sync node from refId:%" PRId64 ", rsetId:%d", rid, gNodeRefId);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
@ -101,62 +102,38 @@ SSyncNode *syncNodeAcquire(int64_t rid) {
}
void syncNodeRelease(SSyncNode *pNode) {
if (pNode) (void)taosReleaseRef(gNodeRefId, pNode->rid);
if (pNode) {
(void)taosReleaseRef(gNodeRefId, pNode->rid);
}
}
int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
pData->rid = taosAddRef(gHbDataRefId, pData);
if (pData->rid < 0) return TSDB_CODE_SYN_WRONG_REF;
if (pData->rid < 0) {
return terrno = TSDB_CODE_SYN_WRONG_REF;
}
return pData->rid;
}
void syncHbTimerDataRemove(int64_t rid) { (void)taosRemoveRef(gHbDataRefId, rid); }
void syncHbTimerDataRemove(int64_t rid) {
if (rid > 0) {
(void)taosRemoveRef(gHbDataRefId, rid);
}
}
SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) {
SSyncHbTimerData *pData = taosAcquireRef(gHbDataRefId, rid);
if (pData == NULL && rid > 0) {
sInfo("failed to acquire hb-timer-data from refId:%" PRId64, rid);
sInfo("failed to acquire hbdata from refId:%" PRId64 ", rsetId:%d", rid, gHbDataRefId);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return pData;
}
void syncHbTimerDataRelease(SSyncHbTimerData *pData) { (void)taosReleaseRef(gHbDataRefId, pData->rid); }
#if 0
void syncEnvStartTimer() {
taosTmrReset(gSyncEnv.FpEnvTickTimer, gSyncEnv.envTickTimerMS, &gSyncEnv, gSyncEnv.pTimerManager,
&gSyncEnv.pEnvTickTimer);
atomic_store_64(&gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerLogicClockUser);
}
void syncEnvStopTimer() {
int32_t ret = 0;
atomic_add_fetch_64(&gSyncEnv.envTickTimerLogicClockUser, 1);
taosTmrStop(gSyncEnv.pEnvTickTimer);
gSyncEnv.pEnvTickTimer = NULL;
return ret;
}
#endif
static void syncEnvTick(void *param, void *tmrId) {
#if 0
SSyncEnv *pSyncEnv = param;
if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) <= atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) {
gSyncEnv.envTickTimerCounter++;
sTrace("syncEnvTick do ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p",
gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
gSyncEnv.envTickTimerMS, tmrId);
// do something, tick ...
taosTmrReset(syncEnvTick, gSyncEnv.envTickTimerMS, pSyncEnv, gSyncEnv.pTimerManager, &gSyncEnv.pEnvTickTimer);
} else {
sTrace("syncEnvTick pass ... envTickTimerLogicClockUser:%" PRIu64 ", envTickTimerLogicClock:%" PRIu64
", envTickTimerCounter:%" PRIu64 ", envTickTimerMS:%d, tmrId:%p",
gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
gSyncEnv.envTickTimerMS, tmrId);
void syncHbTimerDataRelease(SSyncHbTimerData *pData) {
if (pData) {
(void)taosReleaseRef(gHbDataRefId, pData->rid);
}
#endif
}

View File

@ -50,7 +50,7 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer);
static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
static bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
static void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
static bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
static bool syncNodeCanChange(SSyncNode* pSyncNode);
@ -182,7 +182,12 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
}
TAOS_CHECK_RETURN(syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg));
syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex);
if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) {
code = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId);
TAOS_RETURN(code);
}
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
// TODO check return value
@ -1015,7 +1020,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
if (!taosDirExist((char*)(pSyncInfo->path))) {
if (taosMkDir(pSyncInfo->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr());
goto _error;
}
}
@ -1108,37 +1113,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
goto _error;
}
// init internal
pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
goto _error;
}
pSyncNode->arbTerm = -1;
(void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
sInfo("vgId:%d, arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
// init peersNum, peers, peersId
pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
int32_t j = 0;
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
if (i != pSyncNode->raftCfg.cfg.myIndex) {
pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
j++;
}
}
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
goto _error;
}
}
// init replicaNum, replicasId
pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
@ -1150,6 +1124,27 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
}
}
// init internal
pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];
// init peersNum, peers, peersId
pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
int32_t j = 0;
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
if (i != pSyncNode->raftCfg.cfg.myIndex) {
pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
pSyncNode->peersId[j] = pSyncNode->replicasId[i];
syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
j++;
}
}
pSyncNode->arbTerm = -1;
(void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
// init raft algorithm
pSyncNode->pFsm = pSyncInfo->pFsm;
pSyncInfo->pFsm = NULL;
@ -1766,11 +1761,11 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg
return false;
}
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
SSyncCfg oldConfig = pSyncNode->raftCfg.cfg;
if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
sInfo("vgId:1, sync not reconfig since not changed");
return;
return 0;
}
pSyncNode->raftCfg.cfg = *pNewConfig;
@ -1809,7 +1804,15 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
}
// add last config index
(void)syncAddCfgIndex(pSyncNode, lastConfigChangeIndex);
SRaftCfg* pCfg = &pSyncNode->raftCfg;
if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount);
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex;
pCfg->configIndexCount++;
if (IamInNew) {
//-----------------------------------------
@ -1924,6 +1927,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
_END:
// log end config change
sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum);
return 0;
}
// raft state change --------------

View File

@ -18,7 +18,7 @@
#include "syncUtil.h"
#include "tjson.h"
const char *syncRoleToStr(ESyncRole role) {
static const char *syncRoleToStr(ESyncRole role) {
switch (role) {
case TAOS_SYNC_ROLE_VOTER:
return "true";
@ -29,15 +29,14 @@ const char *syncRoleToStr(ESyncRole role) {
}
}
const ESyncRole syncStrToRole(char *str) {
static const ESyncRole syncStrToRole(char *str) {
if (strcmp(str, "true") == 0) {
return TAOS_SYNC_ROLE_VOTER;
}
if (strcmp(str, "false") == 0) {
} else if (strcmp(str, "false") == 0) {
return TAOS_SYNC_ROLE_LEARNER;
} else {
return TAOS_SYNC_ROLE_ERROR;
}
return TAOS_SYNC_ROLE_ERROR;
}
static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) {
@ -52,10 +51,12 @@ static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) {
if (nodeInfo == NULL) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
if ((code = tjsonAddItemToObject(pJson, "nodeInfo", nodeInfo)) < 0) {
tjsonDelete(nodeInfo);
TAOS_CHECK_EXIT(code);
}
for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
SJson *info = tjsonCreateObject();
if (info == NULL) {
@ -68,20 +69,25 @@ static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) {
TAOS_CHECK_GOTO(tjsonAddStringToObject(info, "isReplica", syncRoleToStr(pCfg->nodeInfo[i].nodeRole)), NULL, _err);
TAOS_CHECK_GOTO(tjsonAddItemToArray(nodeInfo, info), NULL, _err);
continue;
_err:
tjsonDelete(info);
break;
}
_exit:
if (code < 0) {
sError("failed to encode sync cfg at line %d since %s", lino, tstrerror(code));
}
TAOS_RETURN(code);
}
static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) {
SRaftCfg *pCfg = (SRaftCfg *)pObj;
int32_t code = 0, lino = 0;
int32_t code = 0;
int32_t lino = 0;
TAOS_CHECK_EXIT(tjsonAddObject(pJson, "SSyncCfg", syncEncodeSyncCfg, (void *)&pCfg->cfg));
TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "isStandBy", pCfg->isStandBy));
TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "snapshotStrategy", pCfg->snapshotStrategy));
@ -93,10 +99,12 @@ static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) {
if (configIndexArr == NULL) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
if ((code = tjsonAddItemToObject(pJson, "configIndexArr", configIndexArr)) < 0) {
tjsonDelete(configIndexArr);
TAOS_CHECK_EXIT(code);
}
for (int32_t i = 0; i < pCfg->configIndexCount; ++i) {
SJson *configIndex = tjsonCreateObject();
if (configIndex == NULL) {
@ -105,14 +113,17 @@ static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) {
TAOS_CHECK_EXIT(tjsonAddIntegerToObject(configIndex, "index", pCfg->configIndexArr[i]));
TAOS_CHECK_EXIT(tjsonAddItemToArray(configIndexArr, configIndex));
continue;
_err:
tjsonDelete(configIndex);
break;
}
_exit:
if (code < 0) {
sError("failed to encode raft cfg at line %d since %s", lino, tstrerror(code));
}
TAOS_RETURN(code);
}
@ -124,11 +135,13 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) {
const char *realfile = pNode->configPath;
SRaftCfg *pCfg = &pNode->raftCfg;
char file[PATH_MAX] = {0};
(void)snprintf(file, sizeof(file), "%s.bak", realfile);
if ((pJson = tjsonCreateObject()) == NULL) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
TAOS_CHECK_EXIT(tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg));
buffer = tjsonToString(pJson);
if (buffer == NULL) {
@ -145,6 +158,7 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) {
if (taosWriteFile(pFile, buffer, len) <= 0) {
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
}
if (taosFsyncFile(pFile) < 0) {
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
}
@ -165,6 +179,7 @@ _exit:
if (code != 0) {
sError("vgId:%d, failed to write sync cfg file:%s since %s", pNode->vgId, realfile, tstrerror(code));
}
TAOS_RETURN(code);
}
@ -232,6 +247,7 @@ static int32_t syncDecodeRaftCfg(const SJson *pJson, void *pObj) {
tjsonGetNumberValue(configIndex, "index", pCfg->configIndexArr[i], code);
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
}
return 0;
}
@ -292,16 +308,6 @@ _OVER:
if (code != 0) {
sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, tstrerror(code));
}
TAOS_RETURN(code);
}
int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex) {
SRaftCfg *pCfg = &pNode->raftCfg;
if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
return TSDB_CODE_OUT_OF_RANGE;
}
pCfg->configIndexArr[pCfg->configIndexCount] = cfgIndex;
pCfg->configIndexCount++;
return 0;
}

View File

@ -23,7 +23,7 @@
#include "syncSnapshot.h"
#include "tglobal.h"
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
static void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
int32_t len = snprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
len += snprintf(buf + len, bufLen - len, "%s:%d", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
@ -43,14 +43,11 @@ void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
uint32_t ipv4 = 0xFFFFFFFF;
sDebug(
"vgId:%d, start to resolve sync addr fqdn in %d seconds, "
"dnode:%d cluster:%" PRId64 " fqdn:%s port:%u ",
vgId, tsResolveFQDNRetryTime, pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, pInfo->nodePort);
for (int i = 0; i < tsResolveFQDNRetryTime; i++) {
sDebug("vgId:%d, resolve sync addr from fqdn, ep:%s:%u", vgId, pInfo->nodeFqdn, pInfo->nodePort);
for (int32_t i = 0; i < tsResolveFQDNRetryTime; i++) {
int32_t code = taosGetIpv4FromFqdn(pInfo->nodeFqdn, &ipv4);
if (code) {
sError("failed to resolve ipv4 addr, fqdn:%s, wait one second", pInfo->nodeFqdn);
sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s, retry", vgId, pInfo->nodeId, pInfo->nodeFqdn);
taosSsleep(1);
} else {
break;
@ -58,7 +55,7 @@ bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId*
}
if (ipv4 == 0xFFFFFFFF || ipv4 == 1) {
sError("failed to resolve ipv4 addr, fqdn:%s", pInfo->nodeFqdn);
sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s", vgId, pInfo->nodeId, pInfo->nodeFqdn);
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return false;
}
@ -68,14 +65,20 @@ bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId*
raftId->addr = SYNC_ADDR(pInfo);
raftId->vgId = vgId;
sInfo("vgId:%d, sync addr:%" PRIu64 ", dnode:%d cluster:%" PRId64 " fqdn:%s ip:%s port:%u ipv4:%u", vgId,
raftId->addr, pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, ipbuf, pInfo->nodePort, ipv4);
sInfo("vgId:%d, sync addr:%" PRIu64 " is resolved, ep:%s:%u ip:%s ipv4:%u dnode:%d cluster:%" PRId64, vgId,
raftId->addr, pInfo->nodeFqdn, pInfo->nodePort, ipbuf, ipv4, pInfo->nodeId, pInfo->clusterId);
return true;
}
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
if (pId1->addr == pId2->addr && pId1->vgId == pId2->vgId) return true;
if ((CID(pId1) == 0 || CID(pId2) == 0) && (DID(pId1) == DID(pId2)) && pId1->vgId == pId2->vgId) return true;
if (pId1->addr == pId2->addr && pId1->vgId == pId2->vgId) {
return true;
}
if ((CID(pId1) == 0 || CID(pId2) == 0) && (DID(pId1) == DID(pId2)) && pId1->vgId == pId2->vgId) {
return true;
}
return false;
}
@ -98,10 +101,6 @@ void syncUtilMsgHtoN(void* msg) {
pHead->vgId = htonl(pHead->vgId);
}
bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf) {
(void)memset(buf, 0, TSDB_ARB_TOKEN_SIZE);
int32_t randVal = taosSafeRand() % 1000;
@ -142,18 +141,18 @@ static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t buf
if (pBuf == NULL) {
return;
}
int len = 0;
int32_t len = 0;
len += snprintf(buf + len, bufLen - len, "[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pBuf->startIndex,
pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
}
static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
int len = 0;
int32_t len = 0;
len += snprintf(buf + len, bufLen - len, "%s", "{");
for (int32_t i = 0; i < pSyncNode->replicaNum; i++) {
SSyncLogReplMgr* pMgr = pSyncNode->logReplMgrs[i];
if (pMgr == NULL) break;
len += snprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 " %" PRId64 ", %" PRId64 ")", i, pMgr->restored,
len += snprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 " %" PRId64 ", %" PRId64 "]", i, pMgr->restored,
pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
if (i + 1 < pSyncNode->replicaNum) {
len += snprintf(buf + len, bufLen - len, "%s", ", ");
@ -280,7 +279,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRId64 " last-cfg:%" PRId64
", seq:%d, ack:%d, "
" buf:[%" PRId64 " %" PRId64 ", %" PRId64
"), finish:%d, as:%d, to-dnode:%d}"
"], finish:%d, as:%d, to-dnode:%d}"
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
"}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64

View File

@ -736,7 +736,7 @@ int32_t transOpenRefMgt(int size, void (*func)(void*)) {
}
void transCloseRefMgt(int32_t mgt) {
// close ref
(void)taosCloseRef(mgt);
taosCloseRef(mgt);
}
int64_t transAddExHandle(int32_t refMgt, void* p) {
// acquire extern handle

View File

@ -75,7 +75,7 @@ void walCleanUp() {
if (old == 1) {
walStopThread();
TAOS_UNUSED(taosCloseRef(tsWal.refSetId));
taosCloseRef(tsWal.refSetId);
wInfo("wal module is cleaned up");
atomic_store_8(&tsWal.inited, 0);
}

View File

@ -110,13 +110,13 @@ int32_t taosOpenRef(int32_t max, RefFp fp) {
return rsetId;
}
int32_t taosCloseRef(int32_t rsetId) {
void taosCloseRef(int32_t rsetId) {
SRefSet *pSet;
int32_t deleted = 0;
if (rsetId < 0 || rsetId >= TSDB_REF_OBJECTS) {
uTrace("rsetId:%d is invalid, out of range", rsetId);
return terrno = TSDB_CODE_REF_INVALID_ID;
return;
}
pSet = tsRefSetList + rsetId;
@ -134,8 +134,6 @@ int32_t taosCloseRef(int32_t rsetId) {
(void)taosThreadMutexUnlock(&tsRefMutex);
if (deleted) taosDecRsetCount(pSet);
return 0;
}
int64_t taosAddRef(int32_t rsetId, void *p) {