Merge pull request #19533 from taosdata/FIX/TD-21662-main
enh: skip WAL forceSync for single replica vgroup
This commit is contained in:
commit
e11d57d58c
|
@ -193,7 +193,7 @@ typedef struct SSyncLogStore {
|
|||
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
|
||||
SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);
|
||||
|
||||
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
|
||||
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync);
|
||||
int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||
int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
|
||||
|
||||
|
|
|
@ -357,7 +357,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
ASSERT(pAppendEntry->index == appendIndex);
|
||||
|
||||
// append
|
||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false);
|
||||
if (code != 0) {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%" PRId64, appendIndex);
|
||||
|
@ -398,7 +398,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
}
|
||||
|
||||
// append
|
||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false);
|
||||
if (code != 0) {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%" PRId64, appendIndex);
|
||||
|
|
|
@ -2477,7 +2477,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
|
|||
LRUHandle* h = NULL;
|
||||
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
||||
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
|
||||
if (code != 0) {
|
||||
sError("append noop error");
|
||||
return -1;
|
||||
|
@ -2720,7 +2720,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe
|
|||
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
// append entry
|
||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
|
||||
if (code != 0) {
|
||||
if (ths->replicaNum == 1) {
|
||||
if (h) {
|
||||
|
|
|
@ -364,7 +364,11 @@ _out:
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||
static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
|
||||
return (replicaNum > 1) && (pEntry->originalRpcType == TDMT_VND_COMMIT);
|
||||
}
|
||||
|
||||
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
|
||||
ASSERT(pEntry->index >= 0);
|
||||
SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
|
||||
if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) {
|
||||
|
@ -374,7 +378,8 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
|||
lastVer = pLogStore->syncLogLastIndex(pLogStore);
|
||||
ASSERT(pEntry->index == lastVer + 1);
|
||||
|
||||
if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) {
|
||||
bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
|
||||
if (pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync) < 0) {
|
||||
sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index,
|
||||
pEntry->term);
|
||||
return -1;
|
||||
|
@ -436,7 +441,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
|
|||
(void)syncNodeReplicateWithoutLock(pNode);
|
||||
|
||||
// persist
|
||||
if (syncLogStorePersist(pLogStore, pEntry) < 0) {
|
||||
if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) {
|
||||
sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
|
||||
pEntry->index);
|
||||
goto _out;
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
|
||||
// public function
|
||||
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
|
||||
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
|
||||
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync);
|
||||
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
|
||||
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index);
|
||||
static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
|
||||
|
@ -192,9 +192,7 @@ SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
|
|||
return SYNC_TERM_INVALID;
|
||||
}
|
||||
|
||||
static inline bool raftLogForceSync(SSyncRaftEntry* pEntry) { return (pEntry->originalRpcType == TDMT_VND_COMMIT); }
|
||||
|
||||
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync) {
|
||||
SSyncLogStoreData* pData = pLogStore->data;
|
||||
SWal* pWal = pData->pWal;
|
||||
|
||||
|
@ -221,7 +219,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
|||
|
||||
ASSERT(pEntry->index == index);
|
||||
|
||||
bool forceSync = raftLogForceSync(pEntry);
|
||||
walFsync(pWal, forceSync);
|
||||
|
||||
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
|
||||
|
|
Loading…
Reference in New Issue