enh: add memory pool

This commit is contained in:
dapan1121 2024-06-27 19:27:47 +08:00
parent 38a7dce5ca
commit 5f2ab7908a
3 changed files with 240 additions and 79 deletions

View File

@ -22,35 +22,41 @@ extern "C" {
#include "os.h"
#define MP_CHUNKPOOL_MIN_BATCH_SIZE 1000
#define MP_CHUNKGRP_ALLOC_BATCH_SIZE 1000
#define MP_NSCHUNKGRP_ALLOC_BATCH_SIZE 500
#define MP_MAX_KEEP_FREE_CHUNK_NUM 1000
#define MP_CHUNK_FLAG_IN_USE (1 << 0)
#define MP_CHUNK_FLAG_NS_CHUNK (1 << 1)
#define MP_DBG_FLAG_LOG_MALLOC_FREE (1 << 0)
typedef struct SMPChunk {
void *pNext;
void *pMemStart;
void *pNext;
char *pMemStart;
int32_t flags;
/* KEEP ABOVE SAME WITH SMPNSChunk */
uint32_t offset;
} SMPChunk;
typedef struct SMPNSChunk {
void *pNext;
void *pMemStart;
char *pMemStart;
int32_t flags;
uint32_t offset;
/* KEEP ABOVE SAME WITH SMPChunk */
uint32_t memBytes;
uint32_t offset;
uint64_t memBytes;
} SMPNSChunk;
typedef struct SMPChunkCache {
typedef struct SMPChunkGroup {
int32_t chunkNum;
int32_t idleOffset;
SMPChunk *pChunks;
void *pChunks;
void* pNext;
} SMPChunkCache;
} SMPChunkGroup;
typedef struct SMPMemoryStat {
int64_t chunkAlloc;
@ -77,6 +83,7 @@ typedef struct SMPSession {
SMPChunk *srcChunkHead;
SMPChunk *srcChunkTail;
int32_t inUseChunkNum;
SMPChunk *inUseChunkHead;
SMPChunk *inUseChunkTail;
@ -92,22 +99,36 @@ typedef struct SMPSession {
SMPStat stat;
} SMPSession;
typedef struct SMPChunkGroupInfo {
int16_t chunkNodeSize;
int64_t allocChunkNum;
int32_t chunkGroupNum;
SMPChunkGroup *pChunkGrpHead;
SMPChunkGroup *pChunkGrpTail;
void *pIdleChunkList;
} SMPChunkGroupInfo;
typedef struct SMPDebugInfo {
int64_t flags;
} SMPDebugInfo;
typedef struct SMemPool {
char *name;
int16_t slotId;
SMemPoolCfg cfg;
int32_t maxChunkNum;
SMPDebugInfo dbgInfo;
int16_t maxDiscardSize;
double threadChunkReserveNum;
int64_t allocChunkNum;
int64_t allocChunkSize;
int64_t allocNSChunkNum;
int64_t allocNSChunkSize;
int64_t allocMemSize;
int64_t usedMemSize;
int64_t allocChunkCacheNum;
int32_t chunkCacheUnitNum;
SMPChunkCache *pChunkCacheHead;
SMPChunkCache *pChunkCacheTail;
SMPChunk *pIdleChunkList;
SMPChunkGroupInfo chunkGrpInfo;
SMPChunkGroupInfo NSChunkGrpInfo;
int32_t readyChunkNum;
int32_t readyChunkReserveNum;
@ -124,15 +145,25 @@ typedef struct SMemPool {
SMPStat stat;
} SMemPool;
#define MP_CHUNK_GET_FLAG(st, f) ((st) & (f))
#define MP_CHUNK_SET_FLAG(st, f) (st) |= (f)
#define MP_CHUNK_CLR_FLAG(st, f) (st) &= (~f)
#define MP_GET_FLAG(st, f) ((st) & (f))
#define MP_SET_FLAG(st, f) (st) |= (f)
#define MP_CLR_FLAG(st, f) (st) &= (~f)
enum {
MP_READ = 1,
MP_WRITE,
};
#define MP_ADD_TO_CHUNK_LIST(_chunkHead, _chunkTail, _chunkNum, _chunk) \
do { \
if (NULL == _chunkHead) { \
_chunkHead = _chunk; \
_chunkTail = _chunk; \
} else { \
(_chunkTail)->pNext = _chunk; \
} \
(_chunkNum)++; \
} while (0)
#define MP_LOCK(type, _lock) \
do { \

View File

@ -104,7 +104,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration
TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connect")
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_PREPROCESSED, "Message has been processed in preprocess")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MEM_POOL_PARAM, "Invalid memory pool param")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MEM_POOL_PARAM, "Invalid memory pool input param")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")

View File

@ -45,56 +45,52 @@ int32_t memPoolCheckCfg(SMemPoolCfg* cfg) {
return TSDB_CODE_SUCCESS;
}
void memPoolFreeChunkCache(SMPChunkCache* pCache) {
void memPoolFreeChunkGroup(SMPChunkGroup* pGrp) {
//TODO
}
int32_t memPoolAddChunkCache(SMemPool* pPool, SMPChunkCache* pTail) {
if (0 == pPool->chunkCacheUnitNum) {
pPool->chunkCacheUnitNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNKPOOL_MIN_BATCH_SIZE);
}
SMPChunkCache* pCache = NULL;
if (NULL == pPool->pChunkCacheHead) {
pPool->pChunkCacheHead = taosMemCalloc(1, sizeof(*pPool->pChunkCacheHead));
if (NULL == pPool->pChunkCacheHead) {
int32_t memPoolAddChunkGroup(SMemPool* pPool, SMPChunkGroupInfo* pInfo, SMPChunkGroup* pTail) {
SMPChunkGroup* pGrp = NULL;
if (NULL == pInfo->pChunkGrpHead) {
pInfo->pChunkGrpHead = taosMemCalloc(1, sizeof(*pInfo->pChunkGrpHead));
if (NULL == pInfo->pChunkGrpHead) {
uError("malloc chunkCache failed");
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pCache = pPool->pChunkCacheHead;
pGrp = pInfo->pChunkGrpHead;
} else {
pCache = (SMPChunkCache*)taosMemCalloc(1, sizeof(SMPChunkCache));
pGrp = (SMPChunkGroup*)taosMemCalloc(1, sizeof(SMPChunkGroup));
}
pCache->chunkNum = pPool->chunkCacheUnitNum;
pCache->pChunks = taosMemoryCalloc(pCache->chunkNum, sizeof(*pCache->pChunks));
if (NULL == pCache->pChunks) {
uError("calloc %d chunks in cache failed", pCache->chunkNum);
pGrp->chunkNum = pInfo->chunkGroupNum;
pGrp->pChunks = taosMemoryCalloc(pGrp->chunkNum, pInfo->chunkNodeSize);
if (NULL == pGrp->pChunks) {
uError("calloc %d %d chunks in chunk group failed", pGrp->chunkNum, pInfo->chunkNodeSize);
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
if (atomic_val_compare_exchange_ptr(&pPool->pChunkCacheTail, pTail, pCache) != pTail) {
memPoolFreeChunkCache(pCache);
if (atomic_val_compare_exchange_ptr(&pInfo->pChunkGrpTail, pTail, pGrp) != pTail) {
memPoolFreeChunkGroup(pGrp);
return TSDB_CODE_SUCCESS;
}
atomic_add_fetch_64(&pPool->allocChunkCacheNum, pCache->chunkNum);
atomic_add_fetch_64(&pInfo->allocChunkNum, pGrp->chunkNum);
return TSDB_CODE_SUCCESS;
}
int32_t memPoolGetIdleChunk(SMemPool* pPool, SMPChunkCache** ppCache, SMPChunk** ppChunk) {
SMPChunkCache* pCache = NULL;
int32_t memPoolGetIdleChunk(SMemPool* pPool, SMPChunkGroupInfo* pInfo, void** ppChunk) {
SMPChunkGroup* pGrp = NULL;
SMPChunk* pChunk = NULL;
while (true) {
pChunk = (SMPChunk*)atomic_load_ptr(&pPool->pIdleChunkList);
pChunk = (SMPChunk*)atomic_load_ptr(&pInfo->pIdleChunkList);
if (NULL == pChunk) {
break;
}
if (atomic_val_compare_exchange_ptr(&pPool->pIdleChunkList, pChunk, pChunk->pNext) != pChunk) {
if (atomic_val_compare_exchange_ptr(&pInfo->pIdleChunkList, pChunk, pChunk->pNext) != pChunk) {
continue;
}
@ -103,32 +99,28 @@ int32_t memPoolGetIdleChunk(SMemPool* pPool, SMPChunkCache** ppCache, SMPChunk**
}
while (true) {
pCache = atomic_load_ptr(&pPool->pChunkCacheTail);
int32_t offset = atomic_fetch_add_32(&pCache->idleOffset, 1);
if (offset < pCache->chunkNum) {
pChunk = pCache->pChunks + offset;
pGrp = atomic_load_ptr(&pInfo->pChunkGrpTail);
int32_t offset = atomic_fetch_add_32(&pGrp->idleOffset, 1);
if (offset < pGrp->chunkNum) {
pChunk = (SMPChunk*)((char*)pGrp->pChunks + offset * pInfo->chunkNodeSize);
break;
} else {
atomic_sub_fetch_32(&pCache->idleOffset, 1);
atomic_sub_fetch_32(&pGrp->idleOffset, 1);
}
MP_ERR_RET(memPoolAddChunkCache(pPool, pCache));
MP_ERR_RET(memPoolAddChunkGroup(pPool, pInfo, pGrp));
}
_return:
if (NULL != ppCache) {
*ppCache = pCache;
}
*ppChunk = pChunk;
return TSDB_CODE_SUCCESS;
}
int32_t memPoolNewReadyChunk(SMemPool* pPool, SMPChunkCache** ppCache, SMPChunk** ppChunk) {
int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) {
SMPChunk* pChunk = NULL;
MP_ERR_RET(memPoolGetIdleChunk(pPool, ppCache, &pChunk));
MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->chunkGrpInfo, &pChunk));
pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize);
if (NULL == pChunk->pMemStart) {
@ -139,11 +131,29 @@ int32_t memPoolNewReadyChunk(SMemPool* pPool, SMPChunkCache** ppCache, SMPChunk*
return TSDB_CODE_SUCCESS;
}
int32_t memPoolAddReadyChunk(SMemPool* pPool, int32_t num) {
SMPChunkCache* pCache = NULL;
int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSize) {
SMPNSChunk* pChunk = NULL;
MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->NSChunkGrpInfo, &pChunk));
pChunk->pMemStart = taosMemMalloc(chunkSize);
if (NULL == pChunk->pMemStart) {
uError("add new chunk, memory malloc %" PRId64 " failed", chunkSize);
return TSDB_CODE_OUT_OF_MEMORY;
}
pChunk->memBytes = chunkSize;
MP_SET_FLAG(pChunk->flags, MP_CHUNK_FLAG_NS_CHUNK);
return TSDB_CODE_SUCCESS;
}
int32_t memPoolPrepareChunks(SMemPool* pPool, int32_t num) {
SMPChunkGroup* pGrp = NULL;
SMPChunk* pChunk = NULL;
for (int32_t i = 0; i < num; ++i) {
MP_ERR_RET(memPoolNewReadyChunk(pPool, &pCache, &pChunk));
MP_ERR_RET(memPoolNewChunk(pPool, &pGrp, &pChunk));
if (NULL == pPool->readyChunkTail) {
pPool->readyChunkHead = pChunk;
@ -158,7 +168,8 @@ int32_t memPoolAddReadyChunk(SMemPool* pPool, int32_t num) {
return TSDB_CODE_SUCCESS;
}
int32_t memPoolEnsureReadyChunks(SMemPool* pPool) {
int32_t memPoolEnsureChunks(SMemPool* pPool) {
if (E_EVICT_ALL == pPool->cfg.evicPolicy) {
return TSDB_CODE_SUCCESS;
}
@ -168,7 +179,7 @@ int32_t memPoolEnsureReadyChunks(SMemPool* pPool) {
return TSDB_CODE_SUCCESS;
}
MP_ERR_RET(memPoolAddReadyChunk(pPool, readyMissNum));
MP_ERR_RET(memPoolPrepareChunks(pPool, readyMissNum));
return TSDB_CODE_SUCCESS;
}
@ -193,27 +204,32 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
pPool->threadChunkReserveNum = 1;
pPool->readyChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum);
MP_ERR_RET(memPoolAddChunkCache(pPool));
pPool->chunkGrpInfo.chunkGroupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNKGRP_ALLOC_BATCH_SIZE);
pPool->chunkGrpInfo.chunkNodeSize = sizeof(SMPChunk);
pPool->NSChunkGrpInfo.chunkGroupNum = MP_NSCHUNKGRP_ALLOC_BATCH_SIZE;
pPool->NSChunkGrpInfo.chunkNodeSize = sizeof(SMPNSChunk);
MP_ERR_RET(memPoolGetIdleChunk(pPool, NULL, &pPool->readyChunkHead));
MP_ERR_RET(memPoolAddChunkGroup(pPool, &pPool->chunkGrpInfo, NULL));
MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->chunkGrpInfo, &pPool->readyChunkHead));
pPool->readyChunkTail = pPool->readyChunkHead;
MP_ERR_RET(memPoolEnsureReadyChunks(pPool));
MP_ERR_RET(memPoolEnsureChunks(pPool));
return TSDB_CODE_SUCCESS;
}
void memPoolNotifyLowReadyChunk(SMemPool* pPool) {
void memPoolNotifyLowChunkNum(SMemPool* pPool) {
}
int32_t memPoolGetReadyChunk(SMemPool* pPool, SMPChunk** ppChunk) {
SMPChunkCache* pCache = NULL;
int32_t memPoolGetChunk(SMemPool* pPool, SMPChunk** ppChunk) {
SMPChunkGroup* pCache = NULL;
SMPChunk* pChunk = NULL;
int32_t readyChunkNum = atomic_sub_fetch_32(&pPool->readyChunkNum, 1);
if (readyChunkNum >= 0) {
if (atomic_add_fetch_32(&pPool->readyChunkGotNum, 1) == pPool->readyChunkLowNum) {
memPoolNotifyLowReadyChunk(pPool);
memPoolNotifyLowChunkNum(pPool);
}
pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->pNext);
@ -226,9 +242,87 @@ int32_t memPoolGetReadyChunk(SMemPool* pPool, SMPChunk** ppChunk) {
return TSDB_CODE_SUCCESS;
}
MP_RET(memPoolNewReadyChunk(pPool, NULL, ppChunk));
MP_RET(memPoolNewChunk(pPool, NULL, ppChunk));
}
int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_t size, SMPChunk** ppChunk, SMPChunk** ppPreChunk) {
SMPChunk* pChunk = pSession->srcChunkHead;
while (NULL != pChunk) {
if ((pChunk->offset + size) <= pPool->cfg.chunkSize) {
*ppChunk = pChunk;
break;
}
*ppPreChunk = pChunk;
pChunk = (SMPChunk*)pChunk->pNext;
}
if (NULL == *ppChunk) {
*ppPreChunk = NULL;
}
return TSDB_CODE_SUCCESS;
}
void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) {
SMPChunk* pChunk = NULL, *preSrcChunk = NULL;
void* pRes = NULL;
if (pSession->srcChunkNum > 0) {
MP_ERR_JRET(memPoolGetChunkFromSession(pPool, pSession, size, &pChunk, &preSrcChunk));
}
if (NULL == pChunk) {
MP_ERR_JRET(memPoolNewChunk(pPool, &pChunk));
pSession->allocChunkNum++;
pSession->allocChunkMemSize += pPool->cfg.chunkSize;
pSession->allocMemSize += size;
MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk);
MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk);
}
pRes = pChunk->pMemStart + pChunk->offset;
pChunk->offset += size;
if (pChunk->offset >= (pPool->cfg.chunkSize - pPool->maxDiscardSize)) {
if (NULL == preSrcChunk) {
pSession->srcChunkHead = NULL;
pSession->srcChunkTail = NULL;
} else {
preSrcChunk->pNext = pChunk->pNext;
}
pSession->srcChunkNum--;
}
_return:
return pRes;
}
void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) {
SMPNSChunk* pChunk = NULL;
MP_ERR_JRET(memPoolNewNSChunk(pPool, &pChunk, size));
pSession->allocChunkNum++;
pSession->allocChunkMemSize += size;
pSession->allocMemSize += size;
if (NULL == pSession->inUseNSChunkHead) {
pSession->inUseNSChunkHead = pChunk;
pSession->inUseNSChunkTail = pChunk;
} else {
pSession->inUseNSChunkTail->pNext = pChunk;
}
_return:
return pChunk ? pChunk->pMemStart : NULL;
}
void taosMemPoolModInit(void) {
taosThreadMutexInit(&gMPoolMutex, NULL);
@ -236,7 +330,6 @@ void taosMemPoolModInit(void) {
gMPoolList = taosArrayInit(10, POINTER_BYTES);
}
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle) {
int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = NULL;
@ -285,13 +378,14 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) {
MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
MP_ERR_JRET(memPoolGetReadyChunk(pPool, &pSession->srcChunkHead));
MP_ERR_JRET(memPoolGetChunk(pPool, &pSession->srcChunkHead));
pSession->srcChunkTail = pSession->srcChunkHead;
pSession->srcChunkNum = 1;
pSession->allocChunkNum = 1;
pSession->allocChunkMemSize = pPool->cfg.chunkSize;
MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pSession->srcChunkHead);
MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pSession->srcChunkHead);
_return:
if (TSDB_CODE_SUCCESS != code) {
@ -304,33 +398,69 @@ _return:
return code;
}
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
void *res = NULL;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNC__, poolHandle, session, fileName, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
_return:
return res;
}
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
void *res = NULL;
if (NULL == poolHandle || NULL == session || NULL == fileName || num < 0 || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, num:%" PRId64 ", size:%" PRId64,
__FUNC__, poolHandle, session, fileName, num, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
memset(res, 0, num * size);
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
_return:
return res;
}
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) {
}
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size) {
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) {
}
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr) {
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
}
void taosMemPoolFree(void* poolHandle, void* session, void *ptr) {
int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, int64_t* size, char* fileName, int32_t lineNo) {
}
int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, int64_t* size) {
}
void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size) {
void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) {
}