feat: add memory pool

This commit is contained in:
dapan1121 2024-06-25 19:29:46 +08:00
parent 42beba30a8
commit cdaf8cef69
6 changed files with 338 additions and 102 deletions

View File

@ -43,24 +43,25 @@ extern "C" {
int32_t taosMemoryDbgInit();
int32_t taosMemoryDbgInitRestore();
void *taosMemoryMalloc(int64_t size);
void *taosMemoryCalloc(int64_t num, int64_t size);
void *taosMemoryRealloc(void *ptr, int64_t size);
char *taosStrdup(const char *ptr);
void taosMemoryFree(void *ptr);
int64_t taosMemorySize(void *ptr);
void *taosMemMalloc(int64_t size);
void *taosMemCalloc(int64_t num, int64_t size);
void *taosMemRealloc(void *ptr, int64_t size);
char *taosStrdupi(const char *ptr);
void taosMemFree(void *ptr);
int64_t taosMemSize(void *ptr);
void taosPrintBackTrace();
void taosMemoryTrim(int32_t size);
void *taosMemoryMallocAlign(uint32_t alignment, int64_t size);
void taosMemTrim(int32_t size);
void *taosMemMallocAlign(uint32_t alignment, int64_t size);
#define taosMemoryFreeClear(ptr) \
#define taosMemFreeClear(ptr) \
do { \
if (ptr) { \
taosMemoryFree((void *)ptr); \
taosMemFree((void *)ptr); \
(ptr) = NULL; \
} \
} while (0)
#ifdef __cplusplus
}
#endif

View File

@ -145,6 +145,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_IP_NOT_IN_WHITE_LIST TAOS_DEF_ERROR_CODE(0, 0x0134)
#define TSDB_CODE_FAILED_TO_CONNECT_S3 TAOS_DEF_ERROR_CODE(0, 0x0135)
#define TSDB_CODE_MSG_PREPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0136) // internal
#define TSDB_CODE_INVALID_MEM_POOL_PARAM TAOS_DEF_ERROR_CODE(0, 0x0137)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)

View File

@ -21,12 +21,53 @@
extern "C" {
#endif
typedef void *mpool_h;
#define MEMPOOL_MAX_CHUNK_SIZE (1 << 30)
#define MEMPOOL_MIN_CHUNK_SIZE (1 << 20)
typedef enum MemPoolEvictPolicy{
E_EVICT_ALL = 1,
E_EVICT_NONE,
E_EVICT_AUTO,
E_EVICT_MAX_VALUE, // no used
} MemPoolEvictPolicy;
typedef struct SMemPoolCfg {
int64_t maxSize;
int32_t chunkSize;
int32_t threadNum;
MemPoolEvictPolicy evicPolicy;
} SMemPoolCfg;
void taosMemPoolModInit(void);
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle);
void *taosMemPoolMalloc(void* poolHandle, int64_t size, char* fileName, int32_t lineNo);
void *taosMemPoolCalloc(void* poolHandle, int64_t num, int64_t size, char* fileName, int32_t lineNo);
void *taosMemPoolRealloc(void* poolHandle, void *ptr, int64_t size, char* fileName, int32_t lineNo);
char *taosMemPoolStrdup(void* poolHandle, const char *ptr, char* fileName, int32_t lineNo);
void taosMemPoolFree(void* poolHandle, void *ptr, char* fileName, int32_t lineNo);
int64_t taosMemPoolGetMemorySize(void* poolHandle, void *ptr, char* fileName, int32_t lineNo);
void taosMemPoolTrim(void* poolHandle, int32_t size, char* fileName, int32_t lineNo);
void *taosMemPoolMallocAlign(void* poolHandle, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo);
void taosMemPoolClose(void* poolHandle);
void taosMemPoolModDestroy(void);
#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size)))
#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
#define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
#define taosStrdup(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolStrdup(threadPoolHandle, _ptr, __FILE__, __LINE__)) : (taosStrdupi(_ptr)))
#define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, _ptr, __FILE__, __LINE__)) : (taosMemFree(_ptr)))
#define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, _ptr, __FILE__, __LINE__)) : (taosMemSize(_ptr)))
#define taosMemoryTrim(_size) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, _size, __FILE__, __LINE__)) : (taosMemTrim(_size)))
#define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
#define taosMemoryFreeClear(ptr) \
do { \
if (ptr) { \
taosMemoryFree((void *)ptr); \
(ptr) = NULL; \
} \
} while (0)
mpool_h taosMemPoolInit(int32_t maxNum, int32_t blockSize);
char *taosMemPoolMalloc(mpool_h handle);
void taosMemPoolFree(mpool_h handle, char *p);
void taosMemPoolCleanUp(mpool_h handle);
#ifdef __cplusplus
}

117
source/util/inc/tmempoolInt.h Executable file
View File

@ -0,0 +1,117 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_MEMPOOL_INT_H_
#define _TD_UTIL_MEMPOOL_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#define MEMPOOL_CHUNKPOOL_MIN_BATCH_SIZE 1000
#define MEMPOOL_MAX_KEEP_FREE_CHUNK_NUM 1000
typedef struct SChunk {
void *pNext;
int64_t flags;
uint32_t memSize;
uint32_t offset;
void *pMemStart;
} SChunk;
typedef struct SChunkCache {
int32_t chunkNum;
int32_t idleOffset;
SChunk *pChunks;
} SChunkCache;
typedef struct SMemoryStat {
int64_t chunkAlloc;
int64_t chunkFree;
int64_t memMalloc;
int64_t memCalloc;
int64_t memRealloc;
int64_t strdup;
int64_t memFree;
} SMemoryStat;
typedef struct SMemPoolStat {
SMemPoolMemoryStat times;
SMemPoolMemoryStat bytes;
} SMemPoolStat;
typedef struct SMemPool {
char *name;
int16_t slotId;
SMemPoolCfg cfg;
int32_t maxChunkNum;
double threadChunkReserveNum;
int64_t allocChunkNum;
int64_t allocMemSize;
int64_t usedMemSize;
int64_t allocChunkCacheNum;
int32_t chunkCacheUnitNum;
SArray *pChunkCache; // SArray<SChunkCache>
int32_t freeChunkNum;
int32_t freeChunkReserveNum;
SChunk *freeChunkHead;
SChunk *freeChunkTail;
int64_t freeNChunkNum;
SChunk *freeNChunkHead;
SChunk *freeNChunkTail;
SMemPoolStat stat;
} SMemPool;
#define MP_ERR_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
return _code; \
} \
} while (0)
#define MP_RET(c) \
do { \
int32_t _code = c; \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
} \
return _code; \
} while (0)
#define MP_ERR_JRET(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
#ifdef __cplusplus
}
#endif
#endif /* _TD_UTIL_MEMPOOL_INT_H_ */

View File

@ -104,6 +104,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration
TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connect")
TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_PREPROCESSED, "Message has been processed in preprocess")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MEM_POOL_PARAM, "Invalid memory pool param")
//client
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")

View File

@ -15,113 +15,188 @@
#define _DEFAULT_SOURCE
#include "tmempool.h"
#include "tmempoolInt.h"
#include "tlog.h"
#include "tutil.h"
typedef struct {
int32_t numOfFree; /* number of free slots */
int32_t first; /* the first free slot */
int32_t numOfBlock; /* the number of blocks */
int32_t blockSize; /* block size in bytes */
int32_t *freeList; /* the index list */
char *pool; /* the actual mem block */
TdThreadMutex mutex;
} pool_t;
static SArray* gMPoolList;
static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT;
static TdThreadMutex gMPoolMutex;
mpool_h taosMemPoolInit(int32_t numOfBlock, int32_t blockSize) {
int32_t i;
pool_t *pool_p;
if (numOfBlock <= 1 || blockSize <= 1) {
uError("invalid parameter in memPoolInit\n");
return NULL;
int32_t memPoolCheckCfg(SMemPoolCfg* cfg) {
if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) {
uError("invalid memory pool chunkSize:%d", cfg->chunkSize);
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
}
pool_p = (pool_t *)taosMemoryMalloc(sizeof(pool_t));
if (pool_p == NULL) {
uError("mempool malloc failed\n");
return NULL;
} else {
memset(pool_p, 0, sizeof(pool_t));
if (cfg->evicPolicy <= 0 || cfg->evicPolicy >= E_EVICT_MAX_VALUE) {
uError("invalid memory pool evicPolicy:%d", cfg->evicPolicy);
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
}
pool_p->blockSize = blockSize;
pool_p->numOfBlock = numOfBlock;
pool_p->pool = (char *)taosMemoryMalloc((size_t)(blockSize * numOfBlock));
pool_p->freeList = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * (size_t)numOfBlock);
if (pool_p->pool == NULL || pool_p->freeList == NULL) {
uError("failed to allocate memory\n");
taosMemoryFreeClear(pool_p->freeList);
taosMemoryFreeClear(pool_p->pool);
taosMemoryFreeClear(pool_p);
return NULL;
if (cfg->threadNum <= 0) {
uError("invalid memory pool threadNum:%d", cfg->threadNum);
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
}
taosThreadMutexInit(&(pool_p->mutex), NULL);
memset(pool_p->pool, 0, (size_t)(blockSize * numOfBlock));
for (i = 0; i < pool_p->numOfBlock; ++i) pool_p->freeList[i] = i;
pool_p->first = 0;
pool_p->numOfFree = pool_p->numOfBlock;
return (mpool_h)pool_p;
return TSDB_CODE_SUCCESS;
}
char *taosMemPoolMalloc(mpool_h handle) {
char *pos = NULL;
pool_t *pool_p = (pool_t *)handle;
taosThreadMutexLock(&(pool_p->mutex));
if (pool_p->numOfFree > 0) {
pos = pool_p->pool + pool_p->blockSize * (pool_p->freeList[pool_p->first]);
pool_p->first++;
pool_p->first = pool_p->first % pool_p->numOfBlock;
pool_p->numOfFree--;
int32_t memPoolAddChunkCache(SMemPool* pPool) {
if (0 == pPool->chunkCacheUnitNum) {
pPool->chunkCacheUnitNum = TMAX(pPool->maxChunkNum / 10, MEMPOOL_CHUNKPOOL_MIN_BATCH_SIZE);
}
taosThreadMutexUnlock(&(pool_p->mutex));
if (pos == NULL) uDebug("mempool: out of memory");
return pos;
}
void taosMemPoolFree(mpool_h handle, char *pMem) {
int32_t index;
pool_t *pool_p = (pool_t *)handle;
if (pMem == NULL) return;
index = (int32_t)(pMem - pool_p->pool) % pool_p->blockSize;
if (index != 0) {
uError("invalid free address:%p\n", pMem);
return;
if (NULL == pPool->pChunkCache) {
pPool->pChunkCache = taosArrayInit(10, sizeof(SChunkCache));
if (NULL == pPool->pChunkCache) {
uError("taosArrayInit chunkPool failed");
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
index = (int32_t)((pMem - pool_p->pool) / pool_p->blockSize);
if (index < 0 || index >= pool_p->numOfBlock) {
uError("mempool: error, invalid address:%p", pMem);
return;
SChunkCache* pChunkCache = taosArrayReserve(pPool->pChunkCache, 1);
pChunkCache->chunkNum = pPool->chunkCacheUnitNum;
pChunkCache->pChunks = taosMemoryCalloc(pChunkCache->chunkNum, sizeof(*pChunkCache->pChunks));
if (NULL == pChunkCache->pChunks) {
uError("calloc %d chunks in pool failed", pChunkCache->chunkNum);
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memset(pMem, 0, (size_t)pool_p->blockSize);
atomic_add_fetch_64(&pPool->allocChunkCacheNum, pChunkCache->chunkNum);
taosThreadMutexLock(&pool_p->mutex);
pool_p->freeList[(pool_p->first + pool_p->numOfFree) % pool_p->numOfBlock] = index;
pool_p->numOfFree++;
taosThreadMutexUnlock(&pool_p->mutex);
return TSDB_CODE_SUCCESS;
}
void taosMemPoolCleanUp(mpool_h handle) {
pool_t *pool_p = (pool_t *)handle;
int32_t memPoolGetChunkCache(SMemPool* pPool, SChunkCache** pCache) {
while (true) {
SChunkCache* cache = (SChunkCache*)taosArrayGetLast(pPool->pChunkCache);
if (NULL != cache && cache->idleOffset < cache->chunkNum) {
*pCache = cache;
break;
}
taosThreadMutexDestroy(&pool_p->mutex);
if (pool_p->pool) taosMemoryFree(pool_p->pool);
if (pool_p->freeList) taosMemoryFree(pool_p->freeList);
memset(pool_p, 0, sizeof(*pool_p));
taosMemoryFree(pool_p);
MP_ERR_RET(memPoolAddChunkCache(pPool));
}
return TSDB_CODE_SUCCESS;
}
int32_t memPoolGetIdleChunkFromCache() {
}
int32_t memPoolEnsureMinFreeChunk(SMemPool* pPool) {
if (E_EVICT_ALL == pPool->cfg.evicPolicy) {
return TSDB_CODE_SUCCESS;
}
if (pPool->freeChunkNum >= pPool->freeChunkReserveNum) {
return TSDB_CODE_SUCCESS;
}
int32_t toAddNum = pPool->freeChunkReserveNum - pPool->freeChunkNum;
for (int32_t i = 0; i < toAddNum; ++i) {
memPoolGetIdleChunk(pPool);
}
}
int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET(memPoolCheckCfg(cfg));
memcpy(&pPool->cfg, &cfg, sizeof(cfg));
pPool->name = taosStrdup(poolName);
if (NULL == pPool->name) {
uError("calloc memory pool name %s failed", poolName);
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pPool->maxChunkNum = cfg->maxSize / cfg->chunkSize;
if (pPool->maxChunkNum <= 0) {
uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", cfg->maxSize, cfg->chunkSize);
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
}
pPool->threadChunkReserveNum = 1;
pPool->freeChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum);
MP_ERR_RET(memPoolAddChunkCache(pPool));
MP_ERR_RET(memPoolEnsureMinFreeChunk(pPool));
}
void taosMemPoolModInit(void) {
taosThreadMutexInit(&gMPoolMutex, NULL);
gMPoolList = taosArrayInit(10, POINTER_BYTES);
}
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle) {
int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = NULL;
taosThreadOnce(&gMPoolInit, taosMemPoolModInit);
if (NULL == gMPoolList) {
uError("init memory pool failed");
MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
SMemPool* pPool = taosMemoryCalloc(1, sizeof(SMemPool));
if (NULL == pPool) {
uError("calloc memory pool failed");
MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
pPool->slotId = -1;
MP_ERR_JRET(memPoolInit(pPool, poolName, &cfg))
_return:
if (TSDB_CODE_SUCCESS != code) {
taosMemPoolClose(pPool);
}
}
void *taosMemPoolMalloc(void* poolHandle, int64_t size, char* fileName, int32_t lineNo) {
}
void *taosMemPoolCalloc(void* poolHandle, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
}
void *taosMemPoolRealloc(void* poolHandle, void *ptr, int64_t size) {
}
char *taosMemPoolStrdup(void* poolHandle, const char *ptr) {
}
void taosMemPoolFree(void* poolHandle, void *ptr) {
}
int32_t taosMemPoolGetMemorySize(void* poolHandle, void *ptr, int64_t* size) {
}
void *taosMemPoolMallocAlign(void* poolHandle, uint32_t alignment, int64_t size) {
}
void taosMemPoolClose(void* poolHandle) {
}
void taosMemPoolModDestroy(void) {
}