fix: oom with large msg
This commit is contained in:
parent
e92a58043a
commit
ffaa1092c0
|
@ -28,7 +28,7 @@
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
#include "syncVoteMgr.h"
|
#include "syncVoteMgr.h"
|
||||||
|
|
||||||
static int64_t sSyncLogBufferBytes = 0; // total bytes of vnode log buffer
|
int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer
|
||||||
|
|
||||||
static bool syncIsMsgBlock(tmsg_t type) {
|
static bool syncIsMsgBlock(tmsg_t type) {
|
||||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||||
|
@ -105,7 +105,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
||||||
pBuf->endIndex = index + 1;
|
pBuf->endIndex = index + 1;
|
||||||
if (pNode->vgId > 1) {
|
if (pNode->vgId > 1) {
|
||||||
pBuf->bytes += pEntry->bytes;
|
pBuf->bytes += pEntry->bytes;
|
||||||
atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes);
|
atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
(void)taosThreadMutexUnlock(&pBuf->mutex);
|
||||||
|
@ -480,7 +480,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
|
||||||
pBuf->entries[index % pBuf->size] = tmp;
|
pBuf->entries[index % pBuf->size] = tmp;
|
||||||
if (pNode->vgId > 1) {
|
if (pNode->vgId > 1) {
|
||||||
pBuf->bytes += pEntry->bytes;
|
pBuf->bytes += pEntry->bytes;
|
||||||
atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes);
|
atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
|
||||||
}
|
}
|
||||||
pEntry = NULL;
|
pEntry = NULL;
|
||||||
|
|
||||||
|
@ -862,7 +862,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
||||||
do {
|
do {
|
||||||
if ((pBuf->startIndex >= pBuf->commitIndex) ||
|
if ((pBuf->startIndex >= pBuf->commitIndex) ||
|
||||||
!((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
|
!((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD &&
|
||||||
atomic_load_64(&sSyncLogBufferBytes) >= tsLogBufferMemoryAllowed))) {
|
atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
|
SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem;
|
||||||
|
@ -874,13 +874,13 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
||||||
}
|
}
|
||||||
if (isVnode) {
|
if (isVnode) {
|
||||||
pBuf->bytes -= pEntry->bytes;
|
pBuf->bytes -= pEntry->bytes;
|
||||||
atomic_sub_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes);
|
atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes);
|
||||||
}
|
}
|
||||||
sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
|
sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64
|
||||||
", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
|
", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64
|
||||||
", total bytes:%" PRId64,
|
", total bytes:%" PRId64,
|
||||||
pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term,
|
pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term,
|
||||||
pEntry->bytes, pBuf->bytes, atomic_load_64(&sSyncLogBufferBytes));
|
pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed));
|
||||||
syncEntryDestroy(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
|
||||||
++pBuf->startIndex;
|
++pBuf->startIndex;
|
||||||
|
|
Loading…
Reference in New Issue