From 1f1a2d9f6c7f464ad31d941817c4f369609eee05 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 20 Aug 2024 09:54:26 +0800 Subject: [PATCH] fix: handle error code --- include/libs/sync/sync.h | 30 ++++++++++----------- source/libs/sync/inc/syncRaftCfg.h | 1 - source/libs/sync/inc/syncUtil.h | 2 +- source/libs/sync/src/syncMain.c | 26 +++++++++++++----- source/libs/sync/src/syncRaftCfg.c | 42 +++++++++++++++++------------- 5 files changed, 60 insertions(+), 41 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 1fb077e3ca..07d56f9b07 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -156,11 +156,11 @@ typedef struct SSnapshotParam { typedef struct SSnapshot { int32_t type; - SSyncTLV* data; + SSyncTLV* data; ESyncFsmState state; - SyncIndex lastApplyIndex; - SyncTerm lastApplyTerm; - SyncIndex lastConfigIndex; + SyncIndex lastApplyIndex; + SyncTerm lastApplyTerm; + SyncIndex lastConfigIndex; } SSnapshot; typedef struct SSnapshotMeta { @@ -263,16 +263,16 @@ typedef struct SSyncState { int64_t startTimeMs; } SSyncState; -int32_t syncInit(); -void syncCleanUp(); -int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion); -int32_t syncStart(int64_t rid); -void syncStop(int64_t rid); -void syncPreStop(int64_t rid); -void syncPostStop(int64_t rid); -int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); -int32_t syncCheckMember(int64_t rid); -int32_t syncIsCatchUp(int64_t rid); +int32_t syncInit(); +void syncCleanUp(); +int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion); +int32_t syncStart(int64_t rid); +void syncStop(int64_t rid); +void syncPreStop(int64_t rid); +void syncPostStop(int64_t rid); +int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); +int32_t syncCheckMember(int64_t rid); +int32_t syncIsCatchUp(int64_t rid); ESyncRole syncGetRole(int64_t rid); int64_t syncGetTerm(int64_t rid); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); @@ -296,7 +296,7 @@ int32_t syncGetAssignedLogSynced(int64_t rid); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); const char* syncStr(ESyncState state); -int32_t syncNodeGetConfig(int64_t rid, SSyncCfg *cfg); +int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg); // util int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size); diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index 4f03a60fbc..2c1626c4e8 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -24,7 +24,6 @@ extern "C" { int32_t syncWriteCfgFile(SSyncNode *pNode); int32_t syncReadCfgFile(SSyncNode *pNode); -int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 555607d40c..2b5b818da3 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -107,7 +107,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s); void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, int32_t voteGranted, const char* s); -void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s); +void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5465007b18..8b8a9e1279 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -50,7 +50,7 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer); static int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); static bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); -static void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex); +static int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex); static bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg); static bool syncNodeCanChange(SSyncNode* pSyncNode); @@ -182,7 +182,12 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { } TAOS_CHECK_RETURN(syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg)); - syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex); + + if (syncNodeDoConfigChange(pSyncNode, pNewCfg, pNewCfg->lastIndex) != 0) { + code = TSDB_CODE_SYN_NEW_CONFIG_ERROR; + sError("vgId:%d, failed to reconfig since do change error", pSyncNode->vgId); + TAOS_RETURN(code); + } if (pSyncNode->state == TAOS_SYNC_STATE_LEADER || pSyncNode->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { // TODO check return value @@ -1015,7 +1020,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { if (!taosDirExist((char*)(pSyncInfo->path))) { if (taosMkDir(pSyncInfo->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr()); + sError("vgId:%d, failed to create dir:%s since %s", pSyncInfo->vgId, pSyncInfo->path, terrstr()); goto _error; } } @@ -1766,11 +1771,11 @@ static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg return false; } -void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) { +int32_t syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) { SSyncCfg oldConfig = pSyncNode->raftCfg.cfg; if (!syncIsConfigChanged(&oldConfig, pNewConfig)) { sInfo("vgId:1, sync not reconfig since not changed"); - return; + return 0; } pSyncNode->raftCfg.cfg = *pNewConfig; @@ -1809,7 +1814,15 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde } // add last config index - (void)syncAddCfgIndex(pSyncNode, lastConfigChangeIndex); + SRaftCfg* pCfg = &pSyncNode->raftCfg; + if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) { + sNError(pSyncNode, "failed to add cfg index:%d since out of range", pCfg->configIndexCount); + terrno = TSDB_CODE_OUT_OF_RANGE; + return -1; + } + + pCfg->configIndexArr[pCfg->configIndexCount] = lastConfigChangeIndex; + pCfg->configIndexCount++; if (IamInNew) { //----------------------------------------- @@ -1924,6 +1937,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde _END: // log end config change sNInfo(pSyncNode, "end do config change, from %d to %d", oldConfig.totalReplicaNum, pNewConfig->totalReplicaNum); + return 0; } // raft state change -------------- diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index b0e6abffc6..82cc86ed86 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -18,7 +18,7 @@ #include "syncUtil.h" #include "tjson.h" -const char *syncRoleToStr(ESyncRole role) { +static const char *syncRoleToStr(ESyncRole role) { switch (role) { case TAOS_SYNC_ROLE_VOTER: return "true"; @@ -29,15 +29,14 @@ const char *syncRoleToStr(ESyncRole role) { } } -const ESyncRole syncStrToRole(char *str) { +static const ESyncRole syncStrToRole(char *str) { if (strcmp(str, "true") == 0) { return TAOS_SYNC_ROLE_VOTER; - } - if (strcmp(str, "false") == 0) { + } else if (strcmp(str, "false") == 0) { return TAOS_SYNC_ROLE_LEARNER; + } else { + return TAOS_SYNC_ROLE_ERROR; } - - return TAOS_SYNC_ROLE_ERROR; } static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) { @@ -52,10 +51,12 @@ static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) { if (nodeInfo == NULL) { TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } + if ((code = tjsonAddItemToObject(pJson, "nodeInfo", nodeInfo)) < 0) { tjsonDelete(nodeInfo); TAOS_CHECK_EXIT(code); } + for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) { SJson *info = tjsonCreateObject(); if (info == NULL) { @@ -68,20 +69,25 @@ static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) { TAOS_CHECK_GOTO(tjsonAddStringToObject(info, "isReplica", syncRoleToStr(pCfg->nodeInfo[i].nodeRole)), NULL, _err); TAOS_CHECK_GOTO(tjsonAddItemToArray(nodeInfo, info), NULL, _err); continue; + _err: tjsonDelete(info); break; } + _exit: if (code < 0) { sError("failed to encode sync cfg at line %d since %s", lino, tstrerror(code)); } + TAOS_RETURN(code); } static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) { SRaftCfg *pCfg = (SRaftCfg *)pObj; - int32_t code = 0, lino = 0; + int32_t code = 0; + int32_t lino = 0; + TAOS_CHECK_EXIT(tjsonAddObject(pJson, "SSyncCfg", syncEncodeSyncCfg, (void *)&pCfg->cfg)); TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "isStandBy", pCfg->isStandBy)); TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "snapshotStrategy", pCfg->snapshotStrategy)); @@ -93,10 +99,12 @@ static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) { if (configIndexArr == NULL) { TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } + if ((code = tjsonAddItemToObject(pJson, "configIndexArr", configIndexArr)) < 0) { tjsonDelete(configIndexArr); TAOS_CHECK_EXIT(code); } + for (int32_t i = 0; i < pCfg->configIndexCount; ++i) { SJson *configIndex = tjsonCreateObject(); if (configIndex == NULL) { @@ -105,14 +113,17 @@ static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) { TAOS_CHECK_EXIT(tjsonAddIntegerToObject(configIndex, "index", pCfg->configIndexArr[i])); TAOS_CHECK_EXIT(tjsonAddItemToArray(configIndexArr, configIndex)); continue; + _err: tjsonDelete(configIndex); break; } + _exit: if (code < 0) { sError("failed to encode raft cfg at line %d since %s", lino, tstrerror(code)); } + TAOS_RETURN(code); } @@ -124,11 +135,13 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) { const char *realfile = pNode->configPath; SRaftCfg *pCfg = &pNode->raftCfg; char file[PATH_MAX] = {0}; + (void)snprintf(file, sizeof(file), "%s.bak", realfile); if ((pJson = tjsonCreateObject()) == NULL) { TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } + TAOS_CHECK_EXIT(tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg)); buffer = tjsonToString(pJson); if (buffer == NULL) { @@ -145,6 +158,7 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) { if (taosWriteFile(pFile, buffer, len) <= 0) { TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); } + if (taosFsyncFile(pFile) < 0) { TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno)); } @@ -165,6 +179,7 @@ _exit: if (code != 0) { sError("vgId:%d, failed to write sync cfg file:%s since %s", pNode->vgId, realfile, tstrerror(code)); } + TAOS_RETURN(code); } @@ -232,6 +247,7 @@ static int32_t syncDecodeRaftCfg(const SJson *pJson, void *pObj) { tjsonGetNumberValue(configIndex, "index", pCfg->configIndexArr[i], code); if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; } + return 0; } @@ -292,16 +308,6 @@ _OVER: if (code != 0) { sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, tstrerror(code)); } + TAOS_RETURN(code); } - -int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex) { - SRaftCfg *pCfg = &pNode->raftCfg; - if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) { - return TSDB_CODE_OUT_OF_RANGE; - } - - pCfg->configIndexArr[pCfg->configIndexCount] = cfgIndex; - pCfg->configIndexCount++; - return 0; -} \ No newline at end of file