Merge pull request #27774 from taosdata/fix/TS-5392-3.0

fix: oom with large msg
This commit is contained in:
Hongze Cheng 2024-09-13 15:07:30 +08:00 committed by GitHub
commit d4c37db0f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 68 additions and 8 deletions

View File

@ -94,6 +94,7 @@ extern int32_t tsElectInterval;
extern int32_t tsHeartbeatInterval;
extern int32_t tsHeartbeatTimeout;
extern int32_t tsSnapReplMaxWaitN;
extern int64_t tsLogBufferMemoryAllowed; // maximum allowed log buffer size in bytes for each dnode
// arbitrator
extern int32_t tsArbHeartBeatIntervalSec;

View File

@ -332,6 +332,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_LEARNER_REPLICA 10
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
#define TSDB_SYNC_LOG_BUFFER_RETENTION 256
#define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5)
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
#define TSDB_SYNC_NEGOTIATION_WIN 512

View File

@ -81,6 +81,7 @@ int32_t tsElectInterval = 25 * 1000;
int32_t tsHeartbeatInterval = 1000;
int32_t tsHeartbeatTimeout = 20 * 1000;
int32_t tsSnapReplMaxWaitN = 128;
int64_t tsLogBufferMemoryAllowed = 0; // bytes
// mnode
int64_t tsMndSdbWriteDelta = 200;
@ -702,6 +703,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
// clang-format off
TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE));
@ -736,6 +740,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncLogBufferMemoryAllowed", tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
@ -970,6 +975,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "syncLogBufferMemoryAllowed");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsLogBufferMemoryAllowed = totalMemoryKB * 1024 * 0.1;
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
pItem->i64 = tsLogBufferMemoryAllowed;
pItem->stype = stype;
}
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
@ -1520,6 +1533,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncSnapReplMaxWaitN");
tsSnapReplMaxWaitN = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncLogBufferMemoryAllowed");
tsLogBufferMemoryAllowed = pItem->i64;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalSec");
tsArbHeartBeatIntervalSec = pItem->i32;
@ -1954,6 +1970,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"randErrorChance", &tsRandErrChance},
{"randErrorDivisor", &tsRandErrDivisor},
{"randErrorScope", &tsRandErrScope},
{"syncLogBufferMemoryAllowed", &tsLogBufferMemoryAllowed},
{"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold},
{"checkpointInterval", &tsStreamCheckpointInterval},

View File

@ -21,7 +21,7 @@ extern "C" {
#endif
#include "sync.h"
#include "taosdef.h"
#include "tglobal.h"
#include "trpc.h"
#include "ttimer.h"

View File

@ -54,6 +54,7 @@ typedef struct SSyncLogBuffer {
int64_t matchIndex;
int64_t endIndex;
int64_t size;
int64_t bytes;
TdThreadMutex mutex;
TdThreadMutexAttr attr;
int64_t totalIndex;

View File

@ -28,6 +28,8 @@
#include "syncUtil.h"
#include "syncVoteMgr.h"
static int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer
static bool syncIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
@ -101,6 +103,10 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
pBuf->entries[index % pBuf->size] = tmp;
pBuf->endIndex = index + 1;
if (pNode->vgId > 1) {
pBuf->bytes += pEntry->bytes;
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
}
(void)taosThreadMutexUnlock(&pBuf->mutex);
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
@ -260,6 +266,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
pBuf->entries[index % pBuf->size] = tmp;
taken = true;
if (pNode->vgId > 1) {
pBuf->bytes += pEntry->bytes;
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
}
}
if (index < toIndex) {
@ -286,6 +296,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
}
SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
if (pNode->vgId > 1) {
pBuf->bytes += pDummy->bytes;
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes);
}
if (index < toIndex) {
pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
@ -330,6 +344,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
}
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
pBuf->bytes = 0;
int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
if (code < 0) {
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
@ -470,8 +485,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
goto _out;
}
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
pEntry = NULL;
pBuf->entries[index % pBuf->size] = tmp;
if (pNode->vgId > 1) {
pBuf->bytes += pEntry->bytes;
(void)atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
}
pEntry = NULL;
// update end index
pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);
@ -846,14 +865,34 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
}
// recycle
bool isVnode = pNode->vgId > 1;
SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
for (SyncIndex index = pBuf->startIndex; index < until; index++) {
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
do {
if ((pBuf->startIndex >= pBuf->commitIndex) ||
!((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) {
break;
}
SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
if (pEntry == NULL) {
sError("vgId:%d, invalid log entry to recycle. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64,
pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term);
return TSDB_CODE_SYN_INTERNAL_ERROR;
}
if (isVnode) {
pBuf->bytes -= pEntry->bytes;
(void)atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
}
sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
", used:%" PRId64 ", allowed:%" PRId64,
pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term,
pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed);
syncEntryDestroy(pEntry);
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
pBuf->startIndex = index + 1;
}
(void)memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
++pBuf->startIndex;
} while (true);
code = 0;
_out:
@ -1324,6 +1363,7 @@ void syncLogBufferClear(SSyncLogBuffer* pBuf) {
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
}
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
pBuf->bytes = 0;
(void)taosThreadMutexUnlock(&pBuf->mutex);
}