From 883d2bfb7ab248ae8b932ad2d79e45809f0d5a74 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 10 Sep 2024 14:54:00 +0800 Subject: [PATCH 01/13] fix: oom with large submit --- source/libs/sync/src/syncPipeline.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index a6e9c7de32..4cdc6c3d83 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -846,7 +846,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } // recycle - SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION; + SyncIndex until = pBuf->commitIndex; // - TSDB_SYNC_LOG_BUFFER_RETENTION; for (SyncIndex index = pBuf->startIndex; index < until; index++) { SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; From 2a13aa08e15c4962b154cc0b9675932c55d074d7 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 11 Sep 2024 14:23:47 +0800 Subject: [PATCH 02/13] fix: oom with large msg --- include/common/tglobal.h | 1 + include/util/tdef.h | 1 + source/common/src/tglobal.c | 17 ++++++++++ source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/inc/syncPipeline.h | 1 + source/libs/sync/src/syncPipeline.c | 50 ++++++++++++++++++++++++----- 6 files changed, 63 insertions(+), 9 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2d4d437649..bece14c17d 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -94,6 +94,7 @@ extern int32_t tsElectInterval; extern int32_t tsHeartbeatInterval; extern int32_t tsHeartbeatTimeout; extern int32_t tsSnapReplMaxWaitN; +extern int64_t tsLogBufferMemoryAllowed; // maximum allowed log buffer size in bytes for each dnode // arbitrator extern int32_t tsArbHeartBeatIntervalSec; diff --git a/include/util/tdef.h b/include/util/tdef.h index 46a0d01457..46c84ab26a 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -332,6 +332,7 @@ typedef enum ELogicConditionType { #define TSDB_MAX_LEARNER_REPLICA 10 #define TSDB_SYNC_LOG_BUFFER_SIZE 4096 #define TSDB_SYNC_LOG_BUFFER_RETENTION 256 +#define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5) #define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 #define TSDB_SYNC_NEGOTIATION_WIN 512 diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 5b67e1267b..2519ead06e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -81,6 +81,7 @@ int32_t tsElectInterval = 25 * 1000; int32_t tsHeartbeatInterval = 1000; int32_t tsHeartbeatTimeout = 20 * 1000; int32_t tsSnapReplMaxWaitN = 128; +int64_t tsLogBufferMemoryAllowed = 0; // bytes // mnode int64_t tsMndSdbWriteDelta = 200; @@ -702,6 +703,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; + tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + // clang-format off TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE)); @@ -736,6 +740,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncLogBufferMemoryAllowed", tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); @@ -970,6 +975,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } + pItem = cfgGetItem(tsCfg, "syncLogBufferMemoryAllowed"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsLogBufferMemoryAllowed = totalMemoryKB * 1024 * 0.1; + tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + pItem->i64 = tsLogBufferMemoryAllowed; + pItem->stype = stype; + } + TAOS_RETURN(TSDB_CODE_SUCCESS); } @@ -1520,6 +1533,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncSnapReplMaxWaitN"); tsSnapReplMaxWaitN = pItem->i32; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncLogBufferMemoryAllowed"); + tsLogBufferMemoryAllowed = pItem->i64; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalSec"); tsArbHeartBeatIntervalSec = pItem->i32; @@ -1954,6 +1970,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"randErrorChance", &tsRandErrChance}, {"randErrorDivisor", &tsRandErrDivisor}, {"randErrorScope", &tsRandErrScope}, + {"syncLogBufferMemoryAllowed", &tsLogBufferMemoryAllowed}, {"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold}, {"checkpointInterval", &tsStreamCheckpointInterval}, diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 009854b45b..0b653ddbe9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "taosdef.h" +#include "tglobal.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index ea85b796d5..427a3690f2 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -54,6 +54,7 @@ typedef struct SSyncLogBuffer { int64_t matchIndex; int64_t endIndex; int64_t size; + int64_t bytes; TdThreadMutex mutex; TdThreadMutexAttr attr; int64_t totalIndex; diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 4cdc6c3d83..5d20f9290b 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -28,6 +28,8 @@ #include "syncUtil.h" #include "syncVoteMgr.h" +static int64_t sSyncLogBufferBytes = 0; // total bytes of vnode log buffer + static bool syncIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); @@ -101,6 +103,10 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; pBuf->entries[index % pBuf->size] = tmp; pBuf->endIndex = index + 1; + if (pNode->vgId > 1) { + pBuf->bytes += pEntry->bytes; + atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + } (void)taosThreadMutexUnlock(&pBuf->mutex); TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); @@ -330,6 +336,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); } pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0; + pBuf->bytes = 0; int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode); if (code < 0) { sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code)); @@ -470,8 +477,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt goto _out; } SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; - pEntry = NULL; pBuf->entries[index % pBuf->size] = tmp; + if (pNode->vgId > 1) { + pBuf->bytes += pEntry->bytes; + atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + } + pEntry = NULL; // update end index pBuf->endIndex = TMAX(index + 1, pBuf->endIndex); @@ -846,14 +857,36 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } // recycle - SyncIndex until = pBuf->commitIndex; // - TSDB_SYNC_LOG_BUFFER_RETENTION; - for (SyncIndex index = pBuf->startIndex; index < until; index++) { - SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; - if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR; + bool isVnode = pNode->vgId > 1; + SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION; + do { + if (pBuf->startIndex >= pBuf->commitIndex) { + break; + } + SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem; + if (pEntry == NULL) { + sError("vgId:%d, invalid log entry to recycle. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 + ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64, + pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + if (!((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD && + atomic_load_64(&sSyncLogBufferBytes) >= tsLogBufferMemoryAllowed))) { + break; + } + if (isVnode) { + pBuf->bytes -= pEntry->bytes; + atomic_sub_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + } + sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 + ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64 + ", total bytes:%" PRId64, + pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term, + pEntry->bytes, pBuf->bytes, atomic_load_64(&sSyncLogBufferBytes)); syncEntryDestroy(pEntry); - (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); - pBuf->startIndex = index + 1; - } + memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); + ++pBuf->startIndex; + } while (true); code = 0; _out: @@ -1324,6 +1357,7 @@ void syncLogBufferClear(SSyncLogBuffer* pBuf) { (void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); } pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0; + pBuf->bytes = 0; (void)taosThreadMutexUnlock(&pBuf->mutex); } From e92a58043ab8923b2f0b2301185c092db190bf1f Mon Sep 17 00:00:00 2001 From: Kaili Xu Date: Wed, 11 Sep 2024 14:44:29 +0800 Subject: [PATCH 03/13] enh: code optimization --- source/libs/sync/src/syncPipeline.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 5d20f9290b..1ba9de8fa7 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -860,7 +860,9 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm bool isVnode = pNode->vgId > 1; SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION; do { - if (pBuf->startIndex >= pBuf->commitIndex) { + if ((pBuf->startIndex >= pBuf->commitIndex) || + !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD && + atomic_load_64(&sSyncLogBufferBytes) >= tsLogBufferMemoryAllowed))) { break; } SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem; @@ -870,10 +872,6 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term); return TSDB_CODE_SYN_INTERNAL_ERROR; } - if (!((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD && - atomic_load_64(&sSyncLogBufferBytes) >= tsLogBufferMemoryAllowed))) { - break; - } if (isVnode) { pBuf->bytes -= pEntry->bytes; atomic_sub_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); From ffaa1092c027577eb28e616c8feebb6e9ccfe977 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 11 Sep 2024 14:51:52 +0800 Subject: [PATCH 04/13] fix: oom with large msg --- source/libs/sync/src/syncPipeline.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 1ba9de8fa7..d7c55fcd8a 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -28,7 +28,7 @@ #include "syncUtil.h" #include "syncVoteMgr.h" -static int64_t sSyncLogBufferBytes = 0; // total bytes of vnode log buffer +int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer static bool syncIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || @@ -105,7 +105,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->endIndex = index + 1; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } (void)taosThreadMutexUnlock(&pBuf->mutex); @@ -480,7 +480,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->entries[index % pBuf->size] = tmp; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } pEntry = NULL; @@ -862,7 +862,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm do { if ((pBuf->startIndex >= pBuf->commitIndex) || !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD && - atomic_load_64(&sSyncLogBufferBytes) >= tsLogBufferMemoryAllowed))) { + atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) { break; } SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem; @@ -874,13 +874,13 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } if (isVnode) { pBuf->bytes -= pEntry->bytes; - atomic_sub_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64 ", total bytes:%" PRId64, pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term, - pEntry->bytes, pBuf->bytes, atomic_load_64(&sSyncLogBufferBytes)); + pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed)); syncEntryDestroy(pEntry); memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); ++pBuf->startIndex; From 8b24bae5334a8fe188b32fae3d784eddb77af612 Mon Sep 17 00:00:00 2001 From: Kaili Xu Date: Wed, 11 Sep 2024 17:08:34 +0800 Subject: [PATCH 05/13] enh: return value of memset --- source/libs/sync/src/syncPipeline.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index d7c55fcd8a..ae0d5b7ddb 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -882,7 +882,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term, pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed)); syncEntryDestroy(pEntry); - memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); + (void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); ++pBuf->startIndex; } while (true); From e127a29e64870330a78497180dd8e3ac1f0cbb91 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 05:55:22 +0800 Subject: [PATCH 06/13] fix: oom with large msg --- source/libs/sync/src/syncPipeline.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index d7c55fcd8a..72d586f47e 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -266,6 +266,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1}; pBuf->entries[index % pBuf->size] = tmp; taken = true; + if (pNode->vgId > 1) { + pBuf->bytes += pEntry->bytes; + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + } } if (index < toIndex) { @@ -292,6 +296,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { } SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm}; pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp; + if (pNode->vgId > 1) { + pBuf->bytes += pDummy->bytes; + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes); + } if (index < toIndex) { pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex; @@ -878,9 +886,9 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64 - ", total bytes:%" PRId64, + ", used:%" PRId64 ", allowed:%" PRId64, pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term, - pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed)); + pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed); syncEntryDestroy(pEntry); memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); ++pBuf->startIndex; From e3121c24ebae09ba5e09f1be26406256046e358e Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 06:38:15 +0800 Subject: [PATCH 07/13] fix: compile problem --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncPipeline.c | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0b653ddbe9..2ff890da56 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "tglobal.h" +#include "tdef.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 5eac01b884..279583340d 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -27,6 +27,7 @@ #include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" +#include "tglobal.h" int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer From 15c92cea3fda0e8204fac0af9044ab4fd57b5dfa Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 07:11:26 +0800 Subject: [PATCH 08/13] fix: compile problem --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncPipeline.c | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2ff890da56..0b653ddbe9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "tdef.h" +#include "tglobal.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 279583340d..5eac01b884 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -27,7 +27,6 @@ #include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" -#include "tglobal.h" int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer From d9a25b31fe6346edc709bace7cc7698d1e40744f Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 08:32:59 +0800 Subject: [PATCH 09/13] fix: compile problem --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncPipeline.c | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0b653ddbe9..2ff890da56 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "tglobal.h" +#include "tdef.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 5eac01b884..c891db07c6 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -27,8 +27,9 @@ #include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" +#include "tglobal.h" -int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer +static int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer static bool syncIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || From d1cb30c47396a2cba83d9148ae4b5f653e36ebaf Mon Sep 17 00:00:00 2001 From: Kaili Xu Date: Thu, 12 Sep 2024 10:47:49 +0800 Subject: [PATCH 10/13] chore: restore header file --- source/libs/sync/inc/syncInt.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2ff890da56..009854b45b 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "tdef.h" +#include "taosdef.h" #include "trpc.h" #include "ttimer.h" From 36a3a035b033ba7f55cd1dcdc9da8080b26a467b Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 11:02:27 +0800 Subject: [PATCH 11/13] fix: compile problem --- source/libs/sync/src/syncPipeline.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index c891db07c6..a7c22ffd55 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -106,7 +106,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->endIndex = index + 1; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } (void)taosThreadMutexUnlock(&pBuf->mutex); @@ -269,7 +269,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { taken = true; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } } @@ -299,7 +299,7 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp; if (pNode->vgId > 1) { pBuf->bytes += pDummy->bytes; - atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes); + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes); } if (index < toIndex) { @@ -489,7 +489,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->entries[index % pBuf->size] = tmp; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + (void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } pEntry = NULL; @@ -883,7 +883,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } if (isVnode) { pBuf->bytes -= pEntry->bytes; - atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + (void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64 From c2d28a208eabb537d813fa9e07bc018c51c44b0b Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 12 Sep 2024 11:18:10 +0800 Subject: [PATCH 12/13] chore: code optimization --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncPipeline.c | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 009854b45b..0b653ddbe9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "sync.h" -#include "taosdef.h" +#include "tglobal.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index a7c22ffd55..44f5a293a9 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -27,7 +27,6 @@ #include "syncSnapshot.h" #include "syncUtil.h" #include "syncVoteMgr.h" -#include "tglobal.h" static int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer From 9cf662f1642be01a76895ee67c48ef2a562b5e6b Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Fri, 13 Sep 2024 10:58:47 +0800 Subject: [PATCH 13/13] fix: disable dynamic adjustment of 'keepaliveidle' --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 5b67e1267b..0937a56b3c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -613,7 +613,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); tsKeepAliveIdle = TRANGE(tsKeepAliveIdle, 1, 72000); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_ENT_BOTH)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "keepAliveIdle", tsKeepAliveIdle, 1, 7200000, CFG_SCOPE_BOTH, CFG_DYN_NONE)); tsNumOfTaskQueueThreads = tsNumOfCores * 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 16);