diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 683d10e926..e43e7d7cf6 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -43,24 +43,25 @@ extern "C" { int32_t taosMemoryDbgInit(); int32_t taosMemoryDbgInitRestore(); -void *taosMemoryMalloc(int64_t size); -void *taosMemoryCalloc(int64_t num, int64_t size); -void *taosMemoryRealloc(void *ptr, int64_t size); -char *taosStrdup(const char *ptr); -void taosMemoryFree(void *ptr); -int64_t taosMemorySize(void *ptr); +void *taosMemMalloc(int64_t size); +void *taosMemCalloc(int64_t num, int64_t size); +void *taosMemRealloc(void *ptr, int64_t size); +char *taosStrdupi(const char *ptr); +void taosMemFree(void *ptr); +int64_t taosMemSize(void *ptr); void taosPrintBackTrace(); -void taosMemoryTrim(int32_t size); -void *taosMemoryMallocAlign(uint32_t alignment, int64_t size); +void taosMemTrim(int32_t size); +void *taosMemMallocAlign(uint32_t alignment, int64_t size); -#define taosMemoryFreeClear(ptr) \ +#define taosMemFreeClear(ptr) \ do { \ if (ptr) { \ - taosMemoryFree((void *)ptr); \ + taosMemFree((void *)ptr); \ (ptr) = NULL; \ } \ } while (0) + #ifdef __cplusplus } #endif diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 2e407d26b0..cd78d9308d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -145,6 +145,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_IP_NOT_IN_WHITE_LIST TAOS_DEF_ERROR_CODE(0, 0x0134) #define TSDB_CODE_FAILED_TO_CONNECT_S3 TAOS_DEF_ERROR_CODE(0, 0x0135) #define TSDB_CODE_MSG_PREPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0136) // internal +#define TSDB_CODE_INVALID_MEM_POOL_PARAM TAOS_DEF_ERROR_CODE(0, 0x0137) //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) diff --git a/include/util/tmempool.h b/include/util/tmempool.h index 7a5aca7b34..455538d64e 100644 --- a/include/util/tmempool.h +++ b/include/util/tmempool.h @@ -21,12 +21,53 @@ extern "C" { #endif -typedef void *mpool_h; +#define MEMPOOL_MAX_CHUNK_SIZE (1 << 30) +#define MEMPOOL_MIN_CHUNK_SIZE (1 << 20) + +typedef enum MemPoolEvictPolicy{ + E_EVICT_ALL = 1, + E_EVICT_NONE, + E_EVICT_AUTO, + E_EVICT_MAX_VALUE, // no used +} MemPoolEvictPolicy; + +typedef struct SMemPoolCfg { + int64_t maxSize; + int32_t chunkSize; + int32_t threadNum; + MemPoolEvictPolicy evicPolicy; +} SMemPoolCfg; + +void taosMemPoolModInit(void); +int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle); +void *taosMemPoolMalloc(void* poolHandle, int64_t size, char* fileName, int32_t lineNo); +void *taosMemPoolCalloc(void* poolHandle, int64_t num, int64_t size, char* fileName, int32_t lineNo); +void *taosMemPoolRealloc(void* poolHandle, void *ptr, int64_t size, char* fileName, int32_t lineNo); +char *taosMemPoolStrdup(void* poolHandle, const char *ptr, char* fileName, int32_t lineNo); +void taosMemPoolFree(void* poolHandle, void *ptr, char* fileName, int32_t lineNo); +int64_t taosMemPoolGetMemorySize(void* poolHandle, void *ptr, char* fileName, int32_t lineNo); +void taosMemPoolTrim(void* poolHandle, int32_t size, char* fileName, int32_t lineNo); +void *taosMemPoolMallocAlign(void* poolHandle, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo); +void taosMemPoolClose(void* poolHandle); +void taosMemPoolModDestroy(void); + +#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size))) +#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) +#define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) +#define taosStrdup(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolStrdup(threadPoolHandle, _ptr, __FILE__, __LINE__)) : (taosStrdupi(_ptr))) +#define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, _ptr, __FILE__, __LINE__)) : (taosMemFree(_ptr))) +#define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, _ptr, __FILE__, __LINE__)) : (taosMemSize(_ptr))) +#define taosMemoryTrim(_size) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, _size, __FILE__, __LINE__)) : (taosMemTrim(_size))) +#define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) + +#define taosMemoryFreeClear(ptr) \ + do { \ + if (ptr) { \ + taosMemoryFree((void *)ptr); \ + (ptr) = NULL; \ + } \ + } while (0) -mpool_h taosMemPoolInit(int32_t maxNum, int32_t blockSize); -char *taosMemPoolMalloc(mpool_h handle); -void taosMemPoolFree(mpool_h handle, char *p); -void taosMemPoolCleanUp(mpool_h handle); #ifdef __cplusplus } diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h new file mode 100755 index 0000000000..f5a7a2c7d2 --- /dev/null +++ b/source/util/inc/tmempoolInt.h @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_MEMPOOL_INT_H_ +#define _TD_UTIL_MEMPOOL_INT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" + +#define MEMPOOL_CHUNKPOOL_MIN_BATCH_SIZE 1000 +#define MEMPOOL_MAX_KEEP_FREE_CHUNK_NUM 1000 + +typedef struct SChunk { + void *pNext; + int64_t flags; + uint32_t memSize; + uint32_t offset; + void *pMemStart; +} SChunk; + +typedef struct SChunkCache { + int32_t chunkNum; + int32_t idleOffset; + SChunk *pChunks; +} SChunkCache; + +typedef struct SMemoryStat { + int64_t chunkAlloc; + int64_t chunkFree; + int64_t memMalloc; + int64_t memCalloc; + int64_t memRealloc; + int64_t strdup; + int64_t memFree; +} SMemoryStat; + +typedef struct SMemPoolStat { + SMemPoolMemoryStat times; + SMemPoolMemoryStat bytes; +} SMemPoolStat; + +typedef struct SMemPool { + char *name; + int16_t slotId; + SMemPoolCfg cfg; + int32_t maxChunkNum; + + double threadChunkReserveNum; + int64_t allocChunkNum; + int64_t allocMemSize; + int64_t usedMemSize; + + int64_t allocChunkCacheNum; + int32_t chunkCacheUnitNum; + SArray *pChunkCache; // SArray + + int32_t freeChunkNum; + int32_t freeChunkReserveNum; + SChunk *freeChunkHead; + SChunk *freeChunkTail; + + int64_t freeNChunkNum; + SChunk *freeNChunkHead; + SChunk *freeNChunkTail; + + SMemPoolStat stat; +} SMemPool; + +#define MP_ERR_RET(c) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + return _code; \ + } \ + } while (0) + +#define MP_RET(c) \ + do { \ + int32_t _code = c; \ + if (_code != TSDB_CODE_SUCCESS) { \ + terrno = _code; \ + } \ + return _code; \ + } while (0) + +#define MP_ERR_JRET(c) \ + do { \ + code = c; \ + if (code != TSDB_CODE_SUCCESS) { \ + terrno = code; \ + goto _return; \ + } \ + } while (0) + + + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_UTIL_MEMPOOL_INT_H_ */ diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 80de20f5f5..528a4a8a14 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -104,6 +104,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG_VALUE, "Invalid configuration TAOS_DEFINE_ERROR(TSDB_CODE_IP_NOT_IN_WHITE_LIST, "Not allowed to connect") TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_CONNECT_S3, "Failed to connect to s3 server") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_PREPROCESSED, "Message has been processed in preprocess") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MEM_POOL_PARAM, "Invalid memory pool param") //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 8a57715c22..d73a0ed99d 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -15,113 +15,188 @@ #define _DEFAULT_SOURCE #include "tmempool.h" +#include "tmempoolInt.h" #include "tlog.h" #include "tutil.h" -typedef struct { - int32_t numOfFree; /* number of free slots */ - int32_t first; /* the first free slot */ - int32_t numOfBlock; /* the number of blocks */ - int32_t blockSize; /* block size in bytes */ - int32_t *freeList; /* the index list */ - char *pool; /* the actual mem block */ - TdThreadMutex mutex; -} pool_t; +static SArray* gMPoolList; +static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT; +static TdThreadMutex gMPoolMutex; -mpool_h taosMemPoolInit(int32_t numOfBlock, int32_t blockSize) { - int32_t i; - pool_t *pool_p; - - if (numOfBlock <= 1 || blockSize <= 1) { - uError("invalid parameter in memPoolInit\n"); - return NULL; +int32_t memPoolCheckCfg(SMemPoolCfg* cfg) { + if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) { + uError("invalid memory pool chunkSize:%d", cfg->chunkSize); + return TSDB_CODE_INVALID_MEM_POOL_PARAM; } - pool_p = (pool_t *)taosMemoryMalloc(sizeof(pool_t)); - if (pool_p == NULL) { - uError("mempool malloc failed\n"); - return NULL; - } else { - memset(pool_p, 0, sizeof(pool_t)); + if (cfg->evicPolicy <= 0 || cfg->evicPolicy >= E_EVICT_MAX_VALUE) { + uError("invalid memory pool evicPolicy:%d", cfg->evicPolicy); + return TSDB_CODE_INVALID_MEM_POOL_PARAM; } - pool_p->blockSize = blockSize; - pool_p->numOfBlock = numOfBlock; - pool_p->pool = (char *)taosMemoryMalloc((size_t)(blockSize * numOfBlock)); - pool_p->freeList = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * (size_t)numOfBlock); - - if (pool_p->pool == NULL || pool_p->freeList == NULL) { - uError("failed to allocate memory\n"); - taosMemoryFreeClear(pool_p->freeList); - taosMemoryFreeClear(pool_p->pool); - taosMemoryFreeClear(pool_p); - return NULL; + if (cfg->threadNum <= 0) { + uError("invalid memory pool threadNum:%d", cfg->threadNum); + return TSDB_CODE_INVALID_MEM_POOL_PARAM; } - taosThreadMutexInit(&(pool_p->mutex), NULL); - - memset(pool_p->pool, 0, (size_t)(blockSize * numOfBlock)); - for (i = 0; i < pool_p->numOfBlock; ++i) pool_p->freeList[i] = i; - - pool_p->first = 0; - pool_p->numOfFree = pool_p->numOfBlock; - - return (mpool_h)pool_p; + return TSDB_CODE_SUCCESS; } -char *taosMemPoolMalloc(mpool_h handle) { - char *pos = NULL; - pool_t *pool_p = (pool_t *)handle; - - taosThreadMutexLock(&(pool_p->mutex)); - - if (pool_p->numOfFree > 0) { - pos = pool_p->pool + pool_p->blockSize * (pool_p->freeList[pool_p->first]); - pool_p->first++; - pool_p->first = pool_p->first % pool_p->numOfBlock; - pool_p->numOfFree--; +int32_t memPoolAddChunkCache(SMemPool* pPool) { + if (0 == pPool->chunkCacheUnitNum) { + pPool->chunkCacheUnitNum = TMAX(pPool->maxChunkNum / 10, MEMPOOL_CHUNKPOOL_MIN_BATCH_SIZE); } - taosThreadMutexUnlock(&(pool_p->mutex)); - - if (pos == NULL) uDebug("mempool: out of memory"); - return pos; -} - -void taosMemPoolFree(mpool_h handle, char *pMem) { - int32_t index; - pool_t *pool_p = (pool_t *)handle; - - if (pMem == NULL) return; - - index = (int32_t)(pMem - pool_p->pool) % pool_p->blockSize; - if (index != 0) { - uError("invalid free address:%p\n", pMem); - return; + if (NULL == pPool->pChunkCache) { + pPool->pChunkCache = taosArrayInit(10, sizeof(SChunkCache)); + if (NULL == pPool->pChunkCache) { + uError("taosArrayInit chunkPool failed"); + MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } } - index = (int32_t)((pMem - pool_p->pool) / pool_p->blockSize); - if (index < 0 || index >= pool_p->numOfBlock) { - uError("mempool: error, invalid address:%p", pMem); - return; + SChunkCache* pChunkCache = taosArrayReserve(pPool->pChunkCache, 1); + pChunkCache->chunkNum = pPool->chunkCacheUnitNum; + pChunkCache->pChunks = taosMemoryCalloc(pChunkCache->chunkNum, sizeof(*pChunkCache->pChunks)); + if (NULL == pChunkCache->pChunks) { + uError("calloc %d chunks in pool failed", pChunkCache->chunkNum); + MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - memset(pMem, 0, (size_t)pool_p->blockSize); + atomic_add_fetch_64(&pPool->allocChunkCacheNum, pChunkCache->chunkNum); - taosThreadMutexLock(&pool_p->mutex); - - pool_p->freeList[(pool_p->first + pool_p->numOfFree) % pool_p->numOfBlock] = index; - pool_p->numOfFree++; - - taosThreadMutexUnlock(&pool_p->mutex); + return TSDB_CODE_SUCCESS; } -void taosMemPoolCleanUp(mpool_h handle) { - pool_t *pool_p = (pool_t *)handle; +int32_t memPoolGetChunkCache(SMemPool* pPool, SChunkCache** pCache) { + while (true) { + SChunkCache* cache = (SChunkCache*)taosArrayGetLast(pPool->pChunkCache); + if (NULL != cache && cache->idleOffset < cache->chunkNum) { + *pCache = cache; + break; + } - taosThreadMutexDestroy(&pool_p->mutex); - if (pool_p->pool) taosMemoryFree(pool_p->pool); - if (pool_p->freeList) taosMemoryFree(pool_p->freeList); - memset(pool_p, 0, sizeof(*pool_p)); - taosMemoryFree(pool_p); + MP_ERR_RET(memPoolAddChunkCache(pPool)); + } + + return TSDB_CODE_SUCCESS; } + +int32_t memPoolGetIdleChunkFromCache() { + +} + +int32_t memPoolEnsureMinFreeChunk(SMemPool* pPool) { + if (E_EVICT_ALL == pPool->cfg.evicPolicy) { + return TSDB_CODE_SUCCESS; + } + + if (pPool->freeChunkNum >= pPool->freeChunkReserveNum) { + return TSDB_CODE_SUCCESS; + } + + int32_t toAddNum = pPool->freeChunkReserveNum - pPool->freeChunkNum; + for (int32_t i = 0; i < toAddNum; ++i) { + memPoolGetIdleChunk(pPool); + } + +} + +int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { + MP_ERR_RET(memPoolCheckCfg(cfg)); + + memcpy(&pPool->cfg, &cfg, sizeof(cfg)); + + pPool->name = taosStrdup(poolName); + if (NULL == pPool->name) { + uError("calloc memory pool name %s failed", poolName); + MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + pPool->maxChunkNum = cfg->maxSize / cfg->chunkSize; + if (pPool->maxChunkNum <= 0) { + uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", cfg->maxSize, cfg->chunkSize); + return TSDB_CODE_INVALID_MEM_POOL_PARAM; + } + + pPool->threadChunkReserveNum = 1; + pPool->freeChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum); + + MP_ERR_RET(memPoolAddChunkCache(pPool)); + + MP_ERR_RET(memPoolEnsureMinFreeChunk(pPool)); +} + + +void taosMemPoolModInit(void) { + taosThreadMutexInit(&gMPoolMutex, NULL); + + gMPoolList = taosArrayInit(10, POINTER_BYTES); +} + +int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle) { + int32_t code = TSDB_CODE_SUCCESS; + SMemPool* pPool = NULL; + + taosThreadOnce(&gMPoolInit, taosMemPoolModInit); + if (NULL == gMPoolList) { + uError("init memory pool failed"); + MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + SMemPool* pPool = taosMemoryCalloc(1, sizeof(SMemPool)); + if (NULL == pPool) { + uError("calloc memory pool failed"); + MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + pPool->slotId = -1; + MP_ERR_JRET(memPoolInit(pPool, poolName, &cfg)) + + + +_return: + + if (TSDB_CODE_SUCCESS != code) { + taosMemPoolClose(pPool); + } + +} + +void *taosMemPoolMalloc(void* poolHandle, int64_t size, char* fileName, int32_t lineNo) { + +} + +void *taosMemPoolCalloc(void* poolHandle, int64_t num, int64_t size, char* fileName, int32_t lineNo) { + +} + +void *taosMemPoolRealloc(void* poolHandle, void *ptr, int64_t size) { + +} + +char *taosMemPoolStrdup(void* poolHandle, const char *ptr) { + +} + +void taosMemPoolFree(void* poolHandle, void *ptr) { + +} + +int32_t taosMemPoolGetMemorySize(void* poolHandle, void *ptr, int64_t* size) { + +} + +void *taosMemPoolMallocAlign(void* poolHandle, uint32_t alignment, int64_t size) { + +} + +void taosMemPoolClose(void* poolHandle) { + +} + +void taosMemPoolModDestroy(void) { + +} + +