feat: add session init processing
This commit is contained in:
parent
cdaf8cef69
commit
38a7dce5ca
|
@ -40,25 +40,25 @@ typedef struct SMemPoolCfg {
|
|||
|
||||
void taosMemPoolModInit(void);
|
||||
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle);
|
||||
void *taosMemPoolMalloc(void* poolHandle, int64_t size, char* fileName, int32_t lineNo);
|
||||
void *taosMemPoolCalloc(void* poolHandle, int64_t num, int64_t size, char* fileName, int32_t lineNo);
|
||||
void *taosMemPoolRealloc(void* poolHandle, void *ptr, int64_t size, char* fileName, int32_t lineNo);
|
||||
char *taosMemPoolStrdup(void* poolHandle, const char *ptr, char* fileName, int32_t lineNo);
|
||||
void taosMemPoolFree(void* poolHandle, void *ptr, char* fileName, int32_t lineNo);
|
||||
int64_t taosMemPoolGetMemorySize(void* poolHandle, void *ptr, char* fileName, int32_t lineNo);
|
||||
void taosMemPoolTrim(void* poolHandle, int32_t size, char* fileName, int32_t lineNo);
|
||||
void *taosMemPoolMallocAlign(void* poolHandle, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo);
|
||||
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo);
|
||||
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo);
|
||||
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo);
|
||||
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo);
|
||||
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo);
|
||||
int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo);
|
||||
void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo);
|
||||
void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo);
|
||||
void taosMemPoolClose(void* poolHandle);
|
||||
void taosMemPoolModDestroy(void);
|
||||
|
||||
#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size)))
|
||||
#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
|
||||
#define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
|
||||
#define taosStrdup(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolStrdup(threadPoolHandle, _ptr, __FILE__, __LINE__)) : (taosStrdupi(_ptr)))
|
||||
#define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, _ptr, __FILE__, __LINE__)) : (taosMemFree(_ptr)))
|
||||
#define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, _ptr, __FILE__, __LINE__)) : (taosMemSize(_ptr)))
|
||||
#define taosMemoryTrim(_size) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, _size, __FILE__, __LINE__)) : (taosMemTrim(_size)))
|
||||
#define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
|
||||
#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, threadPoolSession, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size)))
|
||||
#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
|
||||
#define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, threadPoolSession, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
|
||||
#define taosStrdup(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolStrdup(threadPoolHandle, threadPoolSession, _ptr, __FILE__, __LINE__)) : (taosStrdupi(_ptr)))
|
||||
#define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, threadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemFree(_ptr)))
|
||||
#define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, threadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemSize(_ptr)))
|
||||
#define taosMemoryTrim(_size) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, __FILE__, __LINE__)) : (taosMemTrim(_size)))
|
||||
#define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, threadPoolSession, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
|
||||
|
||||
#define taosMemoryFreeClear(ptr) \
|
||||
do { \
|
||||
|
|
|
@ -22,24 +22,37 @@ extern "C" {
|
|||
|
||||
#include "os.h"
|
||||
|
||||
#define MEMPOOL_CHUNKPOOL_MIN_BATCH_SIZE 1000
|
||||
#define MEMPOOL_MAX_KEEP_FREE_CHUNK_NUM 1000
|
||||
#define MP_CHUNKPOOL_MIN_BATCH_SIZE 1000
|
||||
#define MP_MAX_KEEP_FREE_CHUNK_NUM 1000
|
||||
|
||||
typedef struct SChunk {
|
||||
#define MP_CHUNK_FLAG_IN_USE (1 << 0)
|
||||
#define MP_CHUNK_FLAG_NS_CHUNK (1 << 1)
|
||||
|
||||
typedef struct SMPChunk {
|
||||
void *pNext;
|
||||
int64_t flags;
|
||||
uint32_t memSize;
|
||||
uint32_t offset;
|
||||
void *pMemStart;
|
||||
} SChunk;
|
||||
int32_t flags;
|
||||
uint32_t offset;
|
||||
} SMPChunk;
|
||||
|
||||
typedef struct SChunkCache {
|
||||
typedef struct SMPNSChunk {
|
||||
void *pNext;
|
||||
void *pMemStart;
|
||||
int32_t flags;
|
||||
uint32_t offset;
|
||||
|
||||
uint32_t memBytes;
|
||||
} SMPNSChunk;
|
||||
|
||||
|
||||
typedef struct SMPChunkCache {
|
||||
int32_t chunkNum;
|
||||
int32_t idleOffset;
|
||||
SChunk *pChunks;
|
||||
} SChunkCache;
|
||||
SMPChunk *pChunks;
|
||||
void* pNext;
|
||||
} SMPChunkCache;
|
||||
|
||||
typedef struct SMemoryStat {
|
||||
typedef struct SMPMemoryStat {
|
||||
int64_t chunkAlloc;
|
||||
int64_t chunkFree;
|
||||
int64_t memMalloc;
|
||||
|
@ -47,12 +60,37 @@ typedef struct SMemoryStat {
|
|||
int64_t memRealloc;
|
||||
int64_t strdup;
|
||||
int64_t memFree;
|
||||
} SMemoryStat;
|
||||
} SMPMemoryStat;
|
||||
|
||||
typedef struct SMemPoolStat {
|
||||
SMemPoolMemoryStat times;
|
||||
SMemPoolMemoryStat bytes;
|
||||
} SMemPoolStat;
|
||||
typedef struct SMPStat {
|
||||
SMPMemoryStat times;
|
||||
SMPMemoryStat bytes;
|
||||
} SMPStat;
|
||||
|
||||
typedef struct SMPSession {
|
||||
int64_t allocChunkNum;
|
||||
int64_t allocChunkMemSize;
|
||||
int64_t allocMemSize;
|
||||
int64_t reUseChunkNum;
|
||||
|
||||
int32_t srcChunkNum;
|
||||
SMPChunk *srcChunkHead;
|
||||
SMPChunk *srcChunkTail;
|
||||
|
||||
SMPChunk *inUseChunkHead;
|
||||
SMPChunk *inUseChunkTail;
|
||||
|
||||
SMPNSChunk *inUseNSChunkHead;
|
||||
SMPNSChunk *inUseNSChunkTail;
|
||||
|
||||
SMPChunk *reUseChunkHead;
|
||||
SMPChunk *reUseChunkTail;
|
||||
|
||||
SMPNSChunk *reUseNSChunkHead;
|
||||
SMPNSChunk *reUseNSChunkTail;
|
||||
|
||||
SMPStat stat;
|
||||
} SMPSession;
|
||||
|
||||
typedef struct SMemPool {
|
||||
char *name;
|
||||
|
@ -67,20 +105,70 @@ typedef struct SMemPool {
|
|||
|
||||
int64_t allocChunkCacheNum;
|
||||
int32_t chunkCacheUnitNum;
|
||||
SArray *pChunkCache; // SArray<SChunkCache>
|
||||
SMPChunkCache *pChunkCacheHead;
|
||||
SMPChunkCache *pChunkCacheTail;
|
||||
SMPChunk *pIdleChunkList;
|
||||
|
||||
int32_t freeChunkNum;
|
||||
int32_t freeChunkReserveNum;
|
||||
SChunk *freeChunkHead;
|
||||
SChunk *freeChunkTail;
|
||||
int32_t readyChunkNum;
|
||||
int32_t readyChunkReserveNum;
|
||||
int32_t readyChunkLowNum;
|
||||
int32_t readyChunkGotNum;
|
||||
SRWLatch readyChunkLock;
|
||||
SMPChunk *readyChunkHead;
|
||||
SMPChunk *readyChunkTail;
|
||||
|
||||
int64_t freeNChunkNum;
|
||||
SChunk *freeNChunkHead;
|
||||
SChunk *freeNChunkTail;
|
||||
int64_t readyNSChunkNum;
|
||||
SMPChunk *readyNSChunkHead;
|
||||
SMPChunk *readyNSChunkTail;
|
||||
|
||||
SMemPoolStat stat;
|
||||
SMPStat stat;
|
||||
} SMemPool;
|
||||
|
||||
#define MP_CHUNK_GET_FLAG(st, f) ((st) & (f))
|
||||
#define MP_CHUNK_SET_FLAG(st, f) (st) |= (f)
|
||||
#define MP_CHUNK_CLR_FLAG(st, f) (st) &= (~f)
|
||||
|
||||
enum {
|
||||
MP_READ = 1,
|
||||
MP_WRITE,
|
||||
};
|
||||
|
||||
|
||||
#define MP_LOCK(type, _lock) \
|
||||
do { \
|
||||
if (MP_READ == (type)) { \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \
|
||||
uDebug("MP RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRLockLatch(_lock); \
|
||||
uDebug("MP RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \
|
||||
} else { \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \
|
||||
uDebug("MP WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWLockLatch(_lock); \
|
||||
uDebug("MP WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define MP_UNLOCK(type, _lock) \
|
||||
do { \
|
||||
if (MP_READ == (type)) { \
|
||||
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
|
||||
uDebug("MP RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosRUnLockLatch(_lock); \
|
||||
uDebug("MP RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
|
||||
} else { \
|
||||
ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
|
||||
uDebug("MP WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
taosWUnLockLatch(_lock); \
|
||||
uDebug("MP WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
|
||||
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
||||
#define MP_ERR_RET(c) \
|
||||
do { \
|
||||
int32_t _code = c; \
|
||||
|
|
|
@ -19,9 +19,12 @@
|
|||
#include "tlog.h"
|
||||
#include "tutil.h"
|
||||
|
||||
static SArray* gMPoolList;
|
||||
static SArray* gMPoolList = NULL;
|
||||
static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT;
|
||||
static TdThreadMutex gMPoolMutex;
|
||||
static threadlocal void* threadPoolHandle = NULL;
|
||||
static threadlocal void* threadPoolSession = NULL;
|
||||
|
||||
|
||||
int32_t memPoolCheckCfg(SMemPoolCfg* cfg) {
|
||||
if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) {
|
||||
|
@ -42,64 +45,132 @@ int32_t memPoolCheckCfg(SMemPoolCfg* cfg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t memPoolAddChunkCache(SMemPool* pPool) {
|
||||
void memPoolFreeChunkCache(SMPChunkCache* pCache) {
|
||||
//TODO
|
||||
}
|
||||
|
||||
int32_t memPoolAddChunkCache(SMemPool* pPool, SMPChunkCache* pTail) {
|
||||
if (0 == pPool->chunkCacheUnitNum) {
|
||||
pPool->chunkCacheUnitNum = TMAX(pPool->maxChunkNum / 10, MEMPOOL_CHUNKPOOL_MIN_BATCH_SIZE);
|
||||
pPool->chunkCacheUnitNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNKPOOL_MIN_BATCH_SIZE);
|
||||
}
|
||||
|
||||
if (NULL == pPool->pChunkCache) {
|
||||
pPool->pChunkCache = taosArrayInit(10, sizeof(SChunkCache));
|
||||
if (NULL == pPool->pChunkCache) {
|
||||
uError("taosArrayInit chunkPool failed");
|
||||
SMPChunkCache* pCache = NULL;
|
||||
if (NULL == pPool->pChunkCacheHead) {
|
||||
pPool->pChunkCacheHead = taosMemCalloc(1, sizeof(*pPool->pChunkCacheHead));
|
||||
if (NULL == pPool->pChunkCacheHead) {
|
||||
uError("malloc chunkCache failed");
|
||||
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pCache = pPool->pChunkCacheHead;
|
||||
} else {
|
||||
pCache = (SMPChunkCache*)taosMemCalloc(1, sizeof(SMPChunkCache));
|
||||
}
|
||||
|
||||
SChunkCache* pChunkCache = taosArrayReserve(pPool->pChunkCache, 1);
|
||||
pChunkCache->chunkNum = pPool->chunkCacheUnitNum;
|
||||
pChunkCache->pChunks = taosMemoryCalloc(pChunkCache->chunkNum, sizeof(*pChunkCache->pChunks));
|
||||
if (NULL == pChunkCache->pChunks) {
|
||||
uError("calloc %d chunks in pool failed", pChunkCache->chunkNum);
|
||||
pCache->chunkNum = pPool->chunkCacheUnitNum;
|
||||
pCache->pChunks = taosMemoryCalloc(pCache->chunkNum, sizeof(*pCache->pChunks));
|
||||
if (NULL == pCache->pChunks) {
|
||||
uError("calloc %d chunks in cache failed", pCache->chunkNum);
|
||||
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
atomic_add_fetch_64(&pPool->allocChunkCacheNum, pChunkCache->chunkNum);
|
||||
if (atomic_val_compare_exchange_ptr(&pPool->pChunkCacheTail, pTail, pCache) != pTail) {
|
||||
memPoolFreeChunkCache(pCache);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
atomic_add_fetch_64(&pPool->allocChunkCacheNum, pCache->chunkNum);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t memPoolGetChunkCache(SMemPool* pPool, SChunkCache** pCache) {
|
||||
int32_t memPoolGetIdleChunk(SMemPool* pPool, SMPChunkCache** ppCache, SMPChunk** ppChunk) {
|
||||
SMPChunkCache* pCache = NULL;
|
||||
SMPChunk* pChunk = NULL;
|
||||
|
||||
while (true) {
|
||||
SChunkCache* cache = (SChunkCache*)taosArrayGetLast(pPool->pChunkCache);
|
||||
if (NULL != cache && cache->idleOffset < cache->chunkNum) {
|
||||
*pCache = cache;
|
||||
pChunk = (SMPChunk*)atomic_load_ptr(&pPool->pIdleChunkList);
|
||||
if (NULL == pChunk) {
|
||||
break;
|
||||
}
|
||||
|
||||
MP_ERR_RET(memPoolAddChunkCache(pPool));
|
||||
if (atomic_val_compare_exchange_ptr(&pPool->pIdleChunkList, pChunk, pChunk->pNext) != pChunk) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pChunk->pNext = NULL;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
pCache = atomic_load_ptr(&pPool->pChunkCacheTail);
|
||||
int32_t offset = atomic_fetch_add_32(&pCache->idleOffset, 1);
|
||||
if (offset < pCache->chunkNum) {
|
||||
pChunk = pCache->pChunks + offset;
|
||||
break;
|
||||
} else {
|
||||
atomic_sub_fetch_32(&pCache->idleOffset, 1);
|
||||
}
|
||||
|
||||
MP_ERR_RET(memPoolAddChunkCache(pPool, pCache));
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
if (NULL != ppCache) {
|
||||
*ppCache = pCache;
|
||||
}
|
||||
|
||||
*ppChunk = pChunk;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t memPoolNewReadyChunk(SMemPool* pPool, SMPChunkCache** ppCache, SMPChunk** ppChunk) {
|
||||
SMPChunk* pChunk = NULL;
|
||||
MP_ERR_RET(memPoolGetIdleChunk(pPool, ppCache, &pChunk));
|
||||
|
||||
pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize);
|
||||
if (NULL == pChunk->pMemStart) {
|
||||
uError("add new chunk, memory malloc %d failed", pPool->cfg.chunkSize);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t memPoolGetIdleChunkFromCache() {
|
||||
int32_t memPoolAddReadyChunk(SMemPool* pPool, int32_t num) {
|
||||
SMPChunkCache* pCache = NULL;
|
||||
SMPChunk* pChunk = NULL;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
MP_ERR_RET(memPoolNewReadyChunk(pPool, &pCache, &pChunk));
|
||||
|
||||
if (NULL == pPool->readyChunkTail) {
|
||||
pPool->readyChunkHead = pChunk;
|
||||
pPool->readyChunkTail = pChunk;
|
||||
} else {
|
||||
pPool->readyChunkTail->pNext = pChunk;
|
||||
}
|
||||
|
||||
atomic_add_fetch_32(&pPool->readyChunkNum, 1);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t memPoolEnsureMinFreeChunk(SMemPool* pPool) {
|
||||
int32_t memPoolEnsureReadyChunks(SMemPool* pPool) {
|
||||
if (E_EVICT_ALL == pPool->cfg.evicPolicy) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pPool->freeChunkNum >= pPool->freeChunkReserveNum) {
|
||||
int32_t readyMissNum = pPool->readyChunkReserveNum - atomic_load_32(&pPool->readyChunkNum);
|
||||
if (readyMissNum <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t toAddNum = pPool->freeChunkReserveNum - pPool->freeChunkNum;
|
||||
for (int32_t i = 0; i < toAddNum; ++i) {
|
||||
memPoolGetIdleChunk(pPool);
|
||||
}
|
||||
|
||||
MP_ERR_RET(memPoolAddReadyChunk(pPool, readyMissNum));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
|
||||
|
@ -120,11 +191,42 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
|
|||
}
|
||||
|
||||
pPool->threadChunkReserveNum = 1;
|
||||
pPool->freeChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum);
|
||||
pPool->readyChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum);
|
||||
|
||||
MP_ERR_RET(memPoolAddChunkCache(pPool));
|
||||
|
||||
MP_ERR_RET(memPoolEnsureMinFreeChunk(pPool));
|
||||
MP_ERR_RET(memPoolGetIdleChunk(pPool, NULL, &pPool->readyChunkHead));
|
||||
pPool->readyChunkTail = pPool->readyChunkHead;
|
||||
|
||||
MP_ERR_RET(memPoolEnsureReadyChunks(pPool));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void memPoolNotifyLowReadyChunk(SMemPool* pPool) {
|
||||
|
||||
}
|
||||
|
||||
int32_t memPoolGetReadyChunk(SMemPool* pPool, SMPChunk** ppChunk) {
|
||||
SMPChunkCache* pCache = NULL;
|
||||
SMPChunk* pChunk = NULL;
|
||||
int32_t readyChunkNum = atomic_sub_fetch_32(&pPool->readyChunkNum, 1);
|
||||
if (readyChunkNum >= 0) {
|
||||
if (atomic_add_fetch_32(&pPool->readyChunkGotNum, 1) == pPool->readyChunkLowNum) {
|
||||
memPoolNotifyLowReadyChunk(pPool);
|
||||
}
|
||||
|
||||
pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->pNext);
|
||||
while (atomic_val_compare_exchange_ptr(&pPool->readyChunkHead->pNext, pChunk, pChunk->pNext) != pChunk) {
|
||||
pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->pNext);
|
||||
}
|
||||
|
||||
*ppChunk = pChunk;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
MP_RET(memPoolNewReadyChunk(pPool, NULL, ppChunk));
|
||||
}
|
||||
|
||||
|
||||
|
@ -134,6 +236,7 @@ void taosMemPoolModInit(void) {
|
|||
gMPoolList = taosArrayInit(10, POINTER_BYTES);
|
||||
}
|
||||
|
||||
|
||||
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SMemPool* pPool = NULL;
|
||||
|
@ -150,10 +253,14 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle) {
|
|||
MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pPool->slotId = -1;
|
||||
MP_ERR_JRET(memPoolInit(pPool, poolName, &cfg))
|
||||
MP_ERR_JRET(memPoolInit(pPool, poolName, &cfg));
|
||||
|
||||
taosThreadMutexLock(&gMPoolMutex);
|
||||
|
||||
taosArrayPush(gMPoolList, &pPool);
|
||||
pPool->slotId = taosArrayGetSize(gMPoolList) - 1;
|
||||
|
||||
taosThreadMutexUnlock(&gMPoolMutex);
|
||||
|
||||
_return:
|
||||
|
||||
|
@ -161,33 +268,69 @@ _return:
|
|||
taosMemPoolClose(pPool);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void *taosMemPoolMalloc(void* poolHandle, int64_t size, char* fileName, int32_t lineNo) {
|
||||
void taosMemPoolDestroySession(void* session) {
|
||||
SMPSession* pSession = (SMPSession*)session;
|
||||
//TODO;
|
||||
}
|
||||
|
||||
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SMemPool* pPool = (SMemPool*)poolHandle;
|
||||
SMPSession* pSession = (SMPSession*)taosMemCalloc(1, sizeof(SMPSession));
|
||||
if (NULL == pSession) {
|
||||
uError("calloc memory pool session failed");
|
||||
MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
MP_ERR_JRET(memPoolGetReadyChunk(pPool, &pSession->srcChunkHead));
|
||||
|
||||
pSession->srcChunkTail = pSession->srcChunkHead;
|
||||
pSession->srcChunkNum = 1;
|
||||
pSession->allocChunkNum = 1;
|
||||
pSession->allocChunkMemSize = pPool->cfg.chunkSize;
|
||||
|
||||
_return:
|
||||
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosMemPoolDestroySession(pSession);
|
||||
pSession = NULL;
|
||||
}
|
||||
|
||||
*ppSession = pSession;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
|
||||
SMemPool* pPool = (SMemPool*)poolHandle;
|
||||
SMPSession* pSession = (SMPSession*)session;
|
||||
|
||||
}
|
||||
|
||||
void *taosMemPoolCalloc(void* poolHandle, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
|
||||
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
|
||||
|
||||
}
|
||||
|
||||
void *taosMemPoolRealloc(void* poolHandle, void *ptr, int64_t size) {
|
||||
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size) {
|
||||
|
||||
}
|
||||
|
||||
char *taosMemPoolStrdup(void* poolHandle, const char *ptr) {
|
||||
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr) {
|
||||
|
||||
}
|
||||
|
||||
void taosMemPoolFree(void* poolHandle, void *ptr) {
|
||||
void taosMemPoolFree(void* poolHandle, void* session, void *ptr) {
|
||||
|
||||
}
|
||||
|
||||
int32_t taosMemPoolGetMemorySize(void* poolHandle, void *ptr, int64_t* size) {
|
||||
int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, int64_t* size) {
|
||||
|
||||
}
|
||||
|
||||
void *taosMemPoolMallocAlign(void* poolHandle, uint32_t alignment, int64_t size) {
|
||||
void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size) {
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue