diff --git a/include/util/tmempool.h b/include/util/tmempool.h index 4f7c8ee17d..d6fe6e5ca2 100644 --- a/include/util/tmempool.h +++ b/include/util/tmempool.h @@ -39,7 +39,7 @@ typedef struct SMemPoolCfg { } SMemPoolCfg; void taosMemPoolModInit(void); -int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle); +int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle); void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo); void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo); void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo); @@ -51,6 +51,9 @@ void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignme void taosMemPoolClose(void* poolHandle); void taosMemPoolModDestroy(void); +#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolHandle = _pool; threadPoolSession = _session; } while (0) +#define taosDisableMemoryPoolUsage() (threadPoolHandle = NULL) + #define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, threadPoolSession, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size))) #define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) #define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, threadPoolSession, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index a79ee03ab1..db5edba6bf 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -53,6 +53,9 @@ char tsEncryptKey[17] = {0}; int32_t tsMaxShellConns = 50000; int32_t tsShellActivityTimer = 3; // second +// memory pool +int8_t tsQueryUseMemoryPool = 1; + // queue & threads int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcSessions = 30000; @@ -671,6 +674,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -1120,6 +1124,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; + tsQueryUseMemoryPool = (bool)cfgGetItem(pCfg, "queryUseMemoryPool")->bval; + tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsRetentionSpeedLimitMB = cfgGetItem(pCfg, "retentionSpeedLimitMB")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 2222f9cb31..d4c8fc103e 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -37,6 +37,13 @@ extern "C" { #define QW_SCH_TIMEOUT_MSEC 180000 #define QW_MIN_RES_ROWS 16384 +#define QW_THREAD_MAX_SCHED_TASK_NUM 10 + +#define QW_QUERY_MEM_POOL_NAME "Query" +#define QW_DEFAULT_RESERVE_MEM_PERCENT 20 +#define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576) +#define QW_MIN_MEM_POOL_SIZE (256 * 1048576) + enum { QW_PHASE_PRE_QUERY = 1, QW_PHASE_POST_QUERY, @@ -146,6 +153,8 @@ typedef struct SQWTaskCtx { void *taskHandle; void *sinkHandle; SArray *tbInfo; // STbVerInfo + + void *memPoolSession; } SQWTaskCtx; typedef struct SQWSchStatus { diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c new file mode 100644 index 0000000000..40cf29057d --- /dev/null +++ b/source/libs/qworker/src/qwMem.c @@ -0,0 +1,49 @@ +#include "qwInt.h" +#include "qworker.h" + +void* gQueryPoolHandle = NULL; + +int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { + int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576 * 1048576, QW_MIN_RESERVE_MEM_SIZE); + int64_t availSize = (totalSize - reserveSize) / 1048576 * 1048576; + if (availSize < QW_MIN_MEM_POOL_SIZE) { + return -1; + } + + *maxSize = availSize; + + return TSDB_CODE_SUCCESS; +} + +int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int64_t* chunkSize) { + *chunkSize = 2 * 1048576; + + return TSDB_CODE_SUCCESS; +} + + +void qwInitQueryPool(void) { + int64_t memSize = 0; + int32_t code = taosGetSysMemory(&memSize); + if (TSDB_CODE_SUCCESS != code) { + return; + } + + SMemPoolCfg cfg = {0}; + code = qwGetMemPoolMaxMemSize(memSize, &cfg.maxSize); + if (TSDB_CODE_SUCCESS != code) { + return; + } + + cfg.threadNum = 10; //TODO + cfg.evicPolicy = E_EVICT_AUTO; //TODO + + code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize); + if (TSDB_CODE_SUCCESS != code) { + return; + } + + taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryPoolHandle); +} + + diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c181997ad2..9dfad8c525 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -18,6 +18,8 @@ SQWorkerMgmt gQwMgmt = { .qwNum = 0, }; +static TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT; + int32_t qwStopAllTasks(SQWorker *mgmt) { uint64_t qId, tId, sId; int32_t eId; @@ -707,7 +709,11 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ctx->ctrlConnInfo = qwMsg->connInfo; ctx->sId = sId; ctx->phase = -1; - + + if (NULL != gQueryPoolHandle) { + QW_ERR_JRET(taosMemPoolInitSession(gQueryPoolHandle, &ctx->memPoolSession)); + } + QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true); @@ -1256,6 +1262,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } + taosThreadOnce(&gQueryPoolInit, qwInitQueryPool); + int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1); if (1 == qwNum) { memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param)); diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index d130ebe469..2852abbf50 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -254,7 +254,7 @@ int32_t taosMemoryDbgInitRestore() { #endif } -void *taosMemoryMalloc(int64_t size) { +void *taosMemMalloc(int64_t size) { #ifdef USE_TD_MEMORY void *tmp = malloc(size + sizeof(TdMemoryInfo)); if (tmp == NULL) return NULL; @@ -270,7 +270,7 @@ void *taosMemoryMalloc(int64_t size) { #endif } -void *taosMemoryCalloc(int64_t num, int64_t size) { +void *taosMemCalloc(int64_t num, int64_t size) { #ifdef USE_TD_MEMORY int32_t memorySize = num * size; char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1); @@ -287,7 +287,7 @@ void *taosMemoryCalloc(int64_t num, int64_t size) { #endif } -void *taosMemoryRealloc(void *ptr, int64_t size) { +void *taosMemRealloc(void *ptr, int64_t size) { #ifdef USE_TD_MEMORY if (ptr == NULL) return taosMemoryMalloc(size); @@ -334,7 +334,7 @@ char *taosStrdup(const char *ptr) { #endif } -void taosMemoryFree(void *ptr) { +void taosMemFree(void *ptr) { if (NULL == ptr) return; #ifdef USE_TD_MEMORY TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); @@ -350,7 +350,7 @@ void taosMemoryFree(void *ptr) { #endif } -int64_t taosMemorySize(void *ptr) { +int64_t taosMemSize(void *ptr) { if (ptr == NULL) return 0; #ifdef USE_TD_MEMORY @@ -373,7 +373,7 @@ int64_t taosMemorySize(void *ptr) { #endif } -void taosMemoryTrim(int32_t size) { +void taosMemTrim(int32_t size) { #if defined(WINDOWS) || defined(DARWIN) || defined(_ALPINE) // do nothing return; @@ -382,7 +382,7 @@ void taosMemoryTrim(int32_t size) { #endif } -void *taosMemoryMallocAlign(uint32_t alignment, int64_t size) { +void *taosMemMallocAlign(uint32_t alignment, int64_t size) { #ifdef USE_TD_MEMORY ASSERT(0); #else @@ -390,7 +390,7 @@ void *taosMemoryMallocAlign(uint32_t alignment, int64_t size) { void *p = memalign(alignment, size); return p; #else - return taosMemoryMalloc(size); + return taosMemMalloc(size); #endif #endif } diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index c62dd39644..acd63db984 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -22,17 +22,37 @@ extern "C" { #include "os.h" -#define MP_CHUNKGRP_ALLOC_BATCH_SIZE 1000 -#define MP_NSCHUNKGRP_ALLOC_BATCH_SIZE 500 -#define MP_MAX_KEEP_FREE_CHUNK_NUM 1000 +#define MP_CHUNK_CACHE_ALLOC_BATCH_SIZE 1000 +#define MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE 500 +#define MP_SESSION_CACHE_ALLOC_BATCH_SIZE 100 +#define MP_MAX_KEEP_FREE_CHUNK_NUM 1000 +#define MP_MAX_MALLOC_MEM_SIZE 0xFFFFFFFFFF + + +// FLAGS AREA #define MP_CHUNK_FLAG_IN_USE (1 << 0) #define MP_CHUNK_FLAG_NS_CHUNK (1 << 1) #define MP_DBG_FLAG_LOG_MALLOC_FREE (1 << 0) +#define MP_MEM_HEADER_FLAG_NS_CHUNK (1 << 0) + +typedef struct SMPMemHeader { + uint64_t flags:3; + uint64_t size:5; +} SMPMemHeader; + +typedef struct SMPMemTailer { + +} SMPMemTailer; + +typedef struct SMPListNode { + void *pNext; +} SMPListNode; + typedef struct SMPChunk { - void *pNext; + SMPListNode list; char *pMemStart; int32_t flags; /* KEEP ABOVE SAME WITH SMPNSChunk */ @@ -41,22 +61,22 @@ typedef struct SMPChunk { } SMPChunk; typedef struct SMPNSChunk { - void *pNext; + SMPListNode list; char *pMemStart; int32_t flags; /* KEEP ABOVE SAME WITH SMPChunk */ - uint32_t offset; + uint64_t offset; uint64_t memBytes; } SMPNSChunk; -typedef struct SMPChunkGroup { - int32_t chunkNum; +typedef struct SMPCacheGroup { + int32_t nodesNum; int32_t idleOffset; - void *pChunks; + void *pNodes; void* pNext; -} SMPChunkGroup; +} SMPCacheGroup; typedef struct SMPMemoryStat { int64_t chunkAlloc; @@ -74,6 +94,7 @@ typedef struct SMPStat { } SMPStat; typedef struct SMPSession { + SMPListNode list; int64_t allocChunkNum; int64_t allocChunkMemSize; int64_t allocMemSize; @@ -99,14 +120,14 @@ typedef struct SMPSession { SMPStat stat; } SMPSession; -typedef struct SMPChunkGroupInfo { - int16_t chunkNodeSize; - int64_t allocChunkNum; - int32_t chunkGroupNum; - SMPChunkGroup *pChunkGrpHead; - SMPChunkGroup *pChunkGrpTail; - void *pIdleChunkList; -} SMPChunkGroupInfo; +typedef struct SMPCacheGroupInfo { + int16_t nodeSize; + int64_t allocNum; + int32_t groupNum; + SMPCacheGroup *pGrpHead; + SMPCacheGroup *pGrpTail; + void *pIdleList; +} SMPCacheGroupInfo; typedef struct SMPDebugInfo { int64_t flags; @@ -127,8 +148,9 @@ typedef struct SMemPool { int64_t allocNSChunkSize; int64_t allocMemSize; - SMPChunkGroupInfo chunkGrpInfo; - SMPChunkGroupInfo NSChunkGrpInfo; + SMPCacheGroupInfo chunkCache; + SMPCacheGroupInfo NSChunkCache; + SMPCacheGroupInfo sessionCache; int32_t readyChunkNum; int32_t readyChunkReserveNum; @@ -154,13 +176,21 @@ enum { MP_WRITE, }; +#define MP_INIT_MEM_HEADER(_header, _size, _nsChunk) \ + do { \ + (_header)->size = _size; \ + if (_nsChunk) { \ + MP_SET_FLAG((_header)->flags, MP_MEM_HEADER_FLAG_NS_CHUNK); \ + } \ + } while (0) + #define MP_ADD_TO_CHUNK_LIST(_chunkHead, _chunkTail, _chunkNum, _chunk) \ do { \ if (NULL == _chunkHead) { \ _chunkHead = _chunk; \ _chunkTail = _chunk; \ } else { \ - (_chunkTail)->pNext = _chunk; \ + (_chunkTail)->list.pNext = _chunk; \ } \ (_chunkNum)++; \ } while (0) diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index f5c3abac49..d2b1b83d14 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -45,82 +45,82 @@ int32_t memPoolCheckCfg(SMemPoolCfg* cfg) { return TSDB_CODE_SUCCESS; } -void memPoolFreeChunkGroup(SMPChunkGroup* pGrp) { +void memPoolFreeChunkGroup(SMPCacheGroup* pGrp) { //TODO } -int32_t memPoolAddChunkGroup(SMemPool* pPool, SMPChunkGroupInfo* pInfo, SMPChunkGroup* pTail) { - SMPChunkGroup* pGrp = NULL; - if (NULL == pInfo->pChunkGrpHead) { - pInfo->pChunkGrpHead = taosMemCalloc(1, sizeof(*pInfo->pChunkGrpHead)); - if (NULL == pInfo->pChunkGrpHead) { +int32_t memPoolAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pTail) { + SMPCacheGroup* pGrp = NULL; + if (NULL == pInfo->pGrpHead) { + pInfo->pGrpHead = taosMemCalloc(1, sizeof(*pInfo->pGrpHead)); + if (NULL == pInfo->pGrpHead) { uError("malloc chunkCache failed"); MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - pGrp = pInfo->pChunkGrpHead; + pGrp = pInfo->pGrpHead; } else { - pGrp = (SMPChunkGroup*)taosMemCalloc(1, sizeof(SMPChunkGroup)); + pGrp = (SMPCacheGroup*)taosMemCalloc(1, sizeof(SMPCacheGroup)); } - pGrp->chunkNum = pInfo->chunkGroupNum; - pGrp->pChunks = taosMemoryCalloc(pGrp->chunkNum, pInfo->chunkNodeSize); - if (NULL == pGrp->pChunks) { - uError("calloc %d %d chunks in chunk group failed", pGrp->chunkNum, pInfo->chunkNodeSize); + pGrp->nodesNum = pInfo->groupNum; + pGrp->pNodes = taosMemoryCalloc(pGrp->nodesNum, pInfo->nodeSize); + if (NULL == pGrp->pNodes) { + uError("calloc %d %d nodes in cache group failed", pGrp->nodesNum, pInfo->nodeSize); MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - if (atomic_val_compare_exchange_ptr(&pInfo->pChunkGrpTail, pTail, pGrp) != pTail) { + if (atomic_val_compare_exchange_ptr(&pInfo->pGrpTail, pTail, pGrp) != pTail) { memPoolFreeChunkGroup(pGrp); return TSDB_CODE_SUCCESS; } - atomic_add_fetch_64(&pInfo->allocChunkNum, pGrp->chunkNum); + atomic_add_fetch_64(&pInfo->allocNum, pGrp->nodesNum); return TSDB_CODE_SUCCESS; } -int32_t memPoolGetIdleChunk(SMemPool* pPool, SMPChunkGroupInfo* pInfo, void** ppChunk) { - SMPChunkGroup* pGrp = NULL; - SMPChunk* pChunk = NULL; +int32_t memPoolGetIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes) { + SMPCacheGroup* pGrp = NULL; + SMPListNode* pList = NULL; while (true) { - pChunk = (SMPChunk*)atomic_load_ptr(&pInfo->pIdleChunkList); - if (NULL == pChunk) { + pList = (SMPListNode*)atomic_load_ptr(&pInfo->pIdleList); + if (NULL == pList) { break; } - if (atomic_val_compare_exchange_ptr(&pInfo->pIdleChunkList, pChunk, pChunk->pNext) != pChunk) { + if (atomic_val_compare_exchange_ptr(&pInfo->pIdleList, pList, pList->pNext) != pList) { continue; } - pChunk->pNext = NULL; + pList->pNext = NULL; goto _return; } while (true) { - pGrp = atomic_load_ptr(&pInfo->pChunkGrpTail); + pGrp = atomic_load_ptr(&pInfo->pGrpTail); int32_t offset = atomic_fetch_add_32(&pGrp->idleOffset, 1); - if (offset < pGrp->chunkNum) { - pChunk = (SMPChunk*)((char*)pGrp->pChunks + offset * pInfo->chunkNodeSize); + if (offset < pGrp->nodesNum) { + pList = (SMPListNode*)((char*)pGrp->pNodes + offset * pInfo->nodeSize); break; } else { atomic_sub_fetch_32(&pGrp->idleOffset, 1); } - MP_ERR_RET(memPoolAddChunkGroup(pPool, pInfo, pGrp)); + MP_ERR_RET(memPoolAddCacheGroup(pPool, pInfo, pGrp)); } _return: - *ppChunk = pChunk; + *ppRes = pList; return TSDB_CODE_SUCCESS; } int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) { SMPChunk* pChunk = NULL; - MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->chunkGrpInfo, &pChunk)); + MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, &pChunk)); pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize); if (NULL == pChunk->pMemStart) { @@ -128,13 +128,16 @@ int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) { return TSDB_CODE_OUT_OF_MEMORY; } + pPool->allocChunkNum++; + pPool->allocChunkSize += pPool->cfg.chunkSize; + return TSDB_CODE_SUCCESS; } int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSize) { SMPNSChunk* pChunk = NULL; - MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->NSChunkGrpInfo, &pChunk)); + MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->NSChunkCache, &pChunk)); pChunk->pMemStart = taosMemMalloc(chunkSize); if (NULL == pChunk->pMemStart) { @@ -145,12 +148,15 @@ int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSi pChunk->memBytes = chunkSize; MP_SET_FLAG(pChunk->flags, MP_CHUNK_FLAG_NS_CHUNK); + pPool->allocNSChunkNum++; + pPool->allocNSChunkSize += pPool->cfg.chunkSize; + return TSDB_CODE_SUCCESS; } int32_t memPoolPrepareChunks(SMemPool* pPool, int32_t num) { - SMPChunkGroup* pGrp = NULL; + SMPCacheGroup* pGrp = NULL; SMPChunk* pChunk = NULL; for (int32_t i = 0; i < num; ++i) { MP_ERR_RET(memPoolNewChunk(pPool, &pGrp, &pChunk)); @@ -159,7 +165,7 @@ int32_t memPoolPrepareChunks(SMemPool* pPool, int32_t num) { pPool->readyChunkHead = pChunk; pPool->readyChunkTail = pChunk; } else { - pPool->readyChunkTail->pNext = pChunk; + pPool->readyChunkTail->list.pNext = pChunk; } atomic_add_fetch_32(&pPool->readyChunkNum, 1); @@ -204,14 +210,18 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { pPool->threadChunkReserveNum = 1; pPool->readyChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum); - pPool->chunkGrpInfo.chunkGroupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNKGRP_ALLOC_BATCH_SIZE); - pPool->chunkGrpInfo.chunkNodeSize = sizeof(SMPChunk); - pPool->NSChunkGrpInfo.chunkGroupNum = MP_NSCHUNKGRP_ALLOC_BATCH_SIZE; - pPool->NSChunkGrpInfo.chunkNodeSize = sizeof(SMPNSChunk); + pPool->chunkCache.groupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE); + pPool->chunkCache.nodeSize = sizeof(SMPChunk); + pPool->NSChunkCache.groupNum = MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE; + pPool->NSChunkCache.nodeSize = sizeof(SMPNSChunk); + pPool->sessionCache.groupNum = MP_SESSION_CACHE_ALLOC_BATCH_SIZE; + pPool->sessionCache.nodeSize = sizeof(SMPSession); - MP_ERR_RET(memPoolAddChunkGroup(pPool, &pPool->chunkGrpInfo, NULL)); + MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->chunkCache, NULL)); + MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->NSChunkCache, NULL)); + MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->sessionCache, NULL)); - MP_ERR_RET(memPoolGetIdleChunk(pPool, &pPool->chunkGrpInfo, &pPool->readyChunkHead)); + MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, &pPool->readyChunkHead)); pPool->readyChunkTail = pPool->readyChunkHead; MP_ERR_RET(memPoolEnsureChunks(pPool)); @@ -224,7 +234,7 @@ void memPoolNotifyLowChunkNum(SMemPool* pPool) { } int32_t memPoolGetChunk(SMemPool* pPool, SMPChunk** ppChunk) { - SMPChunkGroup* pCache = NULL; + SMPCacheGroup* pCache = NULL; SMPChunk* pChunk = NULL; int32_t readyChunkNum = atomic_sub_fetch_32(&pPool->readyChunkNum, 1); if (readyChunkNum >= 0) { @@ -232,9 +242,9 @@ int32_t memPoolGetChunk(SMemPool* pPool, SMPChunk** ppChunk) { memPoolNotifyLowChunkNum(pPool); } - pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->pNext); - while (atomic_val_compare_exchange_ptr(&pPool->readyChunkHead->pNext, pChunk, pChunk->pNext) != pChunk) { - pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->pNext); + pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->list.pNext); + while (atomic_val_compare_exchange_ptr(&pPool->readyChunkHead->list.pNext, pChunk, pChunk->list.pNext) != pChunk) { + pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->list.pNext); } *ppChunk = pChunk; @@ -254,7 +264,7 @@ int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_ } *ppPreChunk = pChunk; - pChunk = (SMPChunk*)pChunk->pNext; + pChunk = (SMPChunk*)pChunk->list.pNext; } if (NULL == *ppChunk) { @@ -267,8 +277,10 @@ int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_ void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) { SMPChunk* pChunk = NULL, *preSrcChunk = NULL; void* pRes = NULL; + int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer); + if (pSession->srcChunkNum > 0) { - MP_ERR_JRET(memPoolGetChunkFromSession(pPool, pSession, size, &pChunk, &preSrcChunk)); + MP_ERR_JRET(memPoolGetChunkFromSession(pPool, pSession, totalSize, &pChunk, &preSrcChunk)); } if (NULL == pChunk) { @@ -276,21 +288,24 @@ void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) pSession->allocChunkNum++; pSession->allocChunkMemSize += pPool->cfg.chunkSize; - pSession->allocMemSize += size; + pSession->allocMemSize += totalSize; MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk); MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk); } - pRes = pChunk->pMemStart + pChunk->offset; - pChunk->offset += size; + SMPMemHeader* pHeader = (SMPMemHeader*)(pChunk->pMemStart + pChunk->offset); + MP_INIT_MEM_HEADER(pHeader, size, false); + + pRes = (void*)(pHeader + 1); + pChunk->offset += totalSize; if (pChunk->offset >= (pPool->cfg.chunkSize - pPool->maxDiscardSize)) { if (NULL == preSrcChunk) { pSession->srcChunkHead = NULL; pSession->srcChunkTail = NULL; } else { - preSrcChunk->pNext = pChunk->pNext; + preSrcChunk->list.pNext = pChunk->list.pNext; } pSession->srcChunkNum--; @@ -304,22 +319,77 @@ _return: void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) { SMPNSChunk* pChunk = NULL; - MP_ERR_JRET(memPoolNewNSChunk(pPool, &pChunk, size)); + void* pRes = NULL; + int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer); + + MP_ERR_JRET(memPoolNewNSChunk(pPool, &pChunk, totalSize)); + SMPMemHeader* pHeader = (SMPMemHeader*)pChunk->pMemStart; + MP_INIT_MEM_HEADER(pHeader, size, false); + pRes = (void*)(pHeader + 1); + pSession->allocChunkNum++; - pSession->allocChunkMemSize += size; - pSession->allocMemSize += size; + pSession->allocChunkMemSize += totalSize; + pSession->allocMemSize += totalSize; if (NULL == pSession->inUseNSChunkHead) { pSession->inUseNSChunkHead = pChunk; pSession->inUseNSChunkTail = pChunk; } else { - pSession->inUseNSChunkTail->pNext = pChunk; + pSession->inUseNSChunkTail->list.pNext = pChunk; } _return: - return pChunk ? pChunk->pMemStart : NULL; + return pRes; +} + +void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, char* fileName, int32_t lineNo) { + int32_t code = TSDB_CODE_SUCCESS; + void *res = NULL; + + res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); + + if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { + //TODO + } + +_return: + + return res; +} + +void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size, char* fileName, int32_t lineNo) { + int32_t code = TSDB_CODE_SUCCESS; + int64_t totalSize = num * size; + void *res = memPoolMallocImpl(pPool, pSession, totalSize, fileName, lineNo); + + if (NULL != res) { + memset(res, 0, totalSize); + } + + if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { + //TODO + } + +_return: + + return res; +} + + +void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fileName, int32_t lineNo) { + //TODO + + if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { + //TODO + } +} + +int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fileName, int32_t lineNo) { + SMPMemHeader* pHeader = (char)ptr - sizeof(SMPMemHeader); + + return pHeader->size; } @@ -330,7 +400,7 @@ void taosMemPoolModInit(void) { gMPoolList = taosArrayInit(10, POINTER_BYTES); } -int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle) { +int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = NULL; @@ -346,7 +416,7 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg cfg, void** poolHandle) { MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - MP_ERR_JRET(memPoolInit(pPool, poolName, &cfg)); + MP_ERR_JRET(memPoolInit(pPool, poolName, cfg)); taosThreadMutexLock(&gMPoolMutex); @@ -359,8 +429,11 @@ _return: if (TSDB_CODE_SUCCESS != code) { taosMemPoolClose(pPool); + pPool = NULL; } + *poolHandle = pPool; + return code; } @@ -372,11 +445,9 @@ void taosMemPoolDestroySession(void* session) { int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) { int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = (SMemPool*)poolHandle; - SMPSession* pSession = (SMPSession*)taosMemCalloc(1, sizeof(SMPSession)); - if (NULL == pSession) { - uError("calloc memory pool session failed"); - MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } + SMPSession* pSession = NULL; + + MP_ERR_JRET(memPoolGetIdleNode(pPool, &pPool->sessionCache, &pSession)); MP_ERR_JRET(memPoolGetChunk(pPool, &pSession->srcChunkHead)); @@ -400,7 +471,7 @@ _return: void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { - void *res = NULL; + int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNC__, poolHandle, session, fileName, size); @@ -409,18 +480,15 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* f SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); - - if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { - //TODO - } + return memPoolMallocImpl(pPool, pSession, size, fileName, lineNo); _return: - return res; + return NULL; } void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) { + int32_t code = TSDB_CODE_SUCCESS; void *res = NULL; if (NULL == poolHandle || NULL == session || NULL == fileName || num < 0 || size < 0) { @@ -431,9 +499,11 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); + res = memPoolMallocImpl(pPool, pSession, num * size, fileName, lineNo); - memset(res, 0, num * size); + if (NULL != res) { + memset(res, 0, num * size); + } if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { //TODO @@ -445,26 +515,129 @@ _return: } void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) { + int32_t code = TSDB_CODE_SUCCESS; + void *res = NULL; + + if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { + uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, + __FUNC__, poolHandle, session, fileName, size); + MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + } + SMemPool* pPool = (SMemPool*)poolHandle; + SMPSession* pSession = (SMPSession*)session; + if (NULL == ptr) { + res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); + } else if (0 == size) { + memPoolFreeImpl(pPool, pSession, ptr, fileName, lineNo); + } else { + int64_t origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo); + if (origSize >= size) { + SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); + pHeader->size = size; + } else { + res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); + SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); + SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader)); + + memcpy(res, ptr, origSize); + memset((char*)res + origSize, 0, size - origSize); + } + } + + if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { + //TODO + } + +_return: + + return res; } char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) { + int32_t code = TSDB_CODE_SUCCESS; + void *res = NULL; + + if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) { + uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p", + __FUNC__, poolHandle, session, fileName, ptr); + MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + } + SMemPool* pPool = (SMemPool*)poolHandle; + SMPSession* pSession = (SMPSession*)session; + int64_t size = strlen(ptr) + 1; + res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); + if (NULL != res) { + strcpy(res, ptr); + } + + if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { + //TODO + } + +_return: + + return res; } -void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { +void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { + int32_t code = TSDB_CODE_SUCCESS; + if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) { + uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p", + __FUNC__, poolHandle, session, fileName, ptr); + MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + } + SMemPool* pPool = (SMemPool*)poolHandle; + SMPSession* pSession = (SMPSession*)session; + memPoolFreeImpl(pPool, pSession, ptr, fileName, lineNo); + +_return: + + return; } -int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, int64_t* size, char* fileName, int32_t lineNo) { +int32_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { + int32_t code = TSDB_CODE_SUCCESS; + if (NULL == poolHandle || NULL == session || NULL == fileName) { + uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%p", + __FUNC__, poolHandle, session, fileName, size); + MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + } + if (NULL == ptr) { + return 0; + } + + SMemPool* pPool = (SMemPool*)poolHandle; + SMPSession* pSession = (SMPSession*)session; + MP_RET(memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo)); + +_return: + + return -1; } -void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { +void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { + int32_t code = TSDB_CODE_SUCCESS; + + if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) { + uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64, + __FUNC__, poolHandle, session, fileName, alignment, size); + MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + } + SMemPool* pPool = (SMemPool*)poolHandle; + SMPSession* pSession = (SMPSession*)session; + return memPoolMallocImpl(pPool, pSession, size, fileName, lineNo); + +_return: + + return NULL; } -void taosMemPoolClose(void* poolHandle) { +void taosMemPoolClose(void* poolHandle) { }