fix: refactor return code

This commit is contained in:
kailixu 2024-07-25 16:20:10 +08:00
parent 4b593aae64
commit 9de0e4ee64
8 changed files with 363 additions and 286 deletions

View File

@ -77,7 +77,7 @@ static FORCE_INLINE int32_t syncLogReplGetNextRetryBackoff(SSyncLogReplMgr* pMgr
return TMIN(pMgr->retryBackoff + 1, SYNC_MAX_RETRY_BACKOFF);
}
SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pSyncTerm);
int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
@ -93,10 +93,10 @@ int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendE
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
// SSyncLogBuffer
SSyncLogBuffer* syncLogBufferCreate();
void syncLogBufferDestroy(SSyncLogBuffer* pBuf);
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf);
void syncLogBufferDestroy(SSyncLogBuffer* pBuf);
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
// access
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
@ -110,9 +110,10 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode);
// private
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf);
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
int32_t syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf,
SSyncRaftEntry** ppEntry);
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode, bool force);

View File

@ -43,7 +43,7 @@ SSyncRaftEntry* syncEntryBuildFromRpcMsg(const SRpcMsg* pMsg, SyncTerm term, Syn
SSyncRaftEntry* syncEntryBuildFromAppendEntries(const SyncAppendEntries* pMsg);
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId);
void syncEntryDestroy(SSyncRaftEntry* pEntry);
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7
int32_t syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7
static FORCE_INLINE bool syncLogReplBarrier(SSyncRaftEntry* pEntry) {
return pEntry->originalRpcType == TDMT_SYNC_NOOP || pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE;

View File

@ -949,6 +949,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
// open/close --------------
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
int32_t code = 0;
SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
if (pSyncNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -1041,9 +1042,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
// create raft log ring buffer
pSyncNode->pLogBuf = syncLogBufferCreate();
code = syncLogBufferCreate(&pSyncNode->pLogBuf);
if (pSyncNode->pLogBuf == NULL) {
sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
sError("failed to init sync log buffer since %s. vgId:%d", tstrerror(code), pSyncNode->vgId);
goto _error;
}

View File

@ -19,13 +19,13 @@
#include "syncCommit.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncRespMgr.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncRaftCfg.h"
#include "syncVoteMgr.h"
static bool syncIsMsgBlock(tmsg_t type) {
@ -45,28 +45,29 @@ int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
}
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
int32_t code = 0;
taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
SyncIndex index = pEntry->index;
if (index - pBuf->startIndex >= pBuf->size) {
terrno = TSDB_CODE_SYN_BUFFER_FULL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
code = TSDB_CODE_SYN_BUFFER_FULL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 "", pNode->vgId, tstrerror(code), index);
goto _err;
}
if (pNode->restoreFinish && index - pBuf->commitIndex >= TSDB_SYNC_NEGOTIATION_WIN) {
terrno = TSDB_CODE_SYN_NEGOTIATION_WIN_FULL;
sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, terrstr(),
code = TSDB_CODE_SYN_NEGOTIATION_WIN_FULL;
sError("vgId:%d, failed to append since %s, index:%" PRId64 ", commit-index:%" PRId64, pNode->vgId, tstrerror(code),
index, pBuf->commitIndex);
goto _err;
}
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
if (pNode->restoreFinish && pBuf->commitIndex - appliedIndex >= TSDB_SYNC_APPLYQ_SIZE_LIMIT) {
terrno = TSDB_CODE_SYN_WRITE_STALL;
code = TSDB_CODE_SYN_WRITE_STALL;
sError("vgId:%d, failed to append since %s. index:%" PRId64 ", commit-index:%" PRId64 ", applied-index:%" PRId64,
pNode->vgId, terrstr(), index, pBuf->commitIndex, appliedIndex);
pNode->vgId, tstrerror(code), index, pBuf->commitIndex, appliedIndex);
goto _err;
}
@ -93,21 +94,24 @@ _err:
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
taosMsleep(1);
return -1;
TAOS_RETURN(code);
}
SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
int32_t syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pSyncTerm) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
SSyncRaftEntry* pEntry = NULL;
SyncIndex prevIndex = index - 1;
SyncTerm prevLogTerm = -1;
terrno = TSDB_CODE_SUCCESS;
int32_t code = 0;
if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) return 0;
if (prevIndex == -1 && pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore) == 0) {
*pSyncTerm = 0;
return 0;
}
if (prevIndex > pBuf->matchIndex) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
*pSyncTerm = -1;
TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
}
ASSERT(index - 1 == prevIndex);
@ -116,7 +120,8 @@ SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
pEntry = pBuf->entries[(prevIndex + pBuf->size) % pBuf->size].pItem;
ASSERTS(pEntry != NULL, "no log entry found");
prevLogTerm = pEntry->term;
return prevLogTerm;
*pSyncTerm = prevLogTerm;
return 0;
}
if (pMgr && pMgr->startIndex <= prevIndex && prevIndex < pMgr->endIndex) {
@ -124,25 +129,28 @@ SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
ASSERTS(timeMs != 0, "no log entry found");
prevLogTerm = pMgr->states[(prevIndex + pMgr->size) % pMgr->size].term;
ASSERT(prevIndex == 0 || prevLogTerm != 0);
return prevLogTerm;
*pSyncTerm = prevLogTerm;
return 0;
}
SSnapshot snapshot = {0};
pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); // TODO: check the return code
if (prevIndex == snapshot.lastApplyIndex) {
return snapshot.lastApplyTerm;
*pSyncTerm = snapshot.lastApplyTerm;
return 0;
}
if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry) == 0) {
if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, prevIndex, &pEntry)) == 0) {
prevLogTerm = pEntry->term;
syncEntryDestroy(pEntry);
pEntry = NULL;
return prevLogTerm;
*pSyncTerm = prevLogTerm;
return 0;
}
sInfo("vgId:%d, failed to get log term since %s. index:%" PRId64, pNode->vgId, terrstr(), prevIndex);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
*pSyncTerm = -1;
sInfo("vgId:%d, failed to get log term since %s. index:%" PRId64, pNode->vgId, tstrerror(code), prevIndex);
TAOS_RETURN(code);
}
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
@ -155,7 +163,7 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex
sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. firstVer:%" PRId64
", tsdb commit version:%" PRId64 "",
pNode->vgId, firstVer, commitIndex);
return -1;
return TSDB_CODE_WAL_LOG_INCOMPLETE;
}
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
@ -163,7 +171,7 @@ int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex
sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer:%" PRId64
", tsdb commit version:%" PRId64 "",
pNode->vgId, lastVer, commitIndex);
return -1;
return TSDB_CODE_WAL_LOG_INCOMPLETE;
}
return 0;
@ -174,15 +182,13 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
int32_t code = 0, lino = 0;
SSnapshot snapshot = {0};
pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
SyncIndex commitIndex = snapshot.lastApplyIndex;
SyncTerm commitTerm = TMAX(snapshot.lastApplyTerm, 0);
if (syncLogValidateAlignmentOfCommit(pNode, commitIndex)) {
terrno = TSDB_CODE_WAL_LOG_INCOMPLETE;
goto _err;
}
TAOS_CHECK_EXIT(syncLogValidateAlignmentOfCommit(pNode, commitIndex));
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(lastVer >= commitIndex);
@ -206,7 +212,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
}
if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
sWarn("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
sWarn("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, tstrerror(code), index);
break;
}
@ -237,8 +243,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
if (pDummy == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
@ -261,8 +266,11 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
syncLogBufferValidate(pBuf);
return 0;
_err:
return -1;
_exit:
if (code != 0) {
sError("vgId:%d, failed to initialize sync log buffer at line %d since %s.", pNode->vgId, lino, tstrerror(code));
}
TAOS_RETURN(code);
}
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
@ -283,13 +291,13 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
}
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode);
if (ret < 0) {
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, terrstr());
int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
if (code < 0) {
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
}
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
return ret;
return code;
}
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
@ -316,7 +324,7 @@ bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf) {
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
int32_t ret = -1;
int32_t code = 0;
SyncIndex index = pEntry->index;
SyncIndex prevIndex = pEntry->index - 1;
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
@ -328,21 +336,22 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
" %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex);
SyncTerm term = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1);
SyncTerm term = -1;
code = syncLogReplGetPrevLogTerm(NULL, pNode, index + 1, &term);
ASSERT(pEntry->term >= 0);
if (term == pEntry->term) {
ret = 0;
code = 0;
}
goto _out;
}
if(pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER &&
index > 0 && index > pBuf->totalIndex){
if (pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole == TAOS_SYNC_ROLE_LEARNER && index > 0 &&
index > pBuf->totalIndex) {
pBuf->totalIndex = index;
sTrace("vgId:%d, update learner progress. index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64
" != lastmatch:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
pBuf->matchIndex, pBuf->endIndex);
" != lastmatch:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
pBuf->matchIndex, pBuf->endIndex);
}
if (index - pBuf->startIndex >= pBuf->size) {
@ -350,6 +359,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
" %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex);
code = TSDB_CODE_OUT_OF_RANGE;
goto _out;
}
@ -358,11 +368,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
" != lastmatch:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex,
pBuf->matchIndex, pBuf->endIndex);
code = TSDB_CODE_ACTION_IN_PROGRESS;
goto _out;
}
// check current in buffer
pExist = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pExist);
if (pExist != NULL) {
ASSERT(pEntry->index == pExist->index);
if (pEntry->term != pExist->term) {
@ -372,9 +383,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
" %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex);
SyncTerm existPrevTerm = syncLogReplGetPrevLogTerm(NULL, pNode, index);
SyncTerm existPrevTerm = -1;
(void)syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm);
ASSERT(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm));
ret = 0;
code = 0;
goto _out;
}
}
@ -391,7 +403,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);
// success
ret = 0;
code = 0;
_out:
syncEntryDestroy(pEntry);
@ -401,7 +413,7 @@ _out:
}
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
return ret;
TAOS_RETURN(code);
}
static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
@ -409,20 +421,21 @@ static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replica
}
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
int32_t code = 0;
ASSERT(pEntry->index >= 0);
SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) {
sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index);
return -1;
if (lastVer >= pEntry->index && (code = pLogStore->syncLogTruncate(pLogStore, pEntry->index)) < 0) {
sError("failed to truncate log store since %s. from index:%" PRId64 "", tstrerror(code), pEntry->index);
TAOS_RETURN(code);
}
lastVer = pLogStore->syncLogLastIndex(pLogStore);
ASSERT(pEntry->index == lastVer + 1);
bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
if (pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync) < 0) {
sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index,
pEntry->term);
return -1;
if ((code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync)) < 0) {
sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", tstrerror(code),
pEntry->index, pEntry->term);
TAOS_RETURN(code);
}
lastVer = pLogStore->syncLogLastIndex(pLogStore);
@ -430,12 +443,13 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf
return 0;
}
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char *str) {
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str) {
taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
SSyncLogStore* pLogStore = pNode->pLogStore;
int64_t matchIndex = pBuf->matchIndex;
int32_t code = 0;
while (pBuf->matchIndex + 1 < pBuf->endIndex) {
int64_t index = pBuf->matchIndex + 1;
@ -478,39 +492,40 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);
// persist
if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) {
sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
pEntry->index);
if ((code = syncLogStorePersist(pLogStore, pNode, pEntry)) < 0) {
sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId,
tstrerror(code), pEntry->index);
taosMsleep(1);
goto _out;
}
if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){
if(pNode->pLogBuf->commitIndex == pEntry->index -1){
sInfo("vgId:%d, to change config at %s. "
"current entry, index:%" PRId64 ", term:%" PRId64", "
"node, restore:%d, commitIndex:%" PRId64 ", "
"cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")",
pNode->vgId, str,
pEntry->index, pEntry->term,
pNode->restoreFinish, pNode->commitIndex,
pEntry->index - 1, pNode->pLogBuf->commitIndex);
if(syncNodeChangeConfig(pNode, pEntry, str) != 0){
sError("vgId:%d, failed to change config from Append since %s. index:%" PRId64, pNode->vgId, terrstr(),
pEntry->index);
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
if (pNode->pLogBuf->commitIndex == pEntry->index - 1) {
sInfo(
"vgId:%d, to change config at %s. "
"current entry, index:%" PRId64 ", term:%" PRId64
", "
"node, restore:%d, commitIndex:%" PRId64
", "
"cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")",
pNode->vgId, str, pEntry->index, pEntry->term, pNode->restoreFinish, pNode->commitIndex, pEntry->index - 1,
pNode->pLogBuf->commitIndex);
if ((code = syncNodeChangeConfig(pNode, pEntry, str)) != 0) {
sError("vgId:%d, failed to change config from Append since %s. index:%" PRId64, pNode->vgId, tstrerror(code),
pEntry->index);
goto _out;
}
}
else{
sInfo("vgId:%d, delay change config from Node %s. "
"curent entry, index:%" PRId64 ", term:%" PRId64 ", "
"node, commitIndex:%" PRId64 ", pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), "
"cond:( pre entry index:%" PRId64" != buf commit index:%" PRId64 ")",
pNode->vgId, str,
pEntry->index, pEntry->term,
pNode->commitIndex, pNode->pLogBuf->startIndex, pNode->pLogBuf->commitIndex,
pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex,
pEntry->index - 1, pNode->pLogBuf->commitIndex);
} else {
sInfo(
"vgId:%d, delay change config from Node %s. "
"curent entry, index:%" PRId64 ", term:%" PRId64
", "
"node, commitIndex:%" PRId64 ", pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
"), "
"cond:( pre entry index:%" PRId64 " != buf commit index:%" PRId64 ")",
pNode->vgId, str, pEntry->index, pEntry->term, pNode->commitIndex, pNode->pLogBuf->startIndex,
pNode->pLogBuf->commitIndex, pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex, pEntry->index - 1,
pNode->pLogBuf->commitIndex);
}
}
@ -535,16 +550,16 @@ _out:
}
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode, bool force) {
//learner need to execute fsm when it catch up entry log
//if force is true, keep all contition check to execute fsm
if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1
&& pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER
&& force == false) {
sDebug("vgId:%d, not to execute fsm, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x, replicaNum:%d,"
"role:%d, restoreFinish:%d",
pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode,
pNode->replicaNum, pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
int32_t applyCode, bool force) {
// learner need to execute fsm when it catch up entry log
// if force is true, keep all contition check to execute fsm
if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 &&
pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER && force == false) {
sDebug("vgId:%d, not to execute fsm, index:%" PRId64 ", term:%" PRId64
", type:%s code:0x%x, replicaNum:%d,"
"role:%d, restoreFinish:%d",
pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, pNode->replicaNum,
pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
return 0;
}
@ -558,11 +573,11 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
pEntry->term);
}
int32_t code = 0;
int32_t code = 0, lino = 0;
bool retry = false;
do {
SRpcMsg rpcMsg = {.code = applyCode};
syncEntry2OriginalRpc(pEntry, &rpcMsg);
TAOS_CHECK_EXIT(syncEntry2OriginalRpc(pEntry, &rpcMsg));
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
@ -580,9 +595,15 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
if (retry) {
taosMsleep(10);
sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index);
sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index);
}
} while (retry);
_exit:
if (code < 0) {
sError("vgId:%d, failed to execute fsm at line %d since %s. index:%" PRId64 ", term:%" PRId64 ", type:%s",
pNode->vgId, lino, tstrerror(code), pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
}
return code;
}
@ -604,7 +625,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
ESyncState role = pNode->state;
SyncTerm currentTerm = raftStoreGetTerm(pNode);
SyncGroupId vgId = pNode->vgId;
int32_t ret = -1;
int32_t code = 0;
int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex);
SSyncRaftEntry* pEntry = NULL;
bool inBuf = false;
@ -614,7 +635,6 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
if (commitIndex <= pBuf->commitIndex) {
sDebug("vgId:%d, stale commit index. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex,
commitIndex);
ret = 0;
goto _out;
}
@ -624,7 +644,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
// execute in fsm
for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
// get a log entry
pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
if (pEntry == NULL) {
goto _out;
}
@ -635,7 +655,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
pEntry->term, TMSG_INFO(pEntry->originalRpcType));
}
if (syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false) != 0) {
if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false)) != 0) {
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
", role:%d, current term:%" PRId64,
vgId, pEntry->index, pEntry->term, role, currentTerm);
@ -646,39 +666,40 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId,
pEntry->index, pEntry->term, role, currentTerm);
pNextEntry = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf);
code = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf, &pNextEntry);
if (pNextEntry != NULL) {
if(pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){
sInfo("vgId:%d, to change config at Commit. "
"current entry, index:%" PRId64 ", term:%" PRId64", "
"node, role:%d, current term:%" PRId64 ", restore:%d, "
"cond, next entry index:%" PRId64 ", msgType:%s",
vgId,
pEntry->index, pEntry->term,
role, currentTerm, pNode->restoreFinish,
pNextEntry->index, TMSG_INFO(pNextEntry->originalRpcType));
if (pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
sInfo(
"vgId:%d, to change config at Commit. "
"current entry, index:%" PRId64 ", term:%" PRId64
", "
"node, role:%d, current term:%" PRId64
", restore:%d, "
"cond, next entry index:%" PRId64 ", msgType:%s",
vgId, pEntry->index, pEntry->term, role, currentTerm, pNode->restoreFinish, pNextEntry->index,
TMSG_INFO(pNextEntry->originalRpcType));
if(syncNodeChangeConfig(pNode, pNextEntry, "Commit") != 0){
if ((code = syncNodeChangeConfig(pNode, pNextEntry, "Commit")) != 0) {
sError("vgId:%d, failed to change config from Commit. index:%" PRId64 ", term:%" PRId64
", role:%d, current term:%" PRId64,
vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
goto _out;
", role:%d, current term:%" PRId64,
vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
goto _out;
}
//for 2->1, need to apply config change entry in sync thread,
if(pNode->replicaNum == 1){
if (syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true) != 0) {
// for 2->1, need to apply config change entry in sync thread,
if (pNode->replicaNum == 1) {
if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true)) != 0) {
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
", role:%d, current term:%" PRId64,
vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
", role:%d, current term:%" PRId64,
vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
goto _out;
}
index++;
pBuf->commitIndex = index;
sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId,
pNextEntry->index, pNextEntry->term, role, currentTerm);
sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "",
pNode->vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
}
}
if (!nextInBuf) {
@ -703,7 +724,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
pBuf->startIndex = index + 1;
}
ret = 0;
code = 0;
_out:
// mark as restored if needed
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
@ -724,7 +745,7 @@ _out:
}
syncLogBufferValidate(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
return ret;
TAOS_RETURN(code);
}
void syncLogReplReset(SSyncLogReplMgr* pMgr) {
@ -751,10 +772,10 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
syncLogReplReset(pMgr);
sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId,
pDestId->addr);
return -1;
return TSDB_CODE_OUT_OF_RANGE;
}
int32_t ret = -1;
int32_t code = 0;
bool retried = false;
int64_t retryWaitMs = syncLogReplGetRetryBackoffTimeMs(pMgr);
int64_t nowMs = taosGetMonoTimestampMs();
@ -776,15 +797,16 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
syncLogReplReset(pMgr);
sWarn("vgId:%d, reset sync log repl since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, index,
pDestId->addr);
code = TSDB_CODE_ACTION_IN_PROGRESS;
goto _out;
}
continue;
}
bool barrier = false;
if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
if ((code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier)) < 0) {
sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
tstrerror(code), index, pDestId->addr);
goto _out;
}
ASSERT(barrier == pMgr->states[pos].barrier);
@ -800,7 +822,6 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
}
}
ret = 0;
_out:
if (retried) {
pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
@ -811,12 +832,13 @@ _out:
pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
}
return ret;
TAOS_RETURN(code);
}
int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
SRaftId destId = pMsg->srcId;
int32_t code = 0;
ASSERT(pMgr->restored == false);
if (pMgr->endIndex == 0) {
@ -832,7 +854,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
}
} else {
if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) {
syncLogReplRetryOnNeed(pMgr, pNode);
(void)syncLogReplRetryOnNeed(pMgr, pNode);
return 0;
}
@ -854,9 +876,9 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
sInfo("vgId:%d, snapshot replication to dnode:%d. reason:%s, match index:%" PRId64 ", last sent:%" PRId64,
pNode->vgId, DID(&destId), (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? msg2 : msg1), pMsg->matchIndex,
pMsg->lastSendIndex);
if (syncNodeStartSnapshot(pNode, &destId) < 0) {
if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) {
sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
return -1;
TAOS_RETURN(code);
}
return 0;
}
@ -869,10 +891,10 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
errno = 0;
if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
term = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1);
code = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1, &term);
if (term < 0 && (errno == ENFILE || errno == EMFILE)) {
sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, terrstr(), index + 1);
return -1;
sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, tstrerror(code), index + 1);
TAOS_RETURN(code);
}
if (pMsg->matchIndex == -1) {
@ -888,9 +910,9 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
if ((index + 1 < firstVer) || (term < 0) ||
(term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
if (syncNodeStartSnapshot(pNode, &destId) < 0) {
if ((code = syncNodeStartSnapshot(pNode, &destId)) < 0) {
sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
return -1;
TAOS_RETURN(code);
}
sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&destId));
return 0;
@ -957,6 +979,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde
ASSERT(pMgr->startIndex >= 0);
int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
int64_t nowMs = taosGetMonoTimestampMs();
int32_t code = 0;
if (pMgr->endIndex > pMgr->startIndex &&
nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
@ -967,10 +990,10 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
SyncTerm term = -1;
if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
if ((code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier)) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
return -1;
tstrerror(code), index, pDestId->addr);
TAOS_RETURN(code);
}
ASSERT(index >= 0);
@ -995,6 +1018,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
int32_t batchSize = TMAX(1, pMgr->size >> (4 + pMgr->retryBackoff));
int32_t code = 0;
int32_t count = 0;
int64_t nowMs = taosGetMonoTimestampMs();
int64_t limit = pMgr->size >> 1;
@ -1012,10 +1036,10 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false;
SyncTerm term = -1;
if (syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
if ((code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier)) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr);
return -1;
tstrerror(code), index, pDestId->addr);
TAOS_RETURN(code);
}
pMgr->states[pos].barrier = barrier;
pMgr->states[pos].timeMs = nowMs;
@ -1034,7 +1058,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
}
}
syncLogReplRetryOnNeed(pMgr, pNode);
(void)syncLogReplRetryOnNeed(pMgr, pNode);
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64
@ -1094,8 +1118,7 @@ int32_t syncNodeLogReplInit(SSyncNode* pNode) {
ASSERT(pNode->logReplMgrs[i] == NULL);
pNode->logReplMgrs[i] = syncLogReplCreate();
if (pNode->logReplMgrs[i] == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
pNode->logReplMgrs[i]->peerId = i;
}
@ -1109,11 +1132,14 @@ void syncNodeLogReplDestroy(SSyncNode* pNode) {
}
}
SSyncLogBuffer* syncLogBufferCreate() {
int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) {
int32_t code = 0;
*ppBuf = NULL;
SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);
@ -1121,28 +1147,28 @@ SSyncLogBuffer* syncLogBufferCreate() {
ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE);
if (taosThreadMutexAttrInit(&pBuf->attr) < 0) {
sError("failed to init log buffer mutexattr due to %s", strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
code = TAOS_SYSTEM_ERROR(errno);
sError("failed to init log buffer mutexattr due to %s", tstrerror(code));
goto _err;
}
if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) {
sError("failed to set log buffer mutexattr type due to %s", strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
code = TAOS_SYSTEM_ERROR(errno);
sError("failed to set log buffer mutexattr type due to %s", tstrerror(code));
goto _err;
}
if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) {
sError("failed to init log buffer mutex due to %s", strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
code = TAOS_SYSTEM_ERROR(errno);
sError("failed to init log buffer mutex due to %s", tstrerror(code));
goto _err;
}
return pBuf;
*ppBuf = pBuf;
_err:
taosMemoryFree(pBuf);
return NULL;
TAOS_RETURN(code);
}
void syncLogBufferClear(SSyncLogBuffer* pBuf) {
@ -1170,14 +1196,15 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
}
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
int32_t code = 0;
ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex);
if (toIndex == pBuf->endIndex) {
return 0;
}
sInfo("vgId:%d, rollback sync log buffer. toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")",
sInfo("vgId:%d, rollback sync log buffer. toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
")",
pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
// trunc buffer
@ -1197,19 +1224,19 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
// trunc wal
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
if (lastVer >= toIndex && pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex) < 0) {
sError("vgId:%d, failed to truncate log store since %s. from index:%" PRId64 "", pNode->vgId, terrstr(), toIndex);
return -1;
if (lastVer >= toIndex && (code = pNode->pLogStore->syncLogTruncate(pNode->pLogStore, toIndex)) < 0) {
sError("vgId:%d, failed to truncate log store since %s. from index:%" PRId64 "", pNode->vgId, tstrerror(code),
toIndex);
TAOS_RETURN(code);
}
lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
ASSERT(toIndex == lastVer + 1);
// refill buffer on need
if (toIndex <= pBuf->startIndex) {
int32_t ret = syncLogBufferInitWithoutLock(pBuf, pNode);
if (ret < 0) {
sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, terrstr());
return -1;
if ((code = syncLogBufferInitWithoutLock(pBuf, pNode)) < 0) {
sError("vgId:%d, failed to refill sync log buffer since %s", pNode->vgId, tstrerror(code));
TAOS_RETURN(code);
}
}
@ -1242,21 +1269,27 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
return 0;
}
SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf) {
SSyncRaftEntry* pEntry = NULL;
int32_t syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf,
SSyncRaftEntry** ppEntry) {
int32_t code = 0;
*ppEntry = NULL;
if (index >= pBuf->endIndex) {
return NULL;
return TSDB_CODE_OUT_OF_RANGE;
}
if (index > pBuf->startIndex) { // startIndex might be dummy
*pInBuf = true;
pEntry = pBuf->entries[index % pBuf->size].pItem;
*ppEntry = pBuf->entries[index % pBuf->size].pItem;
} else {
*pInBuf = false;
if (pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, &pEntry) < 0) {
sWarn("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
if ((code = pNode->pLogStore->syncLogGetEntry(pNode->pLogStore, index, ppEntry)) < 0) {
sWarn("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, tstrerror(code), index);
}
}
return pEntry;
TAOS_RETURN(code);
}
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
@ -1266,15 +1299,16 @@ int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex ind
bool inBuf = false;
SyncTerm prevLogTerm = -1;
SSyncLogBuffer* pBuf = pNode->pLogBuf;
int32_t code = 0;
pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf);
code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
if (pEntry == NULL) {
sWarn("vgId:%d, failed to get raft entry for index:%" PRId64 "", pNode->vgId, index);
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) {
SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId);
if (pMgr) {
sInfo("vgId:%d, reset sync log repl of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, pDestId->addr,
terrstr(), index);
tstrerror(code), index);
(void)syncLogReplReset(pMgr);
}
}
@ -1282,14 +1316,14 @@ int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex ind
}
*pBarrier = syncLogReplBarrier(pEntry);
prevLogTerm = syncLogReplGetPrevLogTerm(pMgr, pNode, index);
code = syncLogReplGetPrevLogTerm(pMgr, pNode, index, &prevLogTerm);
if (prevLogTerm < 0) {
sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64 "", pNode->vgId, tstrerror(code), index);
goto _err;
}
if (pTerm) *pTerm = pEntry->term;
int32_t code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut);
code = syncBuildAppendEntriesFromRaftEntry(pNode, pEntry, prevLogTerm, &msgOut);
if (code < 0) {
sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index);
goto _err;
@ -1313,5 +1347,5 @@ _err:
syncEntryDestroy(pEntry);
pEntry = NULL;
}
return -1;
TAOS_RETURN(code);
}

View File

@ -18,7 +18,7 @@
#include "syncUtil.h"
#include "tjson.h"
const char* syncRoleToStr(ESyncRole role) {
const char *syncRoleToStr(ESyncRole role) {
switch (role) {
case TAOS_SYNC_ROLE_VOTER:
return "true";
@ -29,11 +29,11 @@ const char* syncRoleToStr(ESyncRole role) {
}
}
const ESyncRole syncStrToRole(char* str) {
if(strcmp(str, "true") == 0){
const ESyncRole syncStrToRole(char *str) {
if (strcmp(str, "true") == 0) {
return TAOS_SYNC_ROLE_VOTER;
}
if(strcmp(str, "false") == 0){
if (strcmp(str, "false") == 0) {
return TAOS_SYNC_ROLE_LEARNER;
}
@ -42,51 +42,82 @@ const ESyncRole syncStrToRole(char* str) {
static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) {
SSyncCfg *pCfg = (SSyncCfg *)pObj;
if (tjsonAddDoubleToObject(pJson, "replicaNum", pCfg->replicaNum) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "myIndex", pCfg->myIndex) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "changeVersion", pCfg->changeVersion) < 0) return -1;
int32_t code = 0, lino = 0;
TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "replicaNum", pCfg->replicaNum));
TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "myIndex", pCfg->myIndex));
TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "changeVersion", pCfg->changeVersion));
SJson *nodeInfo = tjsonCreateArray();
if (nodeInfo == NULL) return -1;
if (tjsonAddItemToObject(pJson, "nodeInfo", nodeInfo) < 0) return -1;
if (nodeInfo == NULL) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
if ((code = tjsonAddItemToObject(pJson, "nodeInfo", nodeInfo)) < 0) {
tjsonDelete(nodeInfo);
TAOS_CHECK_EXIT(code);
}
for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
SJson *info = tjsonCreateObject();
if (info == NULL) return -1;
if (tjsonAddDoubleToObject(info, "nodePort", pCfg->nodeInfo[i].nodePort) < 0) return -1;
if (tjsonAddStringToObject(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn) < 0) return -1;
if (tjsonAddIntegerToObject(info, "nodeId", pCfg->nodeInfo[i].nodeId) < 0) return -1;
if (tjsonAddIntegerToObject(info, "clusterId", pCfg->nodeInfo[i].clusterId) < 0) return -1;
if (tjsonAddStringToObject(info, "isReplica", syncRoleToStr(pCfg->nodeInfo[i].nodeRole)) < 0) return -1;
if (tjsonAddItemToArray(nodeInfo, info) < 0) return -1;
if (info == NULL) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(info, "nodePort", pCfg->nodeInfo[i].nodePort), NULL, _err);
TAOS_CHECK_GOTO(tjsonAddStringToObject(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn), NULL, _err);
TAOS_CHECK_GOTO(tjsonAddIntegerToObject(info, "nodeId", pCfg->nodeInfo[i].nodeId), NULL, _err);
TAOS_CHECK_GOTO(tjsonAddIntegerToObject(info, "clusterId", pCfg->nodeInfo[i].clusterId), NULL, _err);
TAOS_CHECK_GOTO(tjsonAddStringToObject(info, "isReplica", syncRoleToStr(pCfg->nodeInfo[i].nodeRole)), NULL, _err);
TAOS_CHECK_GOTO(tjsonAddItemToArray(nodeInfo, info), NULL, _err);
continue;
_err:
tjsonDelete(info);
break;
}
return 0;
_exit:
if (code < 0) {
sError("failed to encode sync cfg at line %d since %s", lino, tstrerror(code));
}
TAOS_RETURN(code);
}
static int32_t syncEncodeRaftCfg(const void *pObj, SJson *pJson) {
SRaftCfg *pCfg = (SRaftCfg *)pObj;
if (tjsonAddObject(pJson, "SSyncCfg", syncEncodeSyncCfg, (void *)&pCfg->cfg) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "isStandBy", pCfg->isStandBy) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "snapshotStrategy", pCfg->snapshotStrategy) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "batchSize", pCfg->batchSize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "lastConfigIndex", pCfg->lastConfigIndex) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "configIndexCount", pCfg->configIndexCount) < 0) return -1;
int32_t code = 0, lino = 0;
TAOS_CHECK_EXIT(tjsonAddObject(pJson, "SSyncCfg", syncEncodeSyncCfg, (void *)&pCfg->cfg));
TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "isStandBy", pCfg->isStandBy));
TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "snapshotStrategy", pCfg->snapshotStrategy));
TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "batchSize", pCfg->batchSize));
TAOS_CHECK_EXIT(tjsonAddIntegerToObject(pJson, "lastConfigIndex", pCfg->lastConfigIndex));
TAOS_CHECK_EXIT(tjsonAddDoubleToObject(pJson, "configIndexCount", pCfg->configIndexCount));
SJson *configIndexArr = tjsonCreateArray();
if (configIndexArr == NULL) return -1;
if (tjsonAddItemToObject(pJson, "configIndexArr", configIndexArr) < 0) return -1;
if (configIndexArr == NULL) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
if ((code = tjsonAddItemToObject(pJson, "configIndexArr", configIndexArr)) < 0) {
tjsonDelete(configIndexArr);
TAOS_CHECK_EXIT(code);
}
for (int32_t i = 0; i < pCfg->configIndexCount; ++i) {
SJson *configIndex = tjsonCreateObject();
if (configIndex == NULL) return -1;
if (tjsonAddIntegerToObject(configIndex, "index", pCfg->configIndexArr[i]) < 0) return -1;
if (tjsonAddItemToArray(configIndexArr, configIndex) < 0) return -1;
if (configIndex == NULL) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
TAOS_CHECK_EXIT(tjsonAddIntegerToObject(configIndex, "index", pCfg->configIndexArr[i]));
TAOS_CHECK_EXIT(tjsonAddItemToArray(configIndexArr, configIndex));
continue;
_err:
tjsonDelete(configIndex);
break;
}
return 0;
_exit:
if (code < 0) {
sError("failed to encode raft cfg at line %d since %s", lino, tstrerror(code));
}
TAOS_RETURN(code);
}
int32_t syncWriteCfgFile(SSyncNode *pNode) {
int32_t code = -1;
int32_t code = 0, lino = 0;
char *buffer = NULL;
SJson *pJson = NULL;
TdFilePtr pFile = NULL;
@ -95,39 +126,46 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) {
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s.bak", realfile);
terrno = TSDB_CODE_OUT_OF_MEMORY;
pJson = tjsonCreateObject();
if (pJson == NULL) goto _OVER;
if (tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg) < 0) goto _OVER;
if ((pJson = tjsonCreateObject()) == NULL) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
TAOS_CHECK_EXIT(tjsonAddObject(pJson, "RaftCfg", syncEncodeRaftCfg, pCfg));
buffer = tjsonToString(pJson);
if (buffer == NULL) goto _OVER;
terrno = 0;
if (buffer == NULL) {
TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY);
}
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
if (pFile == NULL) goto _OVER;
if (pFile == NULL) {
code = terrno ? terrno : TAOS_SYSTEM_ERROR(errno);
TAOS_CHECK_EXIT(code);
}
int32_t len = strlen(buffer);
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
if (taosFsyncFile(pFile) < 0) goto _OVER;
if (taosWriteFile(pFile, buffer, len) <= 0) {
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
}
if (taosFsyncFile(pFile) < 0) {
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
}
taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) goto _OVER;
(void)taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) {
TAOS_CHECK_EXIT(TAOS_SYSTEM_ERROR(errno));
}
code = 0;
sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64 ", "
"changeVersion:%d", pNode->vgId,
realfile, len, pNode->raftCfg.lastConfigIndex, pNode->raftCfg.cfg.changeVersion);
sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
pNode->vgId, realfile, len, pNode->raftCfg.lastConfigIndex, pNode->raftCfg.cfg.changeVersion);
_OVER:
_exit:
if (pJson != NULL) tjsonDelete(pJson);
if (buffer != NULL) taosMemoryFree(buffer);
if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) {
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to write sync cfg file:%s since %s", pNode->vgId, realfile, terrstr());
sError("vgId:%d, failed to write sync cfg file:%s since %s", pNode->vgId, realfile, tstrerror(code));
}
return code;
TAOS_RETURN(code);
}
static int32_t syncDecodeSyncCfg(const SJson *pJson, void *pObj) {
@ -135,32 +173,31 @@ static int32_t syncDecodeSyncCfg(const SJson *pJson, void *pObj) {
int32_t code = 0;
tjsonGetInt32ValueFromDouble(pJson, "replicaNum", pCfg->replicaNum, code);
if (code < 0) return -1;
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
tjsonGetInt32ValueFromDouble(pJson, "myIndex", pCfg->myIndex, code);
if (code < 0) return -1;
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
tjsonGetInt32ValueFromDouble(pJson, "changeVersion", pCfg->changeVersion, code);
if (code < 0) return -1;
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
SJson *nodeInfo = tjsonGetObjectItem(pJson, "nodeInfo");
if (nodeInfo == NULL) return -1;
if (nodeInfo == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
pCfg->totalReplicaNum = tjsonGetArraySize(nodeInfo);
for (int32_t i = 0; i < pCfg->totalReplicaNum; ++i) {
SJson *info = tjsonGetArrayItem(nodeInfo, i);
if (info == NULL) return -1;
if (info == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
tjsonGetUInt16ValueFromDouble(info, "nodePort", pCfg->nodeInfo[i].nodePort, code);
if (code < 0) return -1;
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
code = tjsonGetStringValue(info, "nodeFqdn", pCfg->nodeInfo[i].nodeFqdn);
if (code < 0) return -1;
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
tjsonGetNumberValue(info, "nodeId", pCfg->nodeInfo[i].nodeId, code);
tjsonGetNumberValue(info, "clusterId", pCfg->nodeInfo[i].clusterId, code);
char role[10] = {0};
code = tjsonGetStringValue(info, "isReplica", role);
if(code < 0) return -1;
if(strlen(role) != 0){
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
if (strlen(role) != 0) {
pCfg->nodeInfo[i].nodeRole = syncStrToRole(role);
}
else{
} else {
pCfg->nodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
}
}
@ -172,34 +209,34 @@ static int32_t syncDecodeRaftCfg(const SJson *pJson, void *pObj) {
SRaftCfg *pCfg = (SRaftCfg *)pObj;
int32_t code = 0;
if (tjsonToObject(pJson, "SSyncCfg", syncDecodeSyncCfg, (void *)&pCfg->cfg) < 0) return -1;
TAOS_CHECK_RETURN(tjsonToObject(pJson, "SSyncCfg", syncDecodeSyncCfg, (void *)&pCfg->cfg));
tjsonGetInt8ValueFromDouble(pJson, "isStandBy", pCfg->isStandBy, code);
if (code < 0) return -1;
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
tjsonGetInt8ValueFromDouble(pJson, "snapshotStrategy", pCfg->snapshotStrategy, code);
if (code < 0) return -1;
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
tjsonGetInt32ValueFromDouble(pJson, "batchSize", pCfg->batchSize, code);
if (code < 0) return -1;
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
tjsonGetNumberValue(pJson, "lastConfigIndex", pCfg->lastConfigIndex, code);
if (code < 0) return -1;
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
tjsonGetInt32ValueFromDouble(pJson, "configIndexCount", pCfg->configIndexCount, code);
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
SJson *configIndexArr = tjsonGetObjectItem(pJson, "configIndexArr");
if (configIndexArr == NULL) return -1;
if (configIndexArr == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
pCfg->configIndexCount = tjsonGetArraySize(configIndexArr);
for (int32_t i = 0; i < pCfg->configIndexCount; ++i) {
SJson *configIndex = tjsonGetArrayItem(configIndexArr, i);
if (configIndex == NULL) return -1;
if (configIndex == NULL) return TSDB_CODE_INVALID_JSON_FORMAT;
tjsonGetNumberValue(configIndex, "index", pCfg->configIndexArr[i], code);
if (code < 0) return -1;
if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT;
}
return 0;
}
int32_t syncReadCfgFile(SSyncNode *pNode) {
int32_t code = -1;
int32_t code = 0;
TdFilePtr pFile = NULL;
char *pData = NULL;
SJson *pJson = NULL;
@ -208,27 +245,27 @@ int32_t syncReadCfgFile(SSyncNode *pNode) {
pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to open sync cfg file:%s since %s", pNode->vgId, file, terrstr());
code = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to open sync cfg file:%s since %s", pNode->vgId, file, tstrerror(code));
goto _OVER;
}
int64_t size = 0;
if (taosFStatFile(pFile, &size, NULL) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to fstat sync cfg file:%s since %s", pNode->vgId, file, terrstr());
code = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to fstat sync cfg file:%s since %s", pNode->vgId, file, tstrerror(code));
goto _OVER;
}
pData = taosMemoryMalloc(size + 1);
if (pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (taosReadFile(pFile, pData, size) != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, terrstr());
code = TAOS_SYSTEM_ERROR(errno);
sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, tstrerror(code));
goto _OVER;
}
@ -236,18 +273,16 @@ int32_t syncReadCfgFile(SSyncNode *pNode) {
pJson = tjsonParse(pData);
if (pJson == NULL) {
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
code = TSDB_CODE_INVALID_JSON_FORMAT;
goto _OVER;
}
if (tjsonToObject(pJson, "RaftCfg", syncDecodeRaftCfg, (void *)pCfg) < 0) {
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
code = TSDB_CODE_INVALID_JSON_FORMAT;
goto _OVER;
}
code = 0;
sInfo("vgId:%d, succceed to read sync cfg file %s, changeVersion:%d",
pNode->vgId, file, pCfg->cfg.changeVersion);
sInfo("vgId:%d, succceed to read sync cfg file %s, changeVersion:%d", pNode->vgId, file, pCfg->cfg.changeVersion);
_OVER:
if (pData != NULL) taosMemoryFree(pData);
@ -255,15 +290,15 @@ _OVER:
if (pFile != NULL) taosCloseFile(&pFile);
if (code != 0) {
sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, terrstr());
sError("vgId:%d, failed to read sync cfg file:%s since %s", pNode->vgId, file, tstrerror(code));
}
return code;
TAOS_RETURN(code);
}
int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex) {
SRaftCfg *pCfg = &pNode->raftCfg;
if (pCfg->configIndexCount < MAX_CONFIG_INDEX_COUNT) {
return -1;
if (pCfg->configIndexCount >= MAX_CONFIG_INDEX_COUNT) {
return TSDB_CODE_OUT_OF_RANGE;
}
pCfg->configIndexArr[pCfg->configIndexCount] = cfgIndex;

View File

@ -99,9 +99,14 @@ void syncEntryDestroy(SSyncRaftEntry* pEntry) {
}
}
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
int32_t syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
pRpcMsg->msgType = pEntry->originalRpcType;
pRpcMsg->contLen = (int32_t)(pEntry->dataLen);
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
if (pRpcMsg->pCont == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
return 0;
}

View File

@ -65,7 +65,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
}
// timer replicate
syncNodeReplicate(ths);
(void)syncNodeReplicate(ths);
// clean mnode index
if (syncNodeIsMnode(ths)) {
@ -89,7 +89,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
snapshotSenderStop(pSender, false);
} else {
sSWarn(pSender, "snap replication resend.");
snapshotReSend(pSender);
(void)snapshotReSend(pSender);
}
}
}
@ -112,14 +112,14 @@ int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) {
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->pingTimerCounter);
syncNodeTimerRoutine(ths);
(void)syncNodeTimerRoutine(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
if (atomic_load_64(&ths->electTimerLogicClock) <= pMsg->logicClock) {
++(ths->electTimerCounter);
syncNodeElect(ths);
(void)syncNodeElect(ths);
}
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {

View File

@ -1100,6 +1100,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
if (fd >= 0) (void)close(fd);
#endif
if (fp != NULL) (void)fclose(fp);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}