fix: OOM with large messages
This commit is contained in:
parent
883d2bfb7a
commit
2a13aa08e1
|
@ -94,6 +94,7 @@ extern int32_t tsElectInterval;
|
|||
extern int32_t tsHeartbeatInterval;
|
||||
extern int32_t tsHeartbeatTimeout;
|
||||
extern int32_t tsSnapReplMaxWaitN;
|
||||
extern int64_t tsLogBufferMemoryAllowed; // maximum allowed log buffer size in bytes for each dnode
|
||||
|
||||
// arbitrator
|
||||
extern int32_t tsArbHeartBeatIntervalSec;
|
||||
|
|
|
@ -332,6 +332,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_MAX_LEARNER_REPLICA 10
|
||||
#define TSDB_SYNC_LOG_BUFFER_SIZE 4096
|
||||
#define TSDB_SYNC_LOG_BUFFER_RETENTION 256
|
||||
// Per-buffer byte threshold (5 * 1024 * 1024 bytes). When recycling committed
// entries, a buffer whose node has vgId > 1 is trimmed past the retention
// window only if its own byte count has reached this value AND the
// process-wide total has reached tsLogBufferMemoryAllowed.
#define TSDB_SYNC_LOG_BUFFER_THRESHOLD (1024 * 1024 * 5)
|
||||
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
|
||||
#define TSDB_SYNC_NEGOTIATION_WIN 512
|
||||
|
||||
|
|
|
@ -81,6 +81,7 @@ int32_t tsElectInterval = 25 * 1000;
|
|||
int32_t tsHeartbeatInterval = 1000;
|
||||
int32_t tsHeartbeatTimeout = 20 * 1000;
|
||||
int32_t tsSnapReplMaxWaitN = 128;
|
||||
// Limit, in bytes, on the total memory held by sync log buffers.
// Starts at 0; server config setup assigns 10% of total memory
// (tsTotalMemoryKB * 1024 * 0.1), clamped with TRANGE to
// [TSDB_MAX_MSG_SIZE * 10, TSDB_MAX_MSG_SIZE * 10000]. The value is then
// loaded from the "syncLogBufferMemoryAllowed" config item.
int64_t tsLogBufferMemoryAllowed = 0;
|
||||
|
||||
// mnode
|
||||
int64_t tsMndSdbWriteDelta = 200;
|
||||
|
@ -702,6 +703,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||
|
||||
tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||
|
||||
// clang-format off
|
||||
TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
|
@ -736,6 +740,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncSnapReplMaxWaitN", tsSnapReplMaxWaitN, 16, (TSDB_SYNC_SNAP_BUFFER_SIZE >> 2), CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "syncLogBufferMemoryAllowed", tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
|
||||
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbHeartBeatIntervalSec", tsArbHeartBeatIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "arbCheckSyncIntervalSec", tsArbCheckSyncIntervalSec, 1, 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
|
@ -970,6 +975,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
|
|||
pItem->stype = stype;
|
||||
}
|
||||
|
||||
pItem = cfgGetItem(tsCfg, "syncLogBufferMemoryAllowed");
|
||||
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
|
||||
tsLogBufferMemoryAllowed = totalMemoryKB * 1024 * 0.1;
|
||||
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||
pItem->i64 = tsLogBufferMemoryAllowed;
|
||||
pItem->stype = stype;
|
||||
}
|
||||
|
||||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
|
@ -1520,6 +1533,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncSnapReplMaxWaitN");
|
||||
tsSnapReplMaxWaitN = pItem->i32;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "syncLogBufferMemoryAllowed");
|
||||
tsLogBufferMemoryAllowed = pItem->i64;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "arbHeartBeatIntervalSec");
|
||||
tsArbHeartBeatIntervalSec = pItem->i32;
|
||||
|
||||
|
@ -1954,6 +1970,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
|
|||
{"randErrorChance", &tsRandErrChance},
|
||||
{"randErrorDivisor", &tsRandErrDivisor},
|
||||
{"randErrorScope", &tsRandErrScope},
|
||||
{"syncLogBufferMemoryAllowed", &tsLogBufferMemoryAllowed},
|
||||
|
||||
{"cacheLazyLoadThreshold", &tsCacheLazyLoadThreshold},
|
||||
{"checkpointInterval", &tsStreamCheckpointInterval},
|
||||
|
|
|
@ -21,7 +21,7 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
#include "sync.h"
|
||||
#include "taosdef.h"
|
||||
#include "tglobal.h"
|
||||
#include "trpc.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ typedef struct SSyncLogBuffer {
|
|||
int64_t matchIndex;
|
||||
int64_t endIndex;
|
||||
int64_t size;
|
||||
int64_t bytes;
|
||||
TdThreadMutex mutex;
|
||||
TdThreadMutexAttr attr;
|
||||
int64_t totalIndex;
|
||||
|
|
|
@ -28,6 +28,8 @@
|
|||
#include "syncUtil.h"
|
||||
#include "syncVoteMgr.h"
|
||||
|
||||
// Running total, in bytes, of the entries accounted across log buffers in this
// file. Only buffers whose node has vgId > 1 contribute: the entry's `bytes`
// is added atomically when it is appended/accepted and subtracted atomically
// when it is recycled. The recycle loop compares this total against
// tsLogBufferMemoryAllowed to decide whether to trim a buffer early.
// NOTE(review): the re-init and clear paths reset pBuf->bytes to 0; confirm
// that this total is reduced by the same amount there, otherwise it drifts up.
static int64_t sSyncLogBufferBytes = 0;
|
||||
|
||||
static bool syncIsMsgBlock(tmsg_t type) {
|
||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
|
||||
|
@ -101,6 +103,10 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
|
||||
pBuf->entries[index % pBuf->size] = tmp;
|
||||
pBuf->endIndex = index + 1;
|
||||
if (pNode->vgId > 1) {
|
||||
pBuf->bytes += pEntry->bytes;
|
||||
atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes);
|
||||
}
|
||||
|
||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||
TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf));
|
||||
|
@ -330,6 +336,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
|||
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||
}
|
||||
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
|
||||
pBuf->bytes = 0;
|
||||
int32_t code = syncLogBufferInitWithoutLock(pBuf, pNode);
|
||||
if (code < 0) {
|
||||
sError("vgId:%d, failed to re-initialize sync log buffer since %s.", pNode->vgId, tstrerror(code));
|
||||
|
@ -470,8 +477,12 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
|||
goto _out;
|
||||
}
|
||||
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
|
||||
pEntry = NULL;
|
||||
pBuf->entries[index % pBuf->size] = tmp;
|
||||
if (pNode->vgId > 1) {
|
||||
pBuf->bytes += pEntry->bytes;
|
||||
atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes);
|
||||
}
|
||||
pEntry = NULL;
|
||||
|
||||
// update end index
|
||||
pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);
|
||||
|
@ -846,14 +857,36 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
|||
}
|
||||
|
||||
// recycle
|
||||
SyncIndex until = pBuf->commitIndex; // - TSDB_SYNC_LOG_BUFFER_RETENTION;
|
||||
for (SyncIndex index = pBuf->startIndex; index < until; index++) {
|
||||
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
|
||||
if (pEntry == NULL) return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
bool isVnode = pNode->vgId > 1;
|
||||
SyncIndex until = pBuf->commitIndex - TSDB_SYNC_LOG_BUFFER_RETENTION;
|
||||
do {
|
||||
if (pBuf->startIndex >= pBuf->commitIndex) {
|
||||
break;
|
||||
}
|
||||
SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
|
||||
if (pEntry == NULL) {
|
||||
sError("vgId:%d, invalid log entry to recycle. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
|
||||
", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64,
|
||||
pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term);
|
||||
return TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
if (!((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
|
||||
atomic_load_64(&sSyncLogBufferBytes) >= tsLogBufferMemoryAllowed))) {
|
||||
break;
|
||||
}
|
||||
if (isVnode) {
|
||||
pBuf->bytes -= pEntry->bytes;
|
||||
atomic_sub_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes);
|
||||
}
|
||||
sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
|
||||
", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
|
||||
", total bytes:%" PRId64,
|
||||
pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term,
|
||||
pEntry->bytes, pBuf->bytes, atomic_load_64(&sSyncLogBufferBytes));
|
||||
syncEntryDestroy(pEntry);
|
||||
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||
pBuf->startIndex = index + 1;
|
||||
}
|
||||
memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||
++pBuf->startIndex;
|
||||
} while (true);
|
||||
|
||||
code = 0;
|
||||
_out:
|
||||
|
@ -1324,6 +1357,7 @@ void syncLogBufferClear(SSyncLogBuffer* pBuf) {
|
|||
(void)memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||
}
|
||||
pBuf->startIndex = pBuf->commitIndex = pBuf->matchIndex = pBuf->endIndex = 0;
|
||||
pBuf->bytes = 0;
|
||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue