enh: add query memory pool

This commit is contained in:
dapan1121 2024-06-28 18:13:34 +08:00
parent 5f2ab7908a
commit 2927e1d6e4
8 changed files with 380 additions and 102 deletions

View File

@ -39,7 +39,7 @@ typedef struct SMemPoolCfg {
} SMemPoolCfg;
void taosMemPoolModInit(void);
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle);
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle);
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo);
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo);
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo);
@ -51,6 +51,9 @@ void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignme
void taosMemPoolClose(void* poolHandle);
void taosMemPoolModDestroy(void);
#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolHandle = _pool; threadPoolSession = _session; } while (0)
#define taosDisableMemoryPoolUsage() (threadPoolHandle = NULL)
#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, threadPoolSession, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size)))
#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
#define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, threadPoolSession, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))

View File

@ -53,6 +53,9 @@ char tsEncryptKey[17] = {0};
int32_t tsMaxShellConns = 50000;
int32_t tsShellActivityTimer = 3; // second
// memory pool
int8_t tsQueryUseMemoryPool = 1;
// queue & threads
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 30000;
@ -671,6 +674,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
@ -1120,6 +1124,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
tsQueryUseMemoryPool = (bool)cfgGetItem(pCfg, "queryUseMemoryPool")->bval;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsRetentionSpeedLimitMB = cfgGetItem(pCfg, "retentionSpeedLimitMB")->i32;
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;

View File

@ -37,6 +37,13 @@ extern "C" {
#define QW_SCH_TIMEOUT_MSEC 180000
#define QW_MIN_RES_ROWS 16384
#define QW_THREAD_MAX_SCHED_TASK_NUM 10
#define QW_QUERY_MEM_POOL_NAME "Query"
#define QW_DEFAULT_RESERVE_MEM_PERCENT 20
#define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576)
#define QW_MIN_MEM_POOL_SIZE (256 * 1048576)
enum {
QW_PHASE_PRE_QUERY = 1,
QW_PHASE_POST_QUERY,
@ -146,6 +153,8 @@ typedef struct SQWTaskCtx {
void *taskHandle;
void *sinkHandle;
SArray *tbInfo; // STbVerInfo
void *memPoolSession;
} SQWTaskCtx;
typedef struct SQWSchStatus {

View File

@ -0,0 +1,49 @@
#include "qwInt.h"
#include "qworker.h"
void* gQueryPoolHandle = NULL;
int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) {
int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576 * 1048576, QW_MIN_RESERVE_MEM_SIZE);
int64_t availSize = (totalSize - reserveSize) / 1048576 * 1048576;
if (availSize < QW_MIN_MEM_POOL_SIZE) {
return -1;
}
*maxSize = availSize;
return TSDB_CODE_SUCCESS;
}
int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int64_t* chunkSize) {
*chunkSize = 2 * 1048576;
return TSDB_CODE_SUCCESS;
}
void qwInitQueryPool(void) {
int64_t memSize = 0;
int32_t code = taosGetSysMemory(&memSize);
if (TSDB_CODE_SUCCESS != code) {
return;
}
SMemPoolCfg cfg = {0};
code = qwGetMemPoolMaxMemSize(memSize, &cfg.maxSize);
if (TSDB_CODE_SUCCESS != code) {
return;
}
cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO
code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize);
if (TSDB_CODE_SUCCESS != code) {
return;
}
taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryPoolHandle);
}

View File

@ -18,6 +18,8 @@ SQWorkerMgmt gQwMgmt = {
.qwNum = 0,
};
static TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT;
int32_t qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, tId, sId;
int32_t eId;
@ -707,7 +709,11 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx->ctrlConnInfo = qwMsg->connInfo;
ctx->sId = sId;
ctx->phase = -1;
if (NULL != gQueryPoolHandle) {
QW_ERR_JRET(taosMemPoolInitSession(gQueryPoolHandle, &ctx->memPoolSession));
}
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));
qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true);
@ -1256,6 +1262,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
taosThreadOnce(&gQueryPoolInit, qwInitQueryPool);
int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
if (1 == qwNum) {
memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));

View File

@ -254,7 +254,7 @@ int32_t taosMemoryDbgInitRestore() {
#endif
}
void *taosMemoryMalloc(int64_t size) {
void *taosMemMalloc(int64_t size) {
#ifdef USE_TD_MEMORY
void *tmp = malloc(size + sizeof(TdMemoryInfo));
if (tmp == NULL) return NULL;
@ -270,7 +270,7 @@ void *taosMemoryMalloc(int64_t size) {
#endif
}
void *taosMemoryCalloc(int64_t num, int64_t size) {
void *taosMemCalloc(int64_t num, int64_t size) {
#ifdef USE_TD_MEMORY
int32_t memorySize = num * size;
char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1);
@ -287,7 +287,7 @@ void *taosMemoryCalloc(int64_t num, int64_t size) {
#endif
}
void *taosMemoryRealloc(void *ptr, int64_t size) {
void *taosMemRealloc(void *ptr, int64_t size) {
#ifdef USE_TD_MEMORY
if (ptr == NULL) return taosMemoryMalloc(size);
@ -334,7 +334,7 @@ char *taosStrdup(const char *ptr) {
#endif
}
void taosMemoryFree(void *ptr) {
void taosMemFree(void *ptr) {
if (NULL == ptr) return;
#ifdef USE_TD_MEMORY
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
@ -350,7 +350,7 @@ void taosMemoryFree(void *ptr) {
#endif
}
int64_t taosMemorySize(void *ptr) {
int64_t taosMemSize(void *ptr) {
if (ptr == NULL) return 0;
#ifdef USE_TD_MEMORY
@ -373,7 +373,7 @@ int64_t taosMemorySize(void *ptr) {
#endif
}
void taosMemoryTrim(int32_t size) {
void taosMemTrim(int32_t size) {
#if defined(WINDOWS) || defined(DARWIN) || defined(_ALPINE)
// do nothing
return;
@ -382,7 +382,7 @@ void taosMemoryTrim(int32_t size) {
#endif
}
void *taosMemoryMallocAlign(uint32_t alignment, int64_t size) {
void *taosMemMallocAlign(uint32_t alignment, int64_t size) {
#ifdef USE_TD_MEMORY
ASSERT(0);
#else
@ -390,7 +390,7 @@ void *taosMemoryMallocAlign(uint32_t alignment, int64_t size) {
void *p = memalign(alignment, size);
return p;
#else
return taosMemoryMalloc(size);
return taosMemMalloc(size);
#endif
#endif
}

View File

@ -22,17 +22,37 @@ extern "C" {
#include "os.h"
#define MP_CHUNKGRP_ALLOC_BATCH_SIZE 1000
#define MP_NSCHUNKGRP_ALLOC_BATCH_SIZE 500
#define MP_MAX_KEEP_FREE_CHUNK_NUM 1000
#define MP_CHUNK_CACHE_ALLOC_BATCH_SIZE 1000
#define MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE 500
#define MP_SESSION_CACHE_ALLOC_BATCH_SIZE 100
#define MP_MAX_KEEP_FREE_CHUNK_NUM 1000
#define MP_MAX_MALLOC_MEM_SIZE 0xFFFFFFFFFF
// FLAGS AREA
#define MP_CHUNK_FLAG_IN_USE (1 << 0)
#define MP_CHUNK_FLAG_NS_CHUNK (1 << 1)
#define MP_DBG_FLAG_LOG_MALLOC_FREE (1 << 0)
#define MP_MEM_HEADER_FLAG_NS_CHUNK (1 << 0)
typedef struct SMPMemHeader {
uint64_t flags:3;
uint64_t size:5;
} SMPMemHeader;
typedef struct SMPMemTailer {
} SMPMemTailer;
typedef struct SMPListNode {
void *pNext;
} SMPListNode;
typedef struct SMPChunk {
void *pNext;
SMPListNode list;
char *pMemStart;
int32_t flags;
/* KEEP ABOVE SAME WITH SMPNSChunk */
@ -41,22 +61,22 @@ typedef struct SMPChunk {
} SMPChunk;
typedef struct SMPNSChunk {
void *pNext;
SMPListNode list;
char *pMemStart;
int32_t flags;
/* KEEP ABOVE SAME WITH SMPChunk */
uint32_t offset;
uint64_t offset;
uint64_t memBytes;
} SMPNSChunk;
typedef struct SMPChunkGroup {
int32_t chunkNum;
typedef struct SMPCacheGroup {
int32_t nodesNum;
int32_t idleOffset;
void *pChunks;
void *pNodes;
void* pNext;
} SMPChunkGroup;
} SMPCacheGroup;
typedef struct SMPMemoryStat {
int64_t chunkAlloc;
@ -74,6 +94,7 @@ typedef struct SMPStat {
} SMPStat;
typedef struct SMPSession {
SMPListNode list;
int64_t allocChunkNum;
int64_t allocChunkMemSize;
int64_t allocMemSize;
@ -99,14 +120,14 @@ typedef struct SMPSession {
SMPStat stat;
} SMPSession;
typedef struct SMPChunkGroupInfo {
int16_t chunkNodeSize;
int64_t allocChunkNum;
int32_t chunkGroupNum;
SMPChunkGroup *pChunkGrpHead;
SMPChunkGroup *pChunkGrpTail;
void *pIdleChunkList;
} SMPChunkGroupInfo;
typedef struct SMPCacheGroupInfo {
int16_t nodeSize;
int64_t allocNum;
int32_t groupNum;
SMPCacheGroup *pGrpHead;
SMPCacheGroup *pGrpTail;
void *pIdleList;
} SMPCacheGroupInfo;
typedef struct SMPDebugInfo {
int64_t flags;
@ -127,8 +148,9 @@ typedef struct SMemPool {
int64_t allocNSChunkSize;
int64_t allocMemSize;
SMPChunkGroupInfo chunkGrpInfo;
SMPChunkGroupInfo NSChunkGrpInfo;
SMPCacheGroupInfo chunkCache;
SMPCacheGroupInfo NSChunkCache;
SMPCacheGroupInfo sessionCache;
int32_t readyChunkNum;
int32_t readyChunkReserveNum;
@ -154,13 +176,21 @@ enum {
MP_WRITE,
};
#define MP_INIT_MEM_HEADER(_header, _size, _nsChunk) \
do { \
(_header)->size = _size; \
if (_nsChunk) { \
MP_SET_FLAG((_header)->flags, MP_MEM_HEADER_FLAG_NS_CHUNK); \
} \
} while (0)
#define MP_ADD_TO_CHUNK_LIST(_chunkHead, _chunkTail, _chunkNum, _chunk) \
do { \
if (NULL == _chunkHead) { \
_chunkHead = _chunk; \
_chunkTail = _chunk; \
} else { \
(_chunkTail)->pNext = _chunk; \
(_chunkTail)->list.pNext = _chunk; \
} \
(_chunkNum)++; \
} while (0)

View File

@ -45,82 +45,82 @@ int32_t memPoolCheckCfg(SMemPoolCfg* cfg) {
return TSDB_CODE_SUCCESS;
}
void memPoolFreeChunkGroup(SMPChunkGroup* pGrp) {
void memPoolFreeChunkGroup(SMPCacheGroup* pGrp) {
//TODO
}
int32_t memPoolAddChunkGroup(SMemPool* pPool, SMPChunkGroupInfo* pInfo, SMPChunkGroup* pTail) {
SMPChunkGroup* pGrp = NULL;
if (NULL == pInfo->pChunkGrpHead) {
pInfo->pChunkGrpHead = taosMemCalloc(1, sizeof(*pInfo->pChunkGrpHead));
if (NULL == pInfo->pChunkGrpHead) {
int32_t memPoolAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pTail) {
SMPCacheGroup* pGrp = NULL;
if (NULL == pInfo->pGrpHead) {
pInfo->pGrpHead = taosMemCalloc(1, sizeof(*pInfo->pGrpHead));
if (NULL == pInfo->pGrpHead) {
uError("malloc chunkCache failed");
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pGrp = pInfo->pChunkGrpHead;
pGrp = pInfo->pGrpHead;
} else {
pGrp = (SMPChunkGroup*)taosMemCalloc(1, sizeof(SMPChunkGroup));
pGrp = (SMPCacheGroup*)taosMemCalloc(1, sizeof(SMPCacheGroup));
}
pGrp->chunkNum = pInfo->chunkGroupNum;
pGrp->pChunks = taosMemoryCalloc(pGrp->chunkNum, pInfo->chunkNodeSize);
if (NULL == pGrp->pChunks) {
uError("calloc %d %d chunks in chunk group failed", pGrp->chunkNum, pInfo->chunkNodeSize);
pGrp->nodesNum = pInfo->groupNum;
pGrp->pNodes = taosMemoryCalloc(pGrp->nodesNum, pInfo->nodeSize);
if (NULL == pGrp->pNodes) {
uError("calloc %d %d nodes in cache group failed", pGrp->nodesNum, pInfo->nodeSize);
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
if (atomic_val_compare_exchange_ptr(&pInfo->pChunkGrpTail, pTail, pGrp) != pTail) {
if (atomic_val_compare_exchange_ptr(&pInfo->pGrpTail, pTail, pGrp) != pTail) {
memPoolFreeChunkGroup(pGrp);
return TSDB_CODE_SUCCESS;
}
atomic_add_fetch_64(&pInfo->allocChunkNum, pGrp->chunkNum);
atomic_add_fetch_64(&pInfo->allocNum, pGrp->nodesNum);
return TSDB_CODE_SUCCESS;
}
int32_t memPoolGetIdleChunk(SMemPool* pPool, SMPChunkGroupInfo* pInfo, void** ppChunk) {
SMPChunkGroup* pGrp = NULL;
SMPChunk* pChunk = NULL;
int32_t memPoolGetIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes) {
SMPCacheGroup* pGrp = NULL;
SMPListNode* pList = NULL;
while (true) {
pChunk = (SMPChunk*)atomic_load_ptr(&pInfo->pIdleChunkList);
if (NULL == pChunk) {
pList = (SMPListNode*)atomic_load_ptr(&pInfo->pIdleList);
if (NULL == pList) {
break;
}
if (atomic_val_compare_exchange_ptr(&pInfo->pIdleChunkList, pChunk, pChunk->pNext) != pChunk) {
if (atomic_val_compare_exchange_ptr(&pInfo->pIdleList, pList, pList->pNext) != pList) {
continue;
}
pChunk->pNext = NULL;
pList->pNext = NULL;
goto _return;
}
while (true) {
pGrp = atomic_load_ptr(&pInfo->pChunkGrpTail);
pGrp = atomic_load_ptr(&pInfo->pGrpTail);
int32_t offset = atomic_fetch_add_32(&pGrp->idleOffset, 1);
if (offset < pGrp->chunkNum) {
pChunk = (SMPChunk*)((char*)pGrp->pChunks + offset * pInfo->chunkNodeSize);
if (offset < pGrp->nodesNum) {
pList = (SMPListNode*)((char*)pGrp->pNodes + offset * pInfo->nodeSize);
break;
} else {
atomic_sub_fetch_32(&pGrp->idleOffset, 1);
}
MP_ERR_RET(memPoolAddChunkGroup(pPool, pInfo, pGrp));
MP_ERR_RET(memPoolAddCacheGroup(pPool, pInfo, pGrp));
}
_return:
*ppChunk = pChunk;
*ppRes = pList;
return TSDB_CODE_SUCCESS;
}
int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) {
SMPChunk* pChunk = NULL;
MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->chunkGrpInfo, &pChunk));
MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, &pChunk));
pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize);
if (NULL == pChunk->pMemStart) {
@ -128,13 +128,16 @@ int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pPool->allocChunkNum++;
pPool->allocChunkSize += pPool->cfg.chunkSize;
return TSDB_CODE_SUCCESS;
}
int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSize) {
SMPNSChunk* pChunk = NULL;
MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->NSChunkGrpInfo, &pChunk));
MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->NSChunkCache, &pChunk));
pChunk->pMemStart = taosMemMalloc(chunkSize);
if (NULL == pChunk->pMemStart) {
@ -145,12 +148,15 @@ int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSi
pChunk->memBytes = chunkSize;
MP_SET_FLAG(pChunk->flags, MP_CHUNK_FLAG_NS_CHUNK);
pPool->allocNSChunkNum++;
pPool->allocNSChunkSize += pPool->cfg.chunkSize;
return TSDB_CODE_SUCCESS;
}
int32_t memPoolPrepareChunks(SMemPool* pPool, int32_t num) {
SMPChunkGroup* pGrp = NULL;
SMPCacheGroup* pGrp = NULL;
SMPChunk* pChunk = NULL;
for (int32_t i = 0; i < num; ++i) {
MP_ERR_RET(memPoolNewChunk(pPool, &pGrp, &pChunk));
@ -159,7 +165,7 @@ int32_t memPoolPrepareChunks(SMemPool* pPool, int32_t num) {
pPool->readyChunkHead = pChunk;
pPool->readyChunkTail = pChunk;
} else {
pPool->readyChunkTail->pNext = pChunk;
pPool->readyChunkTail->list.pNext = pChunk;
}
atomic_add_fetch_32(&pPool->readyChunkNum, 1);
@ -204,14 +210,18 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
pPool->threadChunkReserveNum = 1;
pPool->readyChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum);
pPool->chunkGrpInfo.chunkGroupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNKGRP_ALLOC_BATCH_SIZE);
pPool->chunkGrpInfo.chunkNodeSize = sizeof(SMPChunk);
pPool->NSChunkGrpInfo.chunkGroupNum = MP_NSCHUNKGRP_ALLOC_BATCH_SIZE;
pPool->NSChunkGrpInfo.chunkNodeSize = sizeof(SMPNSChunk);
pPool->chunkCache.groupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE);
pPool->chunkCache.nodeSize = sizeof(SMPChunk);
pPool->NSChunkCache.groupNum = MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE;
pPool->NSChunkCache.nodeSize = sizeof(SMPNSChunk);
pPool->sessionCache.groupNum = MP_SESSION_CACHE_ALLOC_BATCH_SIZE;
pPool->sessionCache.nodeSize = sizeof(SMPSession);
MP_ERR_RET(memPoolAddChunkGroup(pPool, &pPool->chunkGrpInfo, NULL));
MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->chunkCache, NULL));
MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->NSChunkCache, NULL));
MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->sessionCache, NULL));
MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->chunkGrpInfo, &pPool->readyChunkHead));
MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, &pPool->readyChunkHead));
pPool->readyChunkTail = pPool->readyChunkHead;
MP_ERR_RET(memPoolEnsureChunks(pPool));
@ -224,7 +234,7 @@ void memPoolNotifyLowChunkNum(SMemPool* pPool) {
}
int32_t memPoolGetChunk(SMemPool* pPool, SMPChunk** ppChunk) {
SMPChunkGroup* pCache = NULL;
SMPCacheGroup* pCache = NULL;
SMPChunk* pChunk = NULL;
int32_t readyChunkNum = atomic_sub_fetch_32(&pPool->readyChunkNum, 1);
if (readyChunkNum >= 0) {
@ -232,9 +242,9 @@ int32_t memPoolGetChunk(SMemPool* pPool, SMPChunk** ppChunk) {
memPoolNotifyLowChunkNum(pPool);
}
pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->pNext);
while (atomic_val_compare_exchange_ptr(&pPool->readyChunkHead->pNext, pChunk, pChunk->pNext) != pChunk) {
pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->pNext);
pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->list.pNext);
while (atomic_val_compare_exchange_ptr(&pPool->readyChunkHead->list.pNext, pChunk, pChunk->list.pNext) != pChunk) {
pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->list.pNext);
}
*ppChunk = pChunk;
@ -254,7 +264,7 @@ int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_
}
*ppPreChunk = pChunk;
pChunk = (SMPChunk*)pChunk->pNext;
pChunk = (SMPChunk*)pChunk->list.pNext;
}
if (NULL == *ppChunk) {
@ -267,8 +277,10 @@ int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_
void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) {
SMPChunk* pChunk = NULL, *preSrcChunk = NULL;
void* pRes = NULL;
int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer);
if (pSession->srcChunkNum > 0) {
MP_ERR_JRET(memPoolGetChunkFromSession(pPool, pSession, size, &pChunk, &preSrcChunk));
MP_ERR_JRET(memPoolGetChunkFromSession(pPool, pSession, totalSize, &pChunk, &preSrcChunk));
}
if (NULL == pChunk) {
@ -276,21 +288,24 @@ void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size)
pSession->allocChunkNum++;
pSession->allocChunkMemSize += pPool->cfg.chunkSize;
pSession->allocMemSize += size;
pSession->allocMemSize += totalSize;
MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk);
MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk);
}
pRes = pChunk->pMemStart + pChunk->offset;
pChunk->offset += size;
SMPMemHeader* pHeader = (SMPMemHeader*)(pChunk->pMemStart + pChunk->offset);
MP_INIT_MEM_HEADER(pHeader, size, false);
pRes = (void*)(pHeader + 1);
pChunk->offset += totalSize;
if (pChunk->offset >= (pPool->cfg.chunkSize - pPool->maxDiscardSize)) {
if (NULL == preSrcChunk) {
pSession->srcChunkHead = NULL;
pSession->srcChunkTail = NULL;
} else {
preSrcChunk->pNext = pChunk->pNext;
preSrcChunk->list.pNext = pChunk->list.pNext;
}
pSession->srcChunkNum--;
@ -304,22 +319,77 @@ _return:
void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) {
SMPNSChunk* pChunk = NULL;
MP_ERR_JRET(memPoolNewNSChunk(pPool, &pChunk, size));
void* pRes = NULL;
int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer);
MP_ERR_JRET(memPoolNewNSChunk(pPool, &pChunk, totalSize));
SMPMemHeader* pHeader = (SMPMemHeader*)pChunk->pMemStart;
MP_INIT_MEM_HEADER(pHeader, size, false);
pRes = (void*)(pHeader + 1);
pSession->allocChunkNum++;
pSession->allocChunkMemSize += size;
pSession->allocMemSize += size;
pSession->allocChunkMemSize += totalSize;
pSession->allocMemSize += totalSize;
if (NULL == pSession->inUseNSChunkHead) {
pSession->inUseNSChunkHead = pChunk;
pSession->inUseNSChunkTail = pChunk;
} else {
pSession->inUseNSChunkTail->pNext = pChunk;
pSession->inUseNSChunkTail->list.pNext = pChunk;
}
_return:
return pChunk ? pChunk->pMemStart : NULL;
return pRes;
}
void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
_return:
return res;
}
void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t totalSize = num * size;
void *res = memPoolMallocImpl(pPool, pSession, totalSize, fileName, lineNo);
if (NULL != res) {
memset(res, 0, totalSize);
}
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
_return:
return res;
}
void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fileName, int32_t lineNo) {
//TODO
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
}
int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fileName, int32_t lineNo) {
SMPMemHeader* pHeader = (char)ptr - sizeof(SMPMemHeader);
return pHeader->size;
}
@ -330,7 +400,7 @@ void taosMemPoolModInit(void) {
gMPoolList = taosArrayInit(10, POINTER_BYTES);
}
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle) {
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = NULL;
@ -346,7 +416,7 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle) {
MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
MP_ERR_JRET(memPoolInit(pPool, poolName, &cfg));
MP_ERR_JRET(memPoolInit(pPool, poolName, cfg));
taosThreadMutexLock(&gMPoolMutex);
@ -359,8 +429,11 @@ _return:
if (TSDB_CODE_SUCCESS != code) {
taosMemPoolClose(pPool);
pPool = NULL;
}
*poolHandle = pPool;
return code;
}
@ -372,11 +445,9 @@ void taosMemPoolDestroySession(void* session) {
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) {
int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)taosMemCalloc(1, sizeof(SMPSession));
if (NULL == pSession) {
uError("calloc memory pool session failed");
MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
SMPSession* pSession = NULL;
MP_ERR_JRET(memPoolGetIdleNode(pPool, &pPool->sessionCache, &pSession));
MP_ERR_JRET(memPoolGetChunk(pPool, &pSession->srcChunkHead));
@ -400,7 +471,7 @@ _return:
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
void *res = NULL;
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNC__, poolHandle, session, fileName, size);
@ -409,18 +480,15 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* f
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
return memPoolMallocImpl(pPool, pSession, size, fileName, lineNo);
_return:
return res;
return NULL;
}
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
if (NULL == poolHandle || NULL == session || NULL == fileName || num < 0 || size < 0) {
@ -431,9 +499,11 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
res = memPoolMallocImpl(pPool, pSession, num * size, fileName, lineNo);
memset(res, 0, num * size);
if (NULL != res) {
memset(res, 0, num * size);
}
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
@ -445,26 +515,129 @@ _return:
}
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64,
__FUNC__, poolHandle, session, fileName, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
if (NULL == ptr) {
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
} else if (0 == size) {
memPoolFreeImpl(pPool, pSession, ptr, fileName, lineNo);
} else {
int64_t origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo);
if (origSize >= size) {
SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
pHeader->size = size;
} else {
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader));
memcpy(res, ptr, origSize);
memset((char*)res + origSize, 0, size - origSize);
}
}
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
_return:
return res;
}
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p",
__FUNC__, poolHandle, session, fileName, ptr);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
int64_t size = strlen(ptr) + 1;
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
if (NULL != res) {
strcpy(res, ptr);
}
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
_return:
return res;
}
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p",
__FUNC__, poolHandle, session, fileName, ptr);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
memPoolFreeImpl(pPool, pSession, ptr, fileName, lineNo);
_return:
return;
}
int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, int64_t* size, char* fileName, int32_t lineNo) {
int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%p",
__FUNC__, poolHandle, session, fileName, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
if (NULL == ptr) {
return 0;
}
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
MP_RET(memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo));
_return:
return -1;
}
void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) {
void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64,
__FUNC__, poolHandle, session, fileName, alignment, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
return memPoolMallocImpl(pPool, pSession, size, fileName, lineNo);
_return:
return NULL;
}
void taosMemPoolClose(void* poolHandle) {
void taosMemPoolClose(void* poolHandle) {
}