From d34112983f61b35a48dc1da913a7b165f4f60396 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 4 Jul 2024 19:28:30 +0800 Subject: [PATCH] enh: add query memory management --- include/os/osMemPool.h | 3 + source/client/src/clientHb.c | 7 +- source/common/src/tglobal.c | 1 + source/libs/qworker/inc/qwInt.h | 15 ++- source/libs/qworker/src/qwMem.c | 96 +++++++++++++-- source/libs/qworker/src/qwUtil.c | 5 + source/libs/qworker/src/qworker.c | 9 +- source/util/inc/tmempoolInt.h | 31 ++++- source/util/src/tmempool.c | 186 ++++++++++++++++++++++++------ 9 files changed, 294 insertions(+), 59 deletions(-) diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 2af0d46c19..b75f908020 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -42,16 +42,19 @@ typedef enum MemPoolUsageLevel { typedef void (*decConcSessionNum)(void); typedef void (*incConcSessionNum)(void); typedef void (*setConcSessionNum)(int32_t); +typedef void (*retireCollection)(uint64_t, int64_t); typedef struct SMemPoolCallBack { decConcSessionNum decSessFp; incConcSessionNum incSessFp; setConcSessionNum setSessFp; + retireCollection retireFp; } SMemPoolCallBack; typedef struct SMemPoolCfg { int64_t maxSize; int64_t sessionExpectSize; + int64_t *collectionQuota; int32_t chunkSize; int32_t threadNum; int8_t usageLevel[E_MEM_USAGE_MAX_VALUE]; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 01ebdec173..684a492c67 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -546,9 +546,6 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { } SAppInstInfo *pInst = pAppHbMgr->pAppInstInfo; - pInst->monitorParas = pRsp.monitorParas; - tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", - pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); if (code != 0) { pInst->onlineDnodes = pInst->totalDnodes ? 0 : -1; @@ -560,6 +557,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { return -1; } + pInst->monitorParas = pRsp.monitorParas; + tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", + pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); + if (rspNum) { tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0)); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 45a1a64393..9f25dcb450 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -60,6 +60,7 @@ int8_t tsQueryUseMemoryPool = 1; int32_t tsQueryMinConcurrentTaskNum = 1; int32_t tsQueryMaxConcurrentTaskNum = 0; int32_t tsQueryConcurrentTaskNum = 0; +int64_t tsSingleQueryMaxMemorySize = 0; int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcSessions = 30000; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index b28254d91d..f2fd55b67b 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -231,11 +231,16 @@ typedef struct SQWorkerMgmt { int32_t paramIdx; } SQWorkerMgmt; +typedef struct SQWQueryInfo { + void* pCollection; + SHashObj* pSessions; +} SQWQueryInfo; + typedef struct SQueryMgmt { - SRWLatch taskMgmtLock; - int32_t concTaskLevel; - - void* memPoolHandle; + SRWLatch taskMgmtLock; + int32_t concTaskLevel; + SHashObj* pQueryInfo; + void* memPoolHandle; } SQueryMgmt; #define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST) @@ -446,7 +451,7 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); void qwDbgSimulateSleep(void); void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); -void qwInitQueryPool(void); +int32_t qwInitQueryPool(void); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index 932fb7b1fc..e39c7ec459 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -4,9 +4,9 @@ int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576 * 1048576, QW_MIN_RESERVE_MEM_SIZE); int64_t availSize = (totalSize - reserveSize) / 1048576 * 1048576; - if (availSize < QW_MIN_MEM_POOL_SIZE) { - return -1; - } + //if (availSize < QW_MIN_MEM_POOL_SIZE) { + // return -1; + //} *maxSize = availSize; @@ -34,42 +34,118 @@ void qwSetConcurrentTaskNum(int32_t taskNum) { void qwDecConcurrentTaskNum(void) { int32_t concTaskLevel = atomic_load_32(&gQueryMgmt.concTaskLevel); - if (concTaskLevel < QW_CONC_TASK_LEVEL_LOW) { + if (concTaskLevel <= QW_CONC_TASK_LEVEL_LOW) { qError("Unable to decrease concurrent task num, current task level:%d", concTaskLevel); return; } - + //TODO } -void qwInitQueryPool(void) { +void qwIncConcurrentTaskNum(void) { + int32_t concTaskLevel = atomic_load_32(&gQueryMgmt.concTaskLevel); + if (concTaskLevel >= QW_CONC_TASK_LEVEL_FULL) { + qError("Unable to increase concurrent task num, current task level:%d", concTaskLevel); + return; + } + + //TODO +} + +int32_t qwInitQueryInfo(uint64_t qId, SQWQueryInfo* pQuery) { + pQuery->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pQuery->pSessions) { + qError("fail to init session hash"); + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = taosMemPoolCallocCollection(qId, &pQuery->pCollection); + if (TSDB_CODE_SUCCESS != code) { + taosHashCleanup(pQuery->pSessions); + return code; + } + + return code; +} + +int32_t qwInitSession(uint64_t qId, void** ppSession) { + int32_t code = TSDB_CODE_SUCCESS; + SQWQueryInfo* pQuery = NULL; + + while (true) { + pQuery = (SQWQueryInfo*)taosHashGet(gQueryMgmt.pQueryInfo, &qId, sizeof(qId)); + if (NULL == pQuery) { + SQWQueryInfo queryInfo = {0}; + code = qwInitQueryInfo(qId, &queryInfo); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + code = taosHashPut(gQueryMgmt.pQueryInfo, &qId, sizeof(qId), &queryInfo, sizeof(queryInfo)); + if (TSDB_CODE_SUCCESS != code) { + qwDestroyQueryInfo(&queryInfo); + if (-2 == code) { + code = TSDB_CODE_SUCCESS; + continue; + } + + return TSDB_CODE_OUT_OF_MEMORY; + } + + pQuery = (SQWQueryInfo*)taosHashGet(gQueryMgmt.pQueryInfo, &qId, sizeof(qId)); + } + + code = taosHashPut(pQuery->pSessions, ppSession, POINTER_BYTES, NULL, 0); + if (TSDB_CODE_SUCCESS != code) { + qError("fail to put session into query session hash, errno:%d", terrno); + return terrno; + } + + break; + } + + QW_ERR_RET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, qId, ppSession, pQuery->pCollection)); + + return code; +} + +int32_t qwInitQueryPool(void) { int64_t memSize = 0; int32_t code = taosGetSysAvailMemory(&memSize); if (TSDB_CODE_SUCCESS != code) { - return; + return TAOS_SYSTEM_ERROR(errno); } SMemPoolCfg cfg = {0}; code = qwGetMemPoolMaxMemSize(memSize, &cfg.maxSize); if (TSDB_CODE_SUCCESS != code) { - return; + return code; } cfg.threadNum = 10; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO + cfg.collectionQuota = &tsSingleQueryMaxMemorySize; cfg.cb.setSessFp = qwSetConcurrentTaskNum; cfg.cb.decSessFp = qwDecConcurrentTaskNum; cfg.cb.incSessFp = qwIncConcurrentTaskNum; code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize); if (TSDB_CODE_SUCCESS != code) { - return; + return code; } code = taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryMgmt.memPoolHandle); if (TSDB_CODE_SUCCESS != code) { - return; + return code; } + + gQueryMgmt.pQueryInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (NULL == gQueryMgmt.pQueryInfo) { + qError("init query hash failed"); + return TSDB_CODE_OUT_OF_MEMORY; + } + + return code; } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index da69bcf95d..938cf372c8 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -672,3 +672,8 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { qwReleaseScheduler(QW_WRITE, mgmt); } } + +void qwDestroyQueryInfo(SQWQueryInfo* pQuery) { + //TODO +} + diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a7a2e7f6aa..d30b32d370 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -734,7 +734,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ctx->phase = -1; if (NULL != gQueryMgmt.memPoolHandle) { - QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, &ctx->memPoolSession)); + QW_ERR_JRET(qwInitSession(qId, &ctx->memPoolSession)); } QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); @@ -1297,13 +1297,16 @@ _return: } int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const SMsgCb *pMsgCb) { + int32_t code = TSDB_CODE_SUCCESS; if (NULL == qWorkerMgmt || (pMsgCb && pMsgCb->mgmt == NULL)) { qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } - taosThreadOnce(&gQueryPoolInit, qwInitQueryPool); - + if (NULL == gQueryMgmt.memPoolHandle) { + QW_ERR_RET(qwInitQueryPool()); + } + int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1); if (1 == qwNum) { memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param)); diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index d55fc819fd..f34d2d28b0 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -31,6 +31,9 @@ extern "C" { #define MP_MAX_KEEP_FREE_CHUNK_NUM 1000 #define MP_MAX_MALLOC_MEM_SIZE 0xFFFFFFFFFF +#define MP_RETIRE_THRESHOLD_PERCENT (0.9) +#define MP_RETIRE_UNIT_PERCENT (0.1) + // FLAGS AREA #define MP_CHUNK_FLAG_IN_USE (1 << 0) @@ -186,11 +189,16 @@ typedef struct SMPStatInfo { typedef struct SMPSession { SMPListNode list; + + int64_t sessionId; + SMPCollection* pCollection; + bool needRetire; SMPCtrlInfo ctrlInfo; int64_t allocChunkNum; int64_t allocChunkMemSize; int64_t allocMemSize; + int64_t maxAllocMemSize; int64_t reUseChunkNum; int32_t srcChunkNum; @@ -222,12 +230,20 @@ typedef struct SMPCacheGroupInfo { void *pIdleList; } SMPCacheGroupInfo; +typedef struct SMPCollection { + int64_t collectionId; + int64_t allocMemSize; + int64_t maxAllocMemSize; + SMPStatInfo stat; +} SMPCollection; typedef struct SMemPool { char *name; int16_t slotId; SMemPoolCfg cfg; + int64_t memRetireThreshold; + int64_t memRetireUnit; int32_t maxChunkNum; SMPCtrlInfo ctrlInfo; @@ -238,6 +254,7 @@ typedef struct SMemPool { int64_t allocNSChunkNum; int64_t allocNSChunkSize; int64_t allocMemSize; + int64_t maxAllocMemSize; SMPCacheGroupInfo chunkCache; SMPCacheGroupInfo NSChunkCache; @@ -258,11 +275,17 @@ typedef struct SMemPool { SMPStatInfo stat; } SMemPool; +typedef enum EMPMemStrategy { + E_MP_STRATEGY_DIRECT = 1, + E_MP_STRATEGY_CHUNK, +} EMPMemStrategy; + typedef struct SMemPoolMgmt { - SArray* poolList; - TdThreadMutex poolMutex; - TdThread poolMgmtThread; - int32_t code; + EMPMemStrategy strategy; + SArray* poolList; + TdThreadMutex poolMutex; + TdThread poolMgmtThread; + int32_t code; } SMemPoolMgmt; #define MP_GET_FLAG(st, f) ((st) & (f)) diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 04e48e86dc..275a367d07 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -19,9 +19,7 @@ #include "tlog.h" #include "tutil.h" -static SArray* gMPoolList = NULL; static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT; -static TdThreadMutex gMPoolMutex; threadlocal void* threadPoolHandle = NULL; threadlocal void* threadPoolSession = NULL; SMemPoolMgmt gMPMgmt; @@ -64,7 +62,7 @@ int32_t memPoolAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCache } pGrp->nodesNum = pInfo->groupNum; - pGrp->pNodes = taosMemoryCalloc(pGrp->nodesNum, pInfo->nodeSize); + pGrp->pNodes = taosMemCalloc(pGrp->nodesNum, pInfo->nodeSize); if (NULL == pGrp->pNodes) { uError("calloc %d %d nodes in cache group failed", pGrp->nodesNum, pInfo->nodeSize); MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -124,7 +122,7 @@ int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) { pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize); if (NULL == pChunk->pMemStart) { - uError("add new chunk, memory malloc %d failed", pPool->cfg.chunkSize); + uError("add new chunk, memory malloc %d failed since %s", pPool->cfg.chunkSize, strerror(errno)); return TSDB_CODE_OUT_OF_MEMORY; } @@ -143,7 +141,7 @@ int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSi pChunk->pMemStart = taosMemMalloc(chunkSize); if (NULL == pChunk->pMemStart) { - uError("add new chunk, memory malloc %" PRId64 " failed", chunkSize); + uError("add new NS chunk, memory malloc %" PRId64 " failed since %s", chunkSize, strerror(errno)); return TSDB_CODE_OUT_OF_MEMORY; } @@ -200,6 +198,8 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + pPool->memRetireThreshold = pPool->cfg.maxSize * MP_RETIRE_THRESHOLD_PERCENT; + pPool->memRetireUnit = pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT; pPool->maxChunkNum = cfg->maxSize / cfg->chunkSize; if (pPool->maxChunkNum <= 0) { uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", cfg->maxSize, cfg->chunkSize); @@ -293,7 +293,7 @@ void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, pSession->allocChunkNum++; pSession->allocChunkMemSize += pPool->cfg.chunkSize; - pSession->allocMemSize += totalSize; + memPoolAddSessionAllocMemSize(pSession, totalSize); MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk); MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk); @@ -336,7 +336,7 @@ void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t siz pSession->allocChunkNum++; pSession->allocChunkMemSize += totalSize; - pSession->allocMemSize += totalSize; + memPoolAddSessionAllocMemSize(pSession, totalSize); if (NULL == pSession->inUseNSChunkHead) { pSession->inUseNSChunkHead = pChunk; @@ -350,17 +350,86 @@ _return: return pRes; } -int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) { - SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1; +void memPoolUpdateMaxAllocMemSize(int64_t* pMaxAllocMemSize, int64_t newSize) { + int64_t maxAllocMemSize = atomic_load_64(pMaxAllocMemSize); + while (true) { + if (newSize <= maxAllocMemSize) { + break; + } + + if (maxAllocMemSize == atomic_val_compare_exchange_64(pMaxAllocMemSize, maxAllocMemSize, newSize)) { + break; + } - return pHeader->size; + maxAllocMemSize = atomic_load_64(pMaxAllocMemSize); + } +} + +void memPoolUpdateAllocMemSize(SMemPool* pPool, SMPSession* pSession, int64_t size) { + int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size); + memPoolUpdateMaxAllocMemSize(&pSession->maxAllocMemSize, allocMemSize); + + allocMemSize = atomic_add_fetch_64(&pSession->pCollection->allocMemSize, size); + memPoolUpdateMaxAllocMemSize(&pSession->pCollection->maxAllocMemSize, allocMemSize); + + allocMemSize = atomic_add_fetch_64(&pPool->allocMemSize, size); + memPoolUpdateMaxAllocMemSize(&pPool->maxAllocMemSize, allocMemSize); +} + +bool memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) { + if (pPool->cfg.collectionQuota > 0 && (atomic_load_64(&pSession->pCollection->allocMemSize) + size) > pPool->cfg.collectionQuota) { + return true; + } + if ((atomic_load_64(&pPool->allocMemSize) + size) >= pPool->memRetireThreshold) { + return (*pPool->cfg.cb.retireFp)(pSession->pCollection->collectionId, pPool->memRetireUnit); + } + + return false; +} + +void* memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { + if (memPoolMemQuotaOverflow(pPool, pSession, size)) { + uInfo("session needs to retire memory"); + return NULL; + } + + void* res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size); + if (NULL != res) { + memPoolUpdateAllocMemSize(pPool, pSession, size); + } + + return res; +} + +int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) { + switch (gMPMgmt.strategy) { + case E_MP_STRATEGY_DIRECT: + return taosMemSize(ptr); + case E_MP_STRATEGY_CHUNK: { + SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1; + return pHeader->size; + } + default: + break; + } + + return 0; } void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { int32_t code = TSDB_CODE_SUCCESS; void *res = NULL; - - res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size, alignment) : memPoolAllocFromChunk(pPool, pSession, size, alignment); + + switch (gMPMgmt.strategy) { + case E_MP_STRATEGY_DIRECT: + res = memPoolAllocDirect(pPool, pSession, size, alignment); + break; + case E_MP_STRATEGY_CHUNK: + res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size, alignment) : memPoolAllocFromChunk(pPool, pSession, size, alignment); + break; + default: + break; + } _return: @@ -391,8 +460,25 @@ void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* return; } - if (origSize) { - *origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr); + switch (gMPMgmt.strategy) { + case E_MP_STRATEGY_DIRECT: { + int64_t oSize = taosMemSize(ptr); + if (origSize) { + *origSize = oSize; + } + taosMemFree(ptr); + + atomic_sub_fetch_64(&pSession->allocMemSize, oSize); + atomic_sub_fetch_64(&pPool->allocMemSize, oSize); + break; + } + case E_MP_STRATEGY_CHUNK: + if (origSize) { + *origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr); + } + break; + default: + break; } return; @@ -413,21 +499,36 @@ void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64 } *origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr); - - if (*origSize >= size) { - SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); - pHeader->size = size; - return ptr; + + switch (gMPMgmt.strategy) { + case E_MP_STRATEGY_DIRECT: { + res = taosMemRealloc(ptr, size); + if (NULL != res) { + memPoolUpdateAllocMemSize(pPool, pSession, size - *origSize); + } + return res; + } + case E_MP_STRATEGY_CHUNK: { + if (*origSize >= size) { + SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); + pHeader->size = size; + return ptr; + } + + res = memPoolMallocImpl(pPool, pSession, size, 0); + SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); + SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader)); + + memcpy(res, ptr, *origSize); + memPoolFreeImpl(pPool, pSession, ptr, NULL); + + return res; + } + default: + break; } - res = memPoolMallocImpl(pPool, pSession, size, 0); - SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); - SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader)); - - memcpy(res, ptr, *origSize); - memPoolFreeImpl(pPool, pSession, ptr, NULL); - - return res; + return NULL; } void memPoolPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName) { @@ -637,7 +738,8 @@ void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, } void* memPoolMgmtThreadFunc(void* param) { - + //TODO + return NULL; } void taosMemPoolModInit(void) { @@ -648,6 +750,8 @@ void taosMemPoolModInit(void) { gMPMgmt.code = TSDB_CODE_OUT_OF_MEMORY; return; } + + gMPMgmt.strategy = E_MP_STRATEGY_DIRECT; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); @@ -666,7 +770,7 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { SMemPool* pPool = NULL; taosThreadOnce(&gMPoolInit, taosMemPoolModInit); - if (NULL == gMPoolList) { + if (NULL == gMPMgmt.poolList) { uError("init memory pool failed"); MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } @@ -679,12 +783,12 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { MP_ERR_JRET(memPoolInit(pPool, poolName, cfg)); - taosThreadMutexLock(&gMPoolMutex); + taosThreadMutexLock(&gMPMgmt.poolMutex); - taosArrayPush(gMPoolList, &pPool); - pPool->slotId = taosArrayGetSize(gMPoolList) - 1; + taosArrayPush(gMPMgmt.poolList, &pPool); + pPool->slotId = taosArrayGetSize(gMPMgmt.poolList) - 1; - taosThreadMutexUnlock(&gMPoolMutex); + taosThreadMutexUnlock(&gMPMgmt.poolMutex); _return: @@ -708,7 +812,7 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) { atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1); } -int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) { +int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pCollection) { int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = NULL; @@ -726,6 +830,8 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) { MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk); MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk); + pSession->pCollection = (SMPCollection*)pCollection; + _return: if (TSDB_CODE_SUCCESS != code) { @@ -933,5 +1039,17 @@ void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil } +int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection) { + *ppCollection = taosMemCalloc(1, sizeof(SMPCollection)) + if (NULL == *ppCollection) { + uError("calloc collection failed"); + return TSDB_CODE_OUT_OF_MEMORY; + } + + SMPCollection* pCollection = (SMPCollection*)*ppCollection; + pCollection->collectionId = collectionId; + + return TSDB_CODE_SUCCESS; +}