From 5f2ab7908af9fc6da75b0561dc1ea2345baf61d5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 27 Jun 2024 19:27:47 +0800 Subject: [PATCH] enh: add memory pool --- source/util/inc/tmempoolInt.h | 67 ++++++--- source/util/src/terror.c | 2 +- source/util/src/tmempool.c | 250 ++++++++++++++++++++++++++-------- 3 files changed, 240 insertions(+), 79 deletions(-) diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 762cd337b5..c62dd39644 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -22,35 +22,41 @@ extern "C" { #include "os.h" -#define MP_CHUNKPOOL_MIN_BATCH_SIZE 1000 +#define MP_CHUNKGRP_ALLOC_BATCH_SIZE 1000 +#define MP_NSCHUNKGRP_ALLOC_BATCH_SIZE 500 #define MP_MAX_KEEP_FREE_CHUNK_NUM 1000 #define MP_CHUNK_FLAG_IN_USE (1 << 0) #define MP_CHUNK_FLAG_NS_CHUNK (1 << 1) +#define MP_DBG_FLAG_LOG_MALLOC_FREE (1 << 0) + typedef struct SMPChunk { - void *pNext; - void *pMemStart; + void *pNext; + char *pMemStart; int32_t flags; + /* KEEP ABOVE SAME WITH SMPNSChunk */ + uint32_t offset; } SMPChunk; typedef struct SMPNSChunk { void *pNext; - void *pMemStart; + char *pMemStart; int32_t flags; - uint32_t offset; + /* KEEP ABOVE SAME WITH SMPChunk */ - uint32_t memBytes; + uint32_t offset; + uint64_t memBytes; } SMPNSChunk; -typedef struct SMPChunkCache { +typedef struct SMPChunkGroup { int32_t chunkNum; int32_t idleOffset; - SMPChunk *pChunks; + void *pChunks; void* pNext; -} SMPChunkCache; +} SMPChunkGroup; typedef struct SMPMemoryStat { int64_t chunkAlloc; @@ -77,6 +83,7 @@ typedef struct SMPSession { SMPChunk *srcChunkHead; SMPChunk *srcChunkTail; + int32_t inUseChunkNum; SMPChunk *inUseChunkHead; SMPChunk *inUseChunkTail; @@ -92,22 +99,36 @@ typedef struct SMPSession { SMPStat stat; } SMPSession; +typedef struct SMPChunkGroupInfo { + int16_t chunkNodeSize; + int64_t allocChunkNum; + int32_t chunkGroupNum; + SMPChunkGroup *pChunkGrpHead; + SMPChunkGroup *pChunkGrpTail; + void *pIdleChunkList; +} SMPChunkGroupInfo; + +typedef struct SMPDebugInfo { + int64_t flags; +} SMPDebugInfo; + typedef struct SMemPool { char *name; int16_t slotId; SMemPoolCfg cfg; int32_t maxChunkNum; + SMPDebugInfo dbgInfo; + int16_t maxDiscardSize; double threadChunkReserveNum; int64_t allocChunkNum; + int64_t allocChunkSize; + int64_t allocNSChunkNum; + int64_t allocNSChunkSize; int64_t allocMemSize; - int64_t usedMemSize; - int64_t allocChunkCacheNum; - int32_t chunkCacheUnitNum; - SMPChunkCache *pChunkCacheHead; - SMPChunkCache *pChunkCacheTail; - SMPChunk *pIdleChunkList; + SMPChunkGroupInfo chunkGrpInfo; + SMPChunkGroupInfo NSChunkGrpInfo; int32_t readyChunkNum; int32_t readyChunkReserveNum; @@ -124,15 +145,25 @@ typedef struct SMemPool { SMPStat stat; } SMemPool; -#define MP_CHUNK_GET_FLAG(st, f) ((st) & (f)) -#define MP_CHUNK_SET_FLAG(st, f) (st) |= (f) -#define MP_CHUNK_CLR_FLAG(st, f) (st) &= (~f) +#define MP_GET_FLAG(st, f) ((st) & (f)) +#define MP_SET_FLAG(st, f) (st) |= (f) +#define MP_CLR_FLAG(st, f) (st) &= (~f) enum { MP_READ = 1, MP_WRITE, }; +#define MP_ADD_TO_CHUNK_LIST(_chunkHead, _chunkTail, _chunkNum, _chunk) \ + do { \ + if (NULL == _chunkHead) { \ + _chunkHead = _chunk; \ + _chunkTail = _chunk; \ + } else { \ + (_chunkTail)->pNext = _chunk; \ + } \ + (_chunkNum)++; \ + } while (0) #define MP_LOCK(type, _lock) \ do { \ diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 528a4a8a14..277763663e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -104,7 +104,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connect") TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_PREPROCESSED, "Message has been processed in preprocess") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MEM_POOL_PARAM, "Invalid memory pool param") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MEM_POOL_PARAM, "Invalid memory pool input param") //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 9a473fe1d1..f5c3abac49 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -45,56 +45,52 @@ int32_t memPoolCheckCfg(SMemPoolCfg* cfg) { return TSDB_CODE_SUCCESS; } -void memPoolFreeChunkCache(SMPChunkCache* pCache) { +void memPoolFreeChunkGroup(SMPChunkGroup* pGrp) { //TODO } -int32_t memPoolAddChunkCache(SMemPool* pPool, SMPChunkCache* pTail) { - if (0 == pPool->chunkCacheUnitNum) { - pPool->chunkCacheUnitNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNKPOOL_MIN_BATCH_SIZE); - } - - SMPChunkCache* pCache = NULL; - if (NULL == pPool->pChunkCacheHead) { - pPool->pChunkCacheHead = taosMemCalloc(1, sizeof(*pPool->pChunkCacheHead)); - if (NULL == pPool->pChunkCacheHead) { +int32_t memPoolAddChunkGroup(SMemPool* pPool, SMPChunkGroupInfo* pInfo, SMPChunkGroup* pTail) { + SMPChunkGroup* pGrp = NULL; + if (NULL == pInfo->pChunkGrpHead) { + pInfo->pChunkGrpHead = taosMemCalloc(1, sizeof(*pInfo->pChunkGrpHead)); + if (NULL == pInfo->pChunkGrpHead) { uError("malloc chunkCache failed"); MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - pCache = pPool->pChunkCacheHead; + pGrp = pInfo->pChunkGrpHead; } else { - pCache = (SMPChunkCache*)taosMemCalloc(1, sizeof(SMPChunkCache)); + pGrp = (SMPChunkGroup*)taosMemCalloc(1, sizeof(SMPChunkGroup)); } - pCache->chunkNum = pPool->chunkCacheUnitNum; - pCache->pChunks = taosMemoryCalloc(pCache->chunkNum, sizeof(*pCache->pChunks)); - if (NULL == pCache->pChunks) { - uError("calloc %d chunks in cache failed", pCache->chunkNum); + pGrp->chunkNum = pInfo->chunkGroupNum; + pGrp->pChunks = taosMemoryCalloc(pGrp->chunkNum, pInfo->chunkNodeSize); + if (NULL == pGrp->pChunks) { + uError("calloc %d %d chunks in chunk group failed", pGrp->chunkNum, pInfo->chunkNodeSize); MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - if (atomic_val_compare_exchange_ptr(&pPool->pChunkCacheTail, pTail, pCache) != pTail) { - memPoolFreeChunkCache(pCache); + if (atomic_val_compare_exchange_ptr(&pInfo->pChunkGrpTail, pTail, pGrp) != pTail) { + memPoolFreeChunkGroup(pGrp); return TSDB_CODE_SUCCESS; } - atomic_add_fetch_64(&pPool->allocChunkCacheNum, pCache->chunkNum); + atomic_add_fetch_64(&pInfo->allocChunkNum, pGrp->chunkNum); return TSDB_CODE_SUCCESS; } -int32_t memPoolGetIdleChunk(SMemPool* pPool, SMPChunkCache** ppCache, SMPChunk** ppChunk) { - SMPChunkCache* pCache = NULL; +int32_t memPoolGetIdleChunk(SMemPool* pPool, SMPChunkGroupInfo* pInfo, void** ppChunk) { + SMPChunkGroup* pGrp = NULL; SMPChunk* pChunk = NULL; while (true) { - pChunk = (SMPChunk*)atomic_load_ptr(&pPool->pIdleChunkList); + pChunk = (SMPChunk*)atomic_load_ptr(&pInfo->pIdleChunkList); if (NULL == pChunk) { break; } - if (atomic_val_compare_exchange_ptr(&pPool->pIdleChunkList, pChunk, pChunk->pNext) != pChunk) { + if (atomic_val_compare_exchange_ptr(&pInfo->pIdleChunkList, pChunk, pChunk->pNext) != pChunk) { continue; } @@ -103,32 +99,28 @@ int32_t memPoolGetIdleChunk(SMemPool* pPool, SMPChunkCache** ppCache, SMPChunk** } while (true) { - pCache = atomic_load_ptr(&pPool->pChunkCacheTail); - int32_t offset = atomic_fetch_add_32(&pCache->idleOffset, 1); - if (offset < pCache->chunkNum) { - pChunk = pCache->pChunks + offset; + pGrp = atomic_load_ptr(&pInfo->pChunkGrpTail); + int32_t offset = atomic_fetch_add_32(&pGrp->idleOffset, 1); + if (offset < pGrp->chunkNum) { + pChunk = (SMPChunk*)((char*)pGrp->pChunks + offset * pInfo->chunkNodeSize); break; } else { - atomic_sub_fetch_32(&pCache->idleOffset, 1); + atomic_sub_fetch_32(&pGrp->idleOffset, 1); } - MP_ERR_RET(memPoolAddChunkCache(pPool, pCache)); + MP_ERR_RET(memPoolAddChunkGroup(pPool, pInfo, pGrp)); } _return: - if (NULL != ppCache) { - *ppCache = pCache; - } - *ppChunk = pChunk; return TSDB_CODE_SUCCESS; } -int32_t memPoolNewReadyChunk(SMemPool* pPool, SMPChunkCache** ppCache, SMPChunk** ppChunk) { +int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) { SMPChunk* pChunk = NULL; - MP_ERR_RET(memPoolGetIdleChunk(pPool, ppCache, &pChunk)); + MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->chunkGrpInfo, &pChunk)); pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize); if (NULL == pChunk->pMemStart) { @@ -139,11 +131,29 @@ int32_t memPoolNewReadyChunk(SMemPool* pPool, SMPChunkCache** ppCache, SMPChunk* return TSDB_CODE_SUCCESS; } -int32_t memPoolAddReadyChunk(SMemPool* pPool, int32_t num) { - SMPChunkCache* pCache = NULL; + +int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSize) { + SMPNSChunk* pChunk = NULL; + MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->NSChunkGrpInfo, &pChunk)); + + pChunk->pMemStart = taosMemMalloc(chunkSize); + if (NULL == pChunk->pMemStart) { + uError("add new chunk, memory malloc %" PRId64 " failed", chunkSize); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pChunk->memBytes = chunkSize; + MP_SET_FLAG(pChunk->flags, MP_CHUNK_FLAG_NS_CHUNK); + + return TSDB_CODE_SUCCESS; +} + + +int32_t memPoolPrepareChunks(SMemPool* pPool, int32_t num) { + SMPChunkGroup* pGrp = NULL; SMPChunk* pChunk = NULL; for (int32_t i = 0; i < num; ++i) { - MP_ERR_RET(memPoolNewReadyChunk(pPool, &pCache, &pChunk)); + MP_ERR_RET(memPoolNewChunk(pPool, &pGrp, &pChunk)); if (NULL == pPool->readyChunkTail) { pPool->readyChunkHead = pChunk; @@ -158,7 +168,8 @@ int32_t memPoolAddReadyChunk(SMemPool* pPool, int32_t num) { return TSDB_CODE_SUCCESS; } -int32_t memPoolEnsureReadyChunks(SMemPool* pPool) { + +int32_t memPoolEnsureChunks(SMemPool* pPool) { if (E_EVICT_ALL == pPool->cfg.evicPolicy) { return TSDB_CODE_SUCCESS; } @@ -168,7 +179,7 @@ int32_t memPoolEnsureReadyChunks(SMemPool* pPool) { return TSDB_CODE_SUCCESS; } - MP_ERR_RET(memPoolAddReadyChunk(pPool, readyMissNum)); + MP_ERR_RET(memPoolPrepareChunks(pPool, readyMissNum)); return TSDB_CODE_SUCCESS; } @@ -193,27 +204,32 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { pPool->threadChunkReserveNum = 1; pPool->readyChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum); - MP_ERR_RET(memPoolAddChunkCache(pPool)); + pPool->chunkGrpInfo.chunkGroupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNKGRP_ALLOC_BATCH_SIZE); + pPool->chunkGrpInfo.chunkNodeSize = sizeof(SMPChunk); + pPool->NSChunkGrpInfo.chunkGroupNum = MP_NSCHUNKGRP_ALLOC_BATCH_SIZE; + pPool->NSChunkGrpInfo.chunkNodeSize = sizeof(SMPNSChunk); - MP_ERR_RET(memPoolGetIdleChunk(pPool, NULL, &pPool->readyChunkHead)); + MP_ERR_RET(memPoolAddChunkGroup(pPool, &pPool->chunkGrpInfo, NULL)); + + MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->chunkGrpInfo, &pPool->readyChunkHead)); pPool->readyChunkTail = pPool->readyChunkHead; - MP_ERR_RET(memPoolEnsureReadyChunks(pPool)); + MP_ERR_RET(memPoolEnsureChunks(pPool)); return TSDB_CODE_SUCCESS; } -void memPoolNotifyLowReadyChunk(SMemPool* pPool) { +void memPoolNotifyLowChunkNum(SMemPool* pPool) { } -int32_t memPoolGetReadyChunk(SMemPool* pPool, SMPChunk** ppChunk) { - SMPChunkCache* pCache = NULL; +int32_t memPoolGetChunk(SMemPool* pPool, SMPChunk** ppChunk) { + SMPChunkGroup* pCache = NULL; SMPChunk* pChunk = NULL; int32_t readyChunkNum = atomic_sub_fetch_32(&pPool->readyChunkNum, 1); if (readyChunkNum >= 0) { if (atomic_add_fetch_32(&pPool->readyChunkGotNum, 1) == pPool->readyChunkLowNum) { - memPoolNotifyLowReadyChunk(pPool); + memPoolNotifyLowChunkNum(pPool); } pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->pNext); @@ -226,9 +242,87 @@ int32_t memPoolGetReadyChunk(SMemPool* pPool, SMPChunk** ppChunk) { return TSDB_CODE_SUCCESS; } - MP_RET(memPoolNewReadyChunk(pPool, NULL, ppChunk)); + MP_RET(memPoolNewChunk(pPool, NULL, ppChunk)); } +int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_t size, SMPChunk** ppChunk, SMPChunk** ppPreChunk) { + SMPChunk* pChunk = pSession->srcChunkHead; + while (NULL != pChunk) { + if ((pChunk->offset + size) <= pPool->cfg.chunkSize) { + *ppChunk = pChunk; + break; + } + + *ppPreChunk = pChunk; + pChunk = (SMPChunk*)pChunk->pNext; + } + + if (NULL == *ppChunk) { + *ppPreChunk = NULL; + } + + return TSDB_CODE_SUCCESS; +} + +void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) { + SMPChunk* pChunk = NULL, *preSrcChunk = NULL; + void* pRes = NULL; + if (pSession->srcChunkNum > 0) { + MP_ERR_JRET(memPoolGetChunkFromSession(pPool, pSession, size, &pChunk, &preSrcChunk)); + } + + if (NULL == pChunk) { + MP_ERR_JRET(memPoolNewChunk(pPool, &pChunk)); + + pSession->allocChunkNum++; + pSession->allocChunkMemSize += pPool->cfg.chunkSize; + pSession->allocMemSize += size; + + MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk); + MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk); + } + + pRes = pChunk->pMemStart + pChunk->offset; + pChunk->offset += size; + + if (pChunk->offset >= (pPool->cfg.chunkSize - pPool->maxDiscardSize)) { + if (NULL == preSrcChunk) { + pSession->srcChunkHead = NULL; + pSession->srcChunkTail = NULL; + } else { + preSrcChunk->pNext = pChunk->pNext; + } + + pSession->srcChunkNum--; + } + + +_return: + + return pRes; +} + +void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) { + SMPNSChunk* pChunk = NULL; + MP_ERR_JRET(memPoolNewNSChunk(pPool, &pChunk, size)); + + pSession->allocChunkNum++; + pSession->allocChunkMemSize += size; + pSession->allocMemSize += size; + + if (NULL == pSession->inUseNSChunkHead) { + pSession->inUseNSChunkHead = pChunk; + pSession->inUseNSChunkTail = pChunk; + } else { + pSession->inUseNSChunkTail->pNext = pChunk; + } + +_return: + + return pChunk ? pChunk->pMemStart : NULL; +} + + void taosMemPoolModInit(void) { taosThreadMutexInit(&gMPoolMutex, NULL); @@ -236,7 +330,6 @@ void taosMemPoolModInit(void) { gMPoolList = taosArrayInit(10, POINTER_BYTES); } - int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle) { int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = NULL; @@ -285,13 +378,14 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) { MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - MP_ERR_JRET(memPoolGetReadyChunk(pPool, &pSession->srcChunkHead)); + MP_ERR_JRET(memPoolGetChunk(pPool, &pSession->srcChunkHead)); - pSession->srcChunkTail = pSession->srcChunkHead; - pSession->srcChunkNum = 1; pSession->allocChunkNum = 1; pSession->allocChunkMemSize = pPool->cfg.chunkSize; + MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pSession->srcChunkHead); + MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pSession->srcChunkHead); + _return: if (TSDB_CODE_SUCCESS != code) { @@ -304,33 +398,69 @@ _return: return code; } + void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { + void *res = NULL; + + if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { + uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNC__, poolHandle, session, fileName, size); + MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + } + SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; + res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); + if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { + //TODO + } + +_return: + + return res; } void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) { + void *res = NULL; + + if (NULL == poolHandle || NULL == session || NULL == fileName || num < 0 || size < 0) { + uError("%s invalid input param, handle:%p, session:%p, fileName:%p, num:%" PRId64 ", size:%" PRId64, + __FUNC__, poolHandle, session, fileName, num, size); + MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + } + + SMemPool* pPool = (SMemPool*)poolHandle; + SMPSession* pSession = (SMPSession*)session; + res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); + + memset(res, 0, num * size); + + if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { + //TODO + } + +_return: + + return res; +} + +void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) { } -void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size) { +char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) { } -char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr) { +void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { } -void taosMemPoolFree(void* poolHandle, void* session, void *ptr) { +int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, int64_t* size, char* fileName, int32_t lineNo) { } -int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, int64_t* size) { - -} - -void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size) { +void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { }