diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index d7c55fcd8a..72d586f47e 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -266,6 +266,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1}; pBuf->entries[index % pBuf->size] = tmp; taken = true; + if (pNode->vgId > 1) { + pBuf->bytes += pEntry->bytes; + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); + } } if (index < toIndex) { @@ -292,6 +296,10 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) { } SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm}; pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp; + if (pNode->vgId > 1) { + pBuf->bytes += pDummy->bytes; + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pDummy->bytes); + } if (index < toIndex) { pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex; @@ -878,9 +886,9 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64 - ", total bytes:%" PRId64, + ", used:%" PRId64 ", allowed:%" PRId64, pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term, - pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed)); + pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed), tsLogBufferMemoryAllowed); syncEntryDestroy(pEntry); memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); ++pBuf->startIndex;