diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 1ba9de8fa7..d7c55fcd8a 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -28,7 +28,7 @@ #include "syncUtil.h" #include "syncVoteMgr.h" -static int64_t sSyncLogBufferBytes = 0; // total bytes of vnode log buffer +int64_t tsLogBufferMemoryUsed = 0; // total bytes of vnode log buffer static bool syncIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || @@ -105,7 +105,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->endIndex = index + 1; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } (void)taosThreadMutexUnlock(&pBuf->mutex); @@ -480,7 +480,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt pBuf->entries[index % pBuf->size] = tmp; if (pNode->vgId > 1) { pBuf->bytes += pEntry->bytes; - atomic_add_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + atomic_add_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } pEntry = NULL; @@ -862,7 +862,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm do { if ((pBuf->startIndex >= pBuf->commitIndex) || !((pBuf->startIndex < until) || (isVnode && pBuf->bytes >= TSDB_SYNC_LOG_BUFFER_THRESHOLD && - atomic_load_64(&sSyncLogBufferBytes) >= tsLogBufferMemoryAllowed))) { + atomic_load_64(&tsLogBufferMemoryUsed) >= tsLogBufferMemoryAllowed))) { break; } SSyncRaftEntry* pEntry = pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size].pItem; @@ -874,13 +874,13 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } if (isVnode) { pBuf->bytes -= pEntry->bytes; - atomic_sub_fetch_64(&sSyncLogBufferBytes, (int64_t)pEntry->bytes); + atomic_sub_fetch_64(&tsLogBufferMemoryUsed, (int64_t)pEntry->bytes); } sDebug("vgId:%d, recycle log entry. index:%" PRId64 ", startIndex:%" PRId64 ", until:%" PRId64 ", commitIndex:%" PRId64 ", endIndex:%" PRId64 ", term:%" PRId64 ", entry bytes:%u, buf bytes:%" PRId64 ", total bytes:%" PRId64, pNode->vgId, pEntry->index, pBuf->startIndex, until, pBuf->commitIndex, pBuf->endIndex, pEntry->term, - pEntry->bytes, pBuf->bytes, atomic_load_64(&sSyncLogBufferBytes)); + pEntry->bytes, pBuf->bytes, atomic_load_64(&tsLogBufferMemoryUsed)); syncEntryDestroy(pEntry); memset(&pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0])); ++pBuf->startIndex;