diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index a126f01481..8771f29d0c 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -31,27 +31,26 @@ typedef enum MemPoolEvictPolicy { E_EVICT_MAX_VALUE, // no used } MemPoolEvictPolicy; -typedef enum MemPoolUsageLevel { - E_MEM_USAGE_LOW = 0, - E_MEM_USAGE_MIDDLE, - E_MEM_USAGE_HIGH, - E_MEM_USAGE_EXTRAME, - E_MEM_USAGE_MAX_VALUE -} MemPoolUsageLevel; +typedef struct SMemPoolJob { + uint64_t jobId; + int64_t allocMemSize; + int64_t maxAllocMemSize; +} SMemPoolJob; + typedef void (*mpDecConcSessionNum)(void); typedef void (*mpIncConcSessionNum)(void); typedef void (*mpSetConcSessionNum)(int32_t); -typedef void (*mpRetireCollections)(int64_t, bool); -typedef void (*mpRetireCollection)(uint64_t); +typedef void (*mpRetireJobs)(int64_t, bool, int32_t); +typedef void (*mpRetireJob)(SMemPoolJob*, int32_t); typedef void (*mpCfgUpdate)(void*, void*); typedef struct SMemPoolCallBack { mpDecConcSessionNum decSessFp; mpIncConcSessionNum incSessFp; mpSetConcSessionNum setSessFp; - mpRetireCollections retiresFp; - mpRetireCollection retireFp; + mpRetireJobs retireJobsFp; + mpRetireJob retireJobFp; mpCfgUpdate cfgUpdateFp; } SMemPoolCallBack; @@ -59,24 +58,13 @@ typedef struct SMemPoolCallBack { typedef struct SMemPoolCfg { bool autoMaxSize; int64_t maxSize; - int64_t sessionExpectSize; - int64_t collectionQuota; + int64_t jobQuota; int32_t chunkSize; int32_t threadNum; - int8_t usageLevel[E_MEM_USAGE_MAX_VALUE]; MemPoolEvictPolicy evicPolicy; SMemPoolCallBack cb; } SMemPoolCfg; -typedef struct SMemPoolCollection { - uint64_t collectionId; - int64_t allocMemSize; - int64_t maxAllocMemSize; -} SMemPoolCollection; - - - -void taosMemPoolModInit(void); int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle); void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo); void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo); @@ -89,9 +77,9 @@ void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignme void taosMemPoolClose(void* poolHandle); void taosMemPoolModDestroy(void); void taosAutoMemoryFree(void *ptr); -int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pCollection); +int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob); void taosMemPoolDestroySession(void* poolHandle, void* session); -int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection); +int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob); void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg); #define taosMemPoolFreeClear(ptr) \ diff --git a/include/util/theap.h b/include/util/theap.h index b795db6aea..abef2ab391 100644 --- a/include/util/theap.h +++ b/include/util/theap.h @@ -87,6 +87,8 @@ BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete delete void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn); +void taosBQClear(BoundedQueue* q); + void destroyBoundedQueue(BoundedQueue* q); /* diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 2a4e7e51af..78e21d584f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -672,7 +672,7 @@ static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STime if (pOperatorInfo->limit == 0) return true; if (pOperatorInfo->pBQ == NULL) { - pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, NULL, pOperatorInfo); + pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo); } bool shouldFilter = false; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 1a6056b344..3e22839db9 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -44,6 +44,7 @@ extern "C" { #define QW_DEFAULT_RESERVE_MEM_PERCENT 20 #define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576UL) #define QW_MIN_MEM_POOL_SIZE (1048576UL) +#define QW_MAX_RETIRE_JOB_NUM 10000 #define QW_DEFAULT_THREAD_TASK_NUM 3 @@ -129,6 +130,15 @@ typedef struct SQWTaskStatus { int8_t status; } SQWTaskStatus; + +typedef struct SQWJobInfo { + int8_t retired; + int32_t errCode; + SMemPoolJob* memInfo; + SHashObj* pSessions; +} SQWJobInfo; + + typedef struct SQWTaskCtx { SRWLatch lock; int8_t phase; @@ -166,6 +176,7 @@ typedef struct SQWTaskCtx { SArray *tbInfo; // STbVerInfo void *memPoolSession; + SQWJobInfo *pJobInfo; } SQWTaskCtx; typedef struct SQWSchStatus { @@ -232,31 +243,15 @@ typedef struct SQWorkerMgmt { int32_t paramIdx; } SQWorkerMgmt; -typedef struct SQWQueryInfo { - int8_t retired; - SMemPoolCollection* pCollection; - SHashObj* pSessions; -} SQWQueryInfo; - -typedef struct SQWRetireLowCtx { - int8_t retireBegin; -} SQWRetireLowCtx; - -typedef struct SQWRetireMidCtx { - int8_t retireBegin; - TdThreadCond retired; - BoundedQueue* collectionQueue; -} SQWRetireMidCtx; typedef struct SQWRetireCtx { - SQWRetireLowCtx lowCtx; - SQWRetireMidCtx midCtx; + BoundedQueue* pJobQueue; } SQWRetireCtx; typedef struct SQueryMgmt { SRWLatch taskMgmtLock; int32_t concTaskLevel; - SHashObj* pQueryInfo; + SHashObj* pJobInfo; void* memPoolHandle; SQWRetireCtx retireCtx; } SQueryMgmt; @@ -482,8 +477,10 @@ void qwDbgSimulateSleep(void); void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwInitQueryPool(void); -void qwDestroyQueryInfo(SQWQueryInfo* pQuery); -int32_t qwInitSession(QW_FPARAMS_DEF, void** ppSession); +void qwDestroyJobInfo(SQWJobInfo* pJob); +void qwRetireJob(SQWJobInfo* pJob); + +int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index b8c466dbca..4673ccadd2 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -15,6 +15,8 @@ int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { } int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chunkSize) { + //TODO + *chunkSize = 2 * 1048576; return TSDB_CODE_SUCCESS; @@ -53,101 +55,159 @@ void qwIncConcurrentTaskNumCb(void) { //TODO } -int32_t qwInitQueryInfo(uint64_t qId, SQWQueryInfo* pQuery) { - pQuery->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (NULL == pQuery->pSessions) { - qError("fail to init session hash"); - return TSDB_CODE_OUT_OF_MEMORY; +int32_t qwInitJobInfo(uint64_t qId, SQWJobInfo* pJob) { + pJob->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pJob->pSessions) { + qError("fail to init session hash, code: 0x%x", terrno); + return terrno; } - int32_t code = taosMemPoolCallocCollection(qId, (void**)&pQuery->pCollection); + int32_t code = taosMemPoolCallocJob(qId, (void**)&pJob->memInfo); if (TSDB_CODE_SUCCESS != code) { - taosHashCleanup(pQuery->pSessions); + taosHashCleanup(pJob->pSessions); + pJob->pSessions = NULL; return code; } return code; } -int32_t qwInitSession(QW_FPARAMS_DEF, void** ppSession) { +int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) { int32_t code = TSDB_CODE_SUCCESS; - SQWQueryInfo* pQuery = NULL; + SQWJobInfo* pJob = NULL; while (true) { - pQuery = (SQWQueryInfo*)taosHashGet(gQueryMgmt.pQueryInfo, &qId, sizeof(qId)); - if (NULL == pQuery) { - SQWQueryInfo queryInfo = {0}; - code = qwInitQueryInfo(qId, &queryInfo); + pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, &qId, sizeof(qId)); + if (NULL == pJob) { + SQWJobInfo jobInfo = {0}; + code = qwInitJobInfo(qId, &jobInfo); if (TSDB_CODE_SUCCESS != code) { return code; } - code = taosHashPut(gQueryMgmt.pQueryInfo, &qId, sizeof(qId), &queryInfo, sizeof(queryInfo)); + code = taosHashPut(gQueryMgmt.pJobInfo, &qId, sizeof(qId), &jobInfo, sizeof(jobInfo)); if (TSDB_CODE_SUCCESS != code) { - qwDestroyQueryInfo(&queryInfo); - if (-2 == code) { + qwDestroyJobInfo(&jobInfo); + if (TSDB_CODE_DUP_KEY == code) { code = TSDB_CODE_SUCCESS; continue; } - return TSDB_CODE_OUT_OF_MEMORY; + return code; } - pQuery = (SQWQueryInfo*)taosHashGet(gQueryMgmt.pQueryInfo, &qId, sizeof(qId)); + pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, &qId, sizeof(qId)); + if (NULL == pJob) { + qError("QID:0x%" PRIx64 " not in joj hash, may be dropped", qId); + return TSDB_CODE_QRY_JOB_NOT_EXIST; + } } break; } - QW_ERR_RET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pQuery->pCollection)); + ctx->pJobInfo = pJob; + + QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pJob->memInfo)); char id[sizeof(tId) + sizeof(eId)] = {0}; QW_SET_TEID(id, tId, eId); - code = taosHashPut(pQuery->pSessions, id, sizeof(id), ppSession, POINTER_BYTES); + code = taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES); if (TSDB_CODE_SUCCESS != code) { - qError("fail to put session into query session hash, errno:%d", terrno); - return terrno; + qError("fail to put session into query session hash, code: 0x%x", code); + QW_ERR_JRET(code); + } + +_return: + + if (NULL != pJob) { + taosHashRelease(gQueryMgmt.pJobInfo, pJob); } return code; } -void qwRetireCollectionCb(uint64_t collectionId) { +void qwRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) { + SQWJobInfo* pJob = (SQWJobInfo*)taosHashGet(gQueryMgmt.pJobInfo, &mpJob->jobId, sizeof(mpJob->jobId)); + if (NULL == pJob) { + qError("QID:0x%" PRIx64 " fail to get job from job hash", mpJob->jobId); + return; + } + if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { + qwRetireJob(pJob); + + qInfo("QID:0x%" PRIx64 " retired directly, errCode: 0x%x", mpJob->jobId, errCode); + } else { + qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x", mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode)); + } } -void qwLowLevelRetire(int64_t retireSize) { - SQWQueryInfo* pQuery = (SQWQueryInfo*)taosHashIterate(gQueryMgmt.pQueryInfo, NULL); - while (pQuery) { - int64_t aSize = atomic_load_64(&pQuery->pCollection->allocMemSize); - if (aSize >= retireSize) { - atomic_store_8(&pQuery->retired, 1); - - //TODO RETIRE JOB/TASKS DIRECTLY +void qwLowLevelRetire(int64_t retireSize, int32_t errCode) { + SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); + while (pJob) { + int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); + if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { + qwRetireJob(pJob); qDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, - pQuery->pCollection->collectionId, aSize, retireSize); + pJob->memInfo->jobId, aSize, retireSize); - taosHashCancelIterate(gQueryMgmt.pQueryInfo, pQuery); + taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob); break; } - pQuery = (SQWQueryInfo*)taosHashIterate(gQueryMgmt.pQueryInfo, pQuery); + pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); } } -void qwMidLevelRetire(int64_t retireSize) { +void qwMidLevelRetire(int64_t retireSize, int32_t errCode) { + SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); + PriorityQueueNode qNode; + while (NULL != pJob) { + if (0 == atomic_load_8(&pJob->retired)) { + qNode.data = pJob; + (void)taosBQPush(gQueryMgmt.retireCtx.pJobQueue, &qNode); + } + + pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); + } + PriorityQueueNode* pNode = NULL; + int64_t retiredSize = 0; + while (retiredSize < retireSize) { + pNode = taosBQTop(gQueryMgmt.retireCtx.pJobQueue); + if (NULL == pNode) { + break; + } + + pJob = (SQWJobInfo*)pNode->data; + if (atomic_load_8(&pJob->retired)) { + taosBQPop(gQueryMgmt.retireCtx.pJobQueue); + continue; + } + + if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { + int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); + + qwRetireJob(pJob); + + qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, + pJob->memInfo->jobId, aSize, retireSize); + + retiredSize += aSize; + } + + taosBQPop(gQueryMgmt.retireCtx.pJobQueue); + } + + taosBQClear(gQueryMgmt.retireCtx.pJobQueue); } -void qwRetireCollectionsCb(int64_t retireSize, bool lowLevelRetire) { - if (lowLevelRetire) { - qwLowLevelRetire(retireSize); - } else { - qwMidLevelRetire(retireSize); - } +void qwRetireJobsCb(int64_t retireSize, bool lowLevelRetire, int32_t errCode) { + (lowLevelRetire) ? qwLowLevelRetire(retireSize, errCode) : qwMidLevelRetire(retireSize, errCode); } int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { @@ -161,8 +221,8 @@ int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { int64_t memSize = 0; int32_t code = taosGetSysAvailMemory(&memSize); if (TSDB_CODE_SUCCESS != code) { - qError("get system avaiable memory size failed, errno: %d", errno); - return TAOS_SYSTEM_ERROR(errno); + qError("get system avaiable memory size failed, error: 0x%x", code); + return code; } code = qwGetMemPoolMaxMemSize(memSize, pMaxSize); @@ -177,9 +237,9 @@ int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { void qwCheckUpateCfgCb(void* pHandle, void* cfg) { SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg; - int64_t newCollectionQuota = tsSingleQueryMaxMemorySize * 1048576UL; - if (pCfg->collectionQuota != newCollectionQuota) { - atomic_store_64(&pCfg->collectionQuota, newCollectionQuota); + int64_t newJobQuota = tsSingleQueryMaxMemorySize * 1048576UL; + if (pCfg->jobQuota != newJobQuota) { + atomic_store_64(&pCfg->jobQuota, newJobQuota); } int64_t maxSize = 0; @@ -198,13 +258,28 @@ void qwCheckUpateCfgCb(void* pHandle, void* cfg) { } } +static bool qwJobMemSizeCompFn(void* l, void* r, void* param) { + SQWJobInfo* left = (SQWJobInfo*)l; + SQWJobInfo* right = (SQWJobInfo*)r; + if (atomic_load_8(&right->retired)) { + return true; + } + + return atomic_load_64(&right->memInfo->allocMemSize) < atomic_load_64(&left->memInfo->allocMemSize); +} + + int32_t qwInitQueryPool(void) { int32_t code = TSDB_CODE_SUCCESS; - + +#ifdef LINUX if (!tsQueryUseMemoryPool) { - qDebug("query memory pool disabled"); +#endif + qInfo("query memory pool disabled"); return code; +#ifdef LINUX } +#endif SMemPoolCfg cfg = {0}; int64_t maxSize = 0; @@ -216,12 +291,12 @@ int32_t qwInitQueryPool(void) { cfg.threadNum = 10; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO - cfg.collectionQuota = tsSingleQueryMaxMemorySize * 1048576UL; + cfg.jobQuota = tsSingleQueryMaxMemorySize * 1048576UL; cfg.cb.setSessFp = qwSetConcurrentTaskNumCb; cfg.cb.decSessFp = qwDecConcurrentTaskNumCb; cfg.cb.incSessFp = qwIncConcurrentTaskNumCb; - cfg.cb.retiresFp = qwRetireCollectionsCb; - cfg.cb.retireFp = qwRetireCollectionCb; + cfg.cb.retireJobsFp = qwRetireJobsCb; + cfg.cb.retireJobFp = qwRetireJobCb; cfg.cb.cfgUpdateFp = qwCheckUpateCfgCb; code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize); @@ -229,18 +304,24 @@ int32_t qwInitQueryPool(void) { return code; } + gQueryMgmt.pJobInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (NULL == gQueryMgmt.pJobInfo) { + qError("init job hash failed, error:0x%x", terrno); + return terrno; + } + + gQueryMgmt.retireCtx.pJobQueue = createBoundedQueue(QW_MAX_RETIRE_JOB_NUM, qwJobMemSizeCompFn, NULL, NULL); + if (NULL == gQueryMgmt.retireCtx.pJobQueue) { + qError("init job bounded queue failed, error:0x%x", terrno); + return terrno; + } + code = taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryMgmt.memPoolHandle); if (TSDB_CODE_SUCCESS != code) { return code; } - gQueryMgmt.pQueryInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (NULL == gQueryMgmt.pQueryInfo) { - qError("init query hash failed"); - return TSDB_CODE_OUT_OF_MEMORY; - } - - qDebug("query memory pool initialized"); + qInfo("query memory pool initialized"); return code; } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 71544171e4..cf5c3cc5ce 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -711,7 +711,11 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { } } -void qwDestroyQueryInfo(SQWQueryInfo* pQuery) { +void qwDestroyJobInfo(SQWJobInfo* pJob) { + //TODO +} + +void qwRetireJob(SQWJobInfo* pJob) { //TODO } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index e48f894fb4..aa2fcb67ed 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -759,7 +759,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ctx->phase = -1; if (NULL != gQueryMgmt.memPoolHandle) { - QW_ERR_JRET(qwInitSession(QW_FPARAMS(), &ctx->memPoolSession)); + QW_ERR_JRET(qwInitSession(QW_FPARAMS(), ctx, &ctx->memPoolSession)); } QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index ec00cc896e..77f8e063d1 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -193,26 +193,15 @@ typedef struct SMPStatInfo { } SMPStatInfo; -typedef struct SMPCollection { - SMemPoolCollection collection; // KEEP IT FIRST +typedef struct SMPJob { + SMemPoolJob job; // KEEP IT FIRST + int32_t remainSession; SMPStatInfo stat; -} SMPCollection; - - -typedef struct SMPSession { - SMPListNode list; - - int64_t sessionId; - - void* pCollection; - SMPCollection* pMpCollection; - bool needRetire; - SMPCtrlInfo ctrlInfo; +} SMPJob; +typedef struct SMPSessionChunk { int64_t allocChunkNum; int64_t allocChunkMemSize; - int64_t allocMemSize; - int64_t maxAllocMemSize; int64_t reUseChunkNum; int32_t srcChunkNum; @@ -231,6 +220,18 @@ typedef struct SMPSession { SMPNSChunk *reUseNSChunkHead; SMPNSChunk *reUseNSChunkTail; +} SMPSessionChunk; + +typedef struct SMPSession { + SMPListNode list; + + int64_t sessionId; + SMPJob* pJob; + SMPCtrlInfo ctrlInfo; + int64_t allocMemSize; + int64_t maxAllocMemSize; + + SMPSessionChunk chunk; SMPStatInfo stat; } SMPSession; @@ -240,32 +241,20 @@ typedef struct SMPCacheGroupInfo { int64_t allocNum; int32_t groupNum; SMPCacheGroup *pGrpHead; - SMPCacheGroup *pGrpTail; void *pIdleList; } SMPCacheGroupInfo; - -typedef struct SMemPool { - char *name; - int16_t slotId; - SMemPoolCfg cfg; - int64_t memRetireThreshold[3]; - int64_t memRetireUnit; +typedef struct SMPChunkMgmt { int32_t maxChunkNum; - SMPCtrlInfo ctrlInfo; - int16_t maxDiscardSize; double threadChunkReserveNum; int64_t allocChunkNum; int64_t allocChunkSize; int64_t allocNSChunkNum; int64_t allocNSChunkSize; - int64_t allocMemSize; - int64_t maxAllocMemSize; SMPCacheGroupInfo chunkCache; SMPCacheGroupInfo NSChunkCache; - SMPCacheGroupInfo sessionCache; int32_t readyChunkNum; int32_t readyChunkReserveNum; @@ -275,11 +264,28 @@ typedef struct SMemPool { SMPChunk *readyChunkHead; SMPChunk *readyChunkTail; - int64_t readyNSChunkNum; - SMPChunk *readyNSChunkHead; - SMPChunk *readyNSChunkTail; + int64_t readyNSChunkNum; + SMPChunk *readyNSChunkHead; + SMPChunk *readyNSChunkTail; +} SMPChunkMgmt; - SMPStatInfo stat; + +typedef struct SMemPool { + char *name; + int16_t slotId; + SMemPoolCfg cfg; + int64_t retireThreshold[3]; + int64_t retireUnit; + SMPCtrlInfo ctrlInfo; + + int64_t maxAllocMemSize; + int64_t allocMemSize; + + SMPCacheGroupInfo sessionCache; + + SMPChunkMgmt chunk; + + SMPStatInfo stat; } SMemPool; typedef enum EMPMemStrategy { @@ -305,6 +311,24 @@ typedef struct SMemPoolMgmt { int32_t code; } SMemPoolMgmt; +typedef int32_t (*mpAllocFunc)(SMemPool*, SMPSession*, int64_t, uint32_t, void**); +typedef void (*mpFreeFunc)(SMemPool*, SMPSession*, void *, int64_t*); +typedef int64_t (*mpGetSizeFunc)(SMemPool*, SMPSession*, void*); +typedef int32_t (*mpReallocFunc)(SMemPool*, SMPSession*, void **, int64_t, int64_t*); +typedef int32_t (*mpInitSessionFunc)(SMemPool*, SMPSession*); +typedef int32_t (*mpInitFunc)(SMemPool*, char*, SMemPoolCfg*); +typedef int32_t (*mpUpdateCfgFunc)(SMemPool*); + +typedef struct SMPStrategyFp { + mpInitFunc initFp; + mpAllocFunc allocFp; + mpFreeFunc freeFp; + mpGetSizeFunc getSizeFp; + mpReallocFunc reallocFp; + mpInitSessionFunc initSessionFp; + mpUpdateCfgFunc updateCfgFp; +} SMPStrategyFp; + #define MP_GET_FLAG(st, f) ((st) & (f)) #define MP_SET_FLAG(st, f) (st) |= (f) #define MP_CLR_FLAG(st, f) (st) &= (~f) @@ -403,6 +427,28 @@ enum { } \ } while (0) +// direct +int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes); +int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr); +void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize); +int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize); + +// chunk +int32_t mpChunkInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg); +int64_t mpChunkGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr); +int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes); +void mpChunkFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize); +int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize); +int32_t mpChunkInitSession(SMemPool* pPool, SMPSession* pSession); +int32_t mpChunkUpdateCfg(SMemPool* pPool); + + +int32_t mpPopIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes); +int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size); +void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size); +int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead); +int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes); + #ifdef __cplusplus diff --git a/source/util/src/mpChunk.c b/source/util/src/mpChunk.c new file mode 100755 index 0000000000..53f7aa0206 --- /dev/null +++ b/source/util/src/mpChunk.c @@ -0,0 +1,317 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "osMemPool.h" +#include "tmempoolInt.h" +#include "tlog.h" +#include "tutil.h" + + +int32_t mpChunkNew(SMemPool* pPool, SMPChunk** ppChunk) { + SMPChunk* pChunk = NULL; + MP_ERR_RET(mpPopIdleNode(pPool, &pPool->chunk.chunkCache, (void**)&pChunk)); + + pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize); + if (NULL == pChunk->pMemStart) { + uError("add new chunk, memory malloc %d failed, code: 0x%x", pPool->cfg.chunkSize, terrno); + return terrno; + } + + pPool->chunk.allocChunkNum++; + pPool->chunk.allocChunkSize += pPool->cfg.chunkSize; + + *ppChunk = pChunk; + + return TSDB_CODE_SUCCESS; +} + + +int32_t mpChunkNewNS(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSize) { + SMPNSChunk* pChunk = NULL; + MP_ERR_RET(mpPopIdleNode(pPool, &pPool->chunk.NSChunkCache, (void**)&pChunk)); + + pChunk->pMemStart = taosMemMalloc(chunkSize); + if (NULL == pChunk->pMemStart) { + uError("add new NS chunk, memory malloc %" PRId64 " failed, code: 0x%x", chunkSize, terrno); + return terrno; + } + + pChunk->memBytes = chunkSize; + MP_SET_FLAG(pChunk->flags, MP_CHUNK_FLAG_NS_CHUNK); + + pPool->chunk.allocNSChunkNum++; + pPool->chunk.allocNSChunkSize += pPool->cfg.chunkSize; + + *ppChunk = pChunk; + + return TSDB_CODE_SUCCESS; +} + + +int32_t mpChunkPrepare(SMemPool* pPool, int32_t num) { + SMPChunk* pChunk = NULL; + for (int32_t i = 0; i < num; ++i) { + MP_ERR_RET(mpChunkNew(pPool, &pChunk)); + + pPool->chunk.readyChunkTail->list.pNext = pChunk; + pPool->chunk.readyChunkTail = pChunk; + + atomic_add_fetch_32(&pPool->chunk.readyChunkNum, 1); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t mpChunkEnsureCapacity(SMemPool* pPool, SMPChunkMgmt* pChunk) { + if (E_EVICT_ALL == pPool->cfg.evicPolicy) { + return TSDB_CODE_SUCCESS; + } + + int32_t readyMissNum = pChunk->readyChunkReserveNum - atomic_load_32(&pChunk->readyChunkNum); + if (readyMissNum <= 0) { + return TSDB_CODE_SUCCESS; + } + + MP_ERR_RET(mpChunkPrepare(pPool, readyMissNum)); + + return TSDB_CODE_SUCCESS; +} + +void mpChunkNotifyLowNum(SMemPool* pPool) { + +} + +int32_t mpChunkRetrieve(SMemPool* pPool, SMPChunk** ppChunk) { + SMPCacheGroup* pCache = NULL; + SMPChunk* pChunk = NULL; + int32_t readyChunkNum = atomic_sub_fetch_32(&pPool->chunk.readyChunkNum, 1); + if (readyChunkNum >= 0) { + if (atomic_add_fetch_32(&pPool->chunk.readyChunkGotNum, 1) == pPool->chunk.readyChunkLowNum) { + mpChunkNotifyLowNum(pPool); + } + + pChunk = (SMPChunk*)atomic_load_ptr(&pPool->chunk.readyChunkHead->list.pNext); + while (atomic_val_compare_exchange_ptr(&pPool->chunk.readyChunkHead->list.pNext, pChunk, pChunk->list.pNext) != pChunk) { + pChunk = (SMPChunk*)atomic_load_ptr(&pPool->chunk.readyChunkHead->list.pNext); + } + + *ppChunk = pChunk; + + return TSDB_CODE_SUCCESS; + } else { + atomic_add_fetch_32(&pPool->chunk.readyChunkNum, 1); + } + + MP_RET(mpChunkNew(pPool, ppChunk)); +} + +int32_t mpChunkRetrieveFromSession(SMemPool* pPool, SMPSession* pSession, int64_t size, SMPChunk** ppChunk, SMPChunk** ppPreChunk) { + SMPChunk* pChunk = pSession->chunk.srcChunkHead; + while (NULL != pChunk) { + if ((pChunk->offset + size) <= pPool->cfg.chunkSize) { + *ppChunk = pChunk; + break; + } + + *ppPreChunk = pChunk; + pChunk = (SMPChunk*)pChunk->list.pNext; + } + + if (NULL == *ppChunk) { + *ppPreChunk = NULL; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t mpChunkAllocMem(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + SMPChunk* pChunk = NULL, *preSrcChunk = NULL; + void* pRes = NULL; + int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer); + + if (pSession->chunk.srcChunkNum > 0) { + MP_ERR_JRET(mpChunkRetrieveFromSession(pPool, pSession, totalSize, &pChunk, &preSrcChunk)); + } + + if (NULL == pChunk) { + MP_ERR_JRET(mpChunkNew(pPool, &pChunk)); + + pSession->chunk.allocChunkNum++; + pSession->chunk.allocChunkMemSize += pPool->cfg.chunkSize; + mpUpdateAllocSize(pPool, pSession, totalSize); + + MP_ADD_TO_CHUNK_LIST(pSession->chunk.srcChunkHead, pSession->chunk.srcChunkTail, pSession->chunk.srcChunkNum, pChunk); + MP_ADD_TO_CHUNK_LIST(pSession->chunk.inUseChunkHead, pSession->chunk.inUseChunkTail, pSession->chunk.inUseChunkNum, pChunk); + } + + SMPMemHeader* pHeader = (SMPMemHeader*)(pChunk->pMemStart + pChunk->offset); + MP_INIT_MEM_HEADER(pHeader, size, false); + + pRes = (void*)(pHeader + 1); + pChunk->offset += totalSize; + + if (pChunk->offset >= (pPool->cfg.chunkSize - pPool->chunk.maxDiscardSize)) { + if (NULL == preSrcChunk) { + pSession->chunk.srcChunkHead = NULL; + pSession->chunk.srcChunkTail = NULL; + } else { + preSrcChunk->list.pNext = pChunk->list.pNext; + } + + pSession->chunk.srcChunkNum--; + } + + +_return: + + *ppRes = pRes; + + return code; +} + +int32_t mpChunkNSAllocMem(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + SMPNSChunk* pChunk = NULL; + void* pRes = NULL; + int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer) + alignment; + + MP_ERR_JRET(mpChunkNewNS(pPool, &pChunk, totalSize)); + SMPMemHeader* pHeader = (SMPMemHeader*)pChunk->pMemStart; + MP_INIT_MEM_HEADER(pHeader, size, false); + + pRes = (void*)(pHeader + 1); + + pSession->chunk.allocChunkNum++; + pSession->chunk.allocChunkMemSize += totalSize; + mpUpdateAllocSize(pPool, pSession, totalSize); + + if (NULL == pSession->chunk.inUseNSChunkHead) { + pSession->chunk.inUseNSChunkHead = pChunk; + pSession->chunk.inUseNSChunkTail = pChunk; + } else { + pSession->chunk.inUseNSChunkTail->list.pNext = pChunk; + } + +_return: + + *ppRes = pRes; + + return code; +} + + +int32_t mpChunkInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { + SMPChunkMgmt* pChunk = &pPool->chunk; + pChunk->threadChunkReserveNum = 1; + + pChunk->chunkCache.nodeSize = sizeof(SMPChunk); + pChunk->NSChunkCache.groupNum = MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE; + pChunk->NSChunkCache.nodeSize = sizeof(SMPNSChunk); + + MP_ERR_RET(mpAddCacheGroup(pPool, &pChunk->chunkCache, NULL)); + MP_ERR_RET(mpAddCacheGroup(pPool, &pChunk->NSChunkCache, NULL)); + + MP_ERR_RET(mpPopIdleNode(pPool, &pChunk->chunkCache, (void**)&pChunk->readyChunkHead)); + pChunk->readyChunkTail = pChunk->readyChunkHead; + + MP_ERR_RET(mpChunkEnsureCapacity(pPool, pChunk)); + + return TSDB_CODE_SUCCESS; +} + + +int64_t mpChunkGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr) { + SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1; + return pHeader->size; +} + +int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { + MP_RET((size > pPool->cfg.chunkSize) ? mpChunkNSAllocMem(pPool, pSession, size, alignment, ppRes) : mpChunkAllocMem(pPool, pSession, size, alignment, ppRes)); +} + + +void mpChunkFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) { + int64_t oSize = mpChunkGetMemSize(pPool, pSession, ptr); + if (origSize) { + *origSize = oSize; + } + + // TODO + + atomic_sub_fetch_64(&pSession->allocMemSize, oSize); + atomic_sub_fetch_64(&pPool->allocMemSize, oSize); +} + + +int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) { + int32_t code = TSDB_CODE_SUCCESS; + + if (*origSize >= size) { + SMPMemHeader* pHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader)); + pHeader->size = size; + return TSDB_CODE_SUCCESS; + } + + void* res = NULL; + MP_ERR_JRET(mpMalloc(pPool, pSession, size, 0, &res)); + SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader)); + SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader)); + + TAOS_MEMCPY(res, *pPtr, *origSize); + mpChunkFree(pPool, pSession, *pPtr, NULL); + *pPtr = res; + + return TSDB_CODE_SUCCESS; + +_return: + + mpChunkFree(pPool, pSession, *pPtr, NULL); + + return code; +} + +int32_t mpChunkInitSession(SMemPool* pPool, SMPSession* pSession) { + int32_t code = TSDB_CODE_SUCCESS; + SMPChunk* pChunk = NULL; + + MP_ERR_RET(mpChunkRetrieve(pPool, &pChunk)); + + pSession->chunk.allocChunkNum = 1; + pSession->chunk.allocChunkMemSize = pPool->cfg.chunkSize; + + MP_ADD_TO_CHUNK_LIST(pSession->chunk.srcChunkHead, pSession->chunk.srcChunkTail, pSession->chunk.srcChunkNum, pChunk); + MP_ADD_TO_CHUNK_LIST(pSession->chunk.inUseChunkHead, pSession->chunk.inUseChunkTail, pSession->chunk.inUseChunkNum, pChunk); + + return code; +} + +int32_t mpChunkUpdateCfg(SMemPool* pPool) { + pPool->chunk.maxChunkNum = pPool->cfg.maxSize / pPool->cfg.chunkSize; + if (pPool->chunk.maxChunkNum <= 0) { + uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", pPool->cfg.maxSize, pPool->cfg.chunkSize); + return TSDB_CODE_INVALID_MEM_POOL_PARAM; + } + + pPool->chunk.readyChunkReserveNum = TMIN(pPool->cfg.threadNum * pPool->chunk.threadChunkReserveNum, pPool->chunk.maxChunkNum); + + pPool->chunk.chunkCache.groupNum = TMAX(pPool->chunk.maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE); + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/util/src/mpDirect.c b/source/util/src/mpDirect.c new file mode 100755 index 0000000000..03fec4560f --- /dev/null +++ b/source/util/src/mpDirect.c @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "osMemPool.h" +#include "tmempoolInt.h" +#include "tlog.h" +#include "tutil.h" + +int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + MP_ERR_RET(mpChkQuotaOverflow(pPool, pSession, size)); + + void* res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size); + if (NULL != res) { + mpUpdateAllocSize(pPool, pSession, size); + } else { + (void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, size); + (void)atomic_sub_fetch_64(&pPool->allocMemSize, size); + + uError("malloc %" PRId64 " alignment %d failed, code: 0x%x", size, alignment, terrno); + + code = terrno; + } + + *ppRes = res; + + MP_RET(code); +} + +int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr) { + return taosMemSize(ptr); +} + +void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) { + int64_t oSize = taosMemSize(ptr); + if (origSize) { + *origSize = oSize; + } + taosMemFree(ptr); + + (void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize); + (void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize); + (void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize); +} + +int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) { + int32_t code = TSDB_CODE_SUCCESS; + + MP_ERR_RET(mpChkQuotaOverflow(pPool, pSession, size)); + + *pPtr = taosMemRealloc(*pPtr, size); + if (NULL != *pPtr) { + mpUpdateAllocSize(pPool, pSession, size - *origSize); + } else { + MP_ERR_RET(terrno); + } + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/util/src/theap.c b/source/util/src/theap.c index efd239b1c6..1cd6463152 100644 --- a/source/util/src/theap.c +++ b/source/util/src/theap.c @@ -343,6 +343,13 @@ BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete delete void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) { taosPQSetFn(q->queue, fn); } +void taosBQClear(BoundedQueue* q) { + if (q->queue->deleteFn) + taosArrayClearEx(q->queue->container, q->queue->deleteFn); + else + taosArrayClear(q->queue->container); +} + void destroyBoundedQueue(BoundedQueue* q) { if (!q) return; destroyPriorityQueue(q->queue); @@ -357,11 +364,9 @@ PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n) { } else { void* p = top->data; top->data = n->data; - n->data = p; if (q->queue->deleteFn) { + n->data = p; q->queue->deleteFn(n->data); - } else { - taosMemoryFree(n->data); } } return pqHeapify(q->queue, 0, taosBQSize(q)); diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index a7d7b30959..20819f32c5 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -23,8 +23,14 @@ static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT; threadlocal void* threadPoolHandle = NULL; threadlocal void* threadPoolSession = NULL; SMemPoolMgmt gMPMgmt = {0}; +SMPStrategyFp gMPFps[] = { + {NULL}, + {NULL, mpDirectAlloc, mpDirectFree, mpDirectGetMemSize, mpDirectRealloc, NULL, NULL}, + {mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg} +}; -int32_t memPoolCheckCfg(SMemPoolCfg* cfg) { + +int32_t mpCheckCfg(SMemPoolCfg* cfg) { if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) { uError("invalid memory pool chunkSize:%d", cfg->chunkSize); return TSDB_CODE_INVALID_MEM_POOL_PARAM; @@ -43,11 +49,18 @@ int32_t memPoolCheckCfg(SMemPoolCfg* cfg) { return TSDB_CODE_SUCCESS; } -void memPoolFreeChunkGroup(SMPCacheGroup* pGrp) { - //TODO + +void mpFreeCacheGroup(SMPCacheGroup* pGrp) { + if (NULL == pGrp) { + return; + } + + taosMemoryFree(pGrp->pNodes); + taosMemoryFree(pGrp); } -int32_t memPoolAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pTail) { + +int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead) { SMPCacheGroup* pGrp = NULL; if (NULL == pInfo->pGrpHead) { pInfo->pGrpHead = taosMemCalloc(1, sizeof(*pInfo->pGrpHead)); @@ -59,6 +72,7 @@ int32_t memPoolAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCache pGrp = pInfo->pGrpHead; } else { pGrp = (SMPCacheGroup*)taosMemCalloc(1, sizeof(SMPCacheGroup)); + pGrp->pNext = pHead; } pGrp->nodesNum = pInfo->groupNum; @@ -68,8 +82,8 @@ int32_t memPoolAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCache MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - if (atomic_val_compare_exchange_ptr(&pInfo->pGrpTail, pTail, pGrp) != pTail) { - memPoolFreeChunkGroup(pGrp); + if (pHead && atomic_val_compare_exchange_ptr(&pInfo->pGrpHead, pHead, pGrp) != pHead) { + mpFreeCacheGroup(pGrp); return TSDB_CODE_SUCCESS; } @@ -78,222 +92,117 @@ int32_t memPoolAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCache return TSDB_CODE_SUCCESS; } -int32_t memPoolGetIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes) { +void mpDestroyCacheGroup(SMPCacheGroupInfo* pInfo) { + SMPCacheGroup* pGrp = pInfo->pGrpHead; + SMPCacheGroup* pNext = NULL; + while (NULL != pGrp) { + pNext = pGrp->pNext; + + mpFreeCacheGroup(pGrp); + + pGrp = pNext; + } +} + + +int32_t mpPopIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes) { SMPCacheGroup* pGrp = NULL; - SMPListNode* pList = NULL; + SMPListNode* pNode = NULL; while (true) { - pList = (SMPListNode*)atomic_load_ptr(&pInfo->pIdleList); - if (NULL == pList) { + pNode = (SMPListNode*)atomic_load_ptr(&pInfo->pIdleList); + if (NULL == pNode) { break; } - if (atomic_val_compare_exchange_ptr(&pInfo->pIdleList, pList, pList->pNext) != pList) { + if (atomic_val_compare_exchange_ptr(&pInfo->pIdleList, pNode, pNode->pNext) != pNode) { continue; } - pList->pNext = NULL; + pNode->pNext = NULL; goto _return; } while (true) { - pGrp = atomic_load_ptr(&pInfo->pGrpTail); + pGrp = atomic_load_ptr(&pInfo->pGrpHead); int32_t offset = atomic_fetch_add_32(&pGrp->idleOffset, 1); if (offset < pGrp->nodesNum) { - pList = (SMPListNode*)((char*)pGrp->pNodes + offset * pInfo->nodeSize); + pNode = (SMPListNode*)((char*)pGrp->pNodes + offset * pInfo->nodeSize); break; } else { atomic_sub_fetch_32(&pGrp->idleOffset, 1); } - MP_ERR_RET(memPoolAddCacheGroup(pPool, pInfo, pGrp)); + MP_ERR_RET(mpAddCacheGroup(pPool, pInfo, pGrp)); } _return: - *ppRes = pList; + *ppRes = pNode; return TSDB_CODE_SUCCESS; } -int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) { - SMPChunk* pChunk = NULL; - MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, (void**)&pChunk)); +void mpPushIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPListNode* pNode) { + SMPCacheGroup* pGrp = NULL; + SMPListNode* pOrig = NULL; - pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize); - if (NULL == pChunk->pMemStart) { - uError("add new chunk, memory malloc %d failed since %s", pPool->cfg.chunkSize, strerror(errno)); - return TSDB_CODE_OUT_OF_MEMORY; - } - - pPool->allocChunkNum++; - pPool->allocChunkSize += pPool->cfg.chunkSize; - - *ppChunk = pChunk; - - return TSDB_CODE_SUCCESS; -} - - -int32_t memPoolNewNSChunk(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSize) { - SMPNSChunk* pChunk = NULL; - MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->NSChunkCache, (void**)&pChunk)); - - pChunk->pMemStart = taosMemMalloc(chunkSize); - if (NULL == pChunk->pMemStart) { - uError("add new NS chunk, memory malloc %" PRId64 " failed since %s", chunkSize, strerror(errno)); - return TSDB_CODE_OUT_OF_MEMORY; - } - - pChunk->memBytes = chunkSize; - MP_SET_FLAG(pChunk->flags, MP_CHUNK_FLAG_NS_CHUNK); - - pPool->allocNSChunkNum++; - pPool->allocNSChunkSize += pPool->cfg.chunkSize; - - *ppChunk = pChunk; - - return TSDB_CODE_SUCCESS; -} - - -int32_t memPoolPrepareChunks(SMemPool* pPool, int32_t num) { - SMPChunk* pChunk = NULL; - for (int32_t i = 0; i < num; ++i) { - MP_ERR_RET(memPoolNewChunk(pPool, &pChunk)); - - pPool->readyChunkTail->list.pNext = pChunk; - pPool->readyChunkTail = pChunk; - - atomic_add_fetch_32(&pPool->readyChunkNum, 1); - } - - return TSDB_CODE_SUCCESS; -} - - -int32_t memPoolEnsureChunks(SMemPool* pPool) { - if (E_EVICT_ALL == pPool->cfg.evicPolicy) { - return TSDB_CODE_SUCCESS; - } - - int32_t readyMissNum = pPool->readyChunkReserveNum - atomic_load_32(&pPool->readyChunkNum); - if (readyMissNum <= 0) { - return TSDB_CODE_SUCCESS; - } - - MP_ERR_RET(memPoolPrepareChunks(pPool, readyMissNum)); - - return TSDB_CODE_SUCCESS; -} - -int32_t memPoolApplyCfgUpdate(SMemPool* pPool) { - atomic_store_64(&pPool->memRetireThreshold[0], pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT); - atomic_store_64(&pPool->memRetireThreshold[1], pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT); - atomic_store_64(&pPool->memRetireThreshold[2], pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT); - - atomic_store_64(&pPool->memRetireUnit, TMAX(pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT, MP_RETIRE_UNIT_MIN_SIZE)); - - if (E_MP_STRATEGY_CHUNK == gMPMgmt.strategy) { - pPool->maxChunkNum = pPool->cfg.maxSize / pPool->cfg.chunkSize; - if (pPool->maxChunkNum <= 0) { - uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", pPool->cfg.maxSize, pPool->cfg.chunkSize); - return TSDB_CODE_INVALID_MEM_POOL_PARAM; + while (true) { + pOrig = (SMPListNode*)atomic_load_ptr(&pInfo->pIdleList); + pNode->pNext = pOrig; + + if (atomic_val_compare_exchange_ptr(&pInfo->pIdleList, pOrig, pNode) != pOrig) { + continue; } - pPool->readyChunkReserveNum = TMIN(pPool->cfg.threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum); + break; + } +} - pPool->chunkCache.groupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE); + +int32_t mpUpdateCfg(SMemPool* pPool) { + atomic_store_64(&pPool->retireThreshold[0], pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT); + atomic_store_64(&pPool->retireThreshold[1], pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT); + atomic_store_64(&pPool->retireThreshold[2], pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT); + + atomic_store_64(&pPool->retireUnit, TMAX(pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT, MP_RETIRE_UNIT_MIN_SIZE)); + + if (gMPFps[gMPMgmt.strategy].updateCfgFp) { + MP_ERR_RET((*gMPFps[gMPMgmt.strategy].updateCfgFp)(pPool)); } return TSDB_CODE_SUCCESS; } -int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { - MP_ERR_RET(memPoolCheckCfg(cfg)); +int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { + MP_ERR_RET(mpCheckCfg(cfg)); - memcpy(&pPool->cfg, cfg, sizeof(*cfg)); + TAOS_MEMCPY(&pPool->cfg, cfg, sizeof(*cfg)); pPool->name = taosStrdup(poolName); if (NULL == pPool->name) { uError("calloc memory pool name %s failed", poolName); - MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + MP_ERR_RET(terrno); } - MP_ERR_RET(memPoolApplyCfgUpdate(pPool)); + MP_ERR_RET(mpUpdateCfg(pPool)); pPool->ctrlInfo.statFlags = MP_STAT_FLAG_LOG_ALL; pPool->ctrlInfo.funcFlags = MP_CTRL_FLAG_PRINT_STAT; - pPool->threadChunkReserveNum = 1; - - pPool->chunkCache.nodeSize = sizeof(SMPChunk); - pPool->NSChunkCache.groupNum = MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE; - pPool->NSChunkCache.nodeSize = sizeof(SMPNSChunk); pPool->sessionCache.groupNum = MP_SESSION_CACHE_ALLOC_BATCH_SIZE; pPool->sessionCache.nodeSize = sizeof(SMPSession); - MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->chunkCache, NULL)); - MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->NSChunkCache, NULL)); - MP_ERR_RET(memPoolAddCacheGroup(pPool, &pPool->sessionCache, NULL)); - - MP_ERR_RET(memPoolGetIdleNode(pPool, &pPool->chunkCache, (void**)&pPool->readyChunkHead)); - pPool->readyChunkTail = pPool->readyChunkHead; - - MP_ERR_RET(memPoolEnsureChunks(pPool)); + MP_ERR_RET(mpAddCacheGroup(pPool, &pPool->sessionCache, NULL)); + if (gMPFps[gMPMgmt.strategy].initFp) { + MP_ERR_RET((*gMPFps[gMPMgmt.strategy].initFp)(pPool, poolName, cfg)); + } + return TSDB_CODE_SUCCESS; } -void memPoolNotifyLowChunkNum(SMemPool* pPool) { - -} - -int32_t memPoolGetChunk(SMemPool* pPool, SMPChunk** ppChunk) { - SMPCacheGroup* pCache = NULL; - SMPChunk* pChunk = NULL; - int32_t readyChunkNum = atomic_sub_fetch_32(&pPool->readyChunkNum, 1); - if (readyChunkNum >= 0) { - if (atomic_add_fetch_32(&pPool->readyChunkGotNum, 1) == pPool->readyChunkLowNum) { - memPoolNotifyLowChunkNum(pPool); - } - - pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->list.pNext); - while (atomic_val_compare_exchange_ptr(&pPool->readyChunkHead->list.pNext, pChunk, pChunk->list.pNext) != pChunk) { - pChunk = (SMPChunk*)atomic_load_ptr(&pPool->readyChunkHead->list.pNext); - } - - *ppChunk = pChunk; - - return TSDB_CODE_SUCCESS; - } else { - atomic_add_fetch_32(&pPool->readyChunkNum, 1); - } - - MP_RET(memPoolNewChunk(pPool, ppChunk)); -} - -int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_t size, SMPChunk** ppChunk, SMPChunk** ppPreChunk) { - SMPChunk* pChunk = pSession->srcChunkHead; - while (NULL != pChunk) { - if ((pChunk->offset + size) <= pPool->cfg.chunkSize) { - *ppChunk = pChunk; - break; - } - - *ppPreChunk = pChunk; - pChunk = (SMPChunk*)pChunk->list.pNext; - } - - if (NULL == *ppChunk) { - *ppPreChunk = NULL; - } - - return TSDB_CODE_SUCCESS; -} - - -void memPoolUpdateMaxAllocMemSize(int64_t* pMaxAllocMemSize, int64_t newSize) { +FORCE_INLINE void mpUpdateMaxAllocSize(int64_t* pMaxAllocMemSize, int64_t newSize) { int64_t maxAllocMemSize = atomic_load_64(pMaxAllocMemSize); while (true) { if (newSize <= maxAllocMemSize) { @@ -308,99 +217,22 @@ void memPoolUpdateMaxAllocMemSize(int64_t* pMaxAllocMemSize, int64_t newSize) { } } -void memPoolUpdateAllocMemSize(SMemPool* pPool, SMPSession* pSession, int64_t size) { +void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size) { int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size); - memPoolUpdateMaxAllocMemSize(&pSession->maxAllocMemSize, allocMemSize); + mpUpdateMaxAllocSize(&pSession->maxAllocMemSize, allocMemSize); - allocMemSize = atomic_load_64(&pSession->pMpCollection->collection.allocMemSize); - memPoolUpdateMaxAllocMemSize(&pSession->pMpCollection->collection.maxAllocMemSize, allocMemSize); + allocMemSize = atomic_load_64(&pSession->pJob->job.allocMemSize); + mpUpdateMaxAllocSize(&pSession->pJob->job.maxAllocMemSize, allocMemSize); allocMemSize = atomic_load_64(&pPool->allocMemSize); - memPoolUpdateMaxAllocMemSize(&pPool->maxAllocMemSize, allocMemSize); + mpUpdateMaxAllocSize(&pPool->maxAllocMemSize, allocMemSize); } - -int32_t memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { - int32_t code = TSDB_CODE_SUCCESS; - SMPChunk* pChunk = NULL, *preSrcChunk = NULL; - void* pRes = NULL; - int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer); - - if (pSession->srcChunkNum > 0) { - MP_ERR_JRET(memPoolGetChunkFromSession(pPool, pSession, totalSize, &pChunk, &preSrcChunk)); - } - - if (NULL == pChunk) { - MP_ERR_JRET(memPoolNewChunk(pPool, &pChunk)); - - pSession->allocChunkNum++; - pSession->allocChunkMemSize += pPool->cfg.chunkSize; - memPoolUpdateAllocMemSize(pPool, pSession, totalSize); - - MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk); - MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk); - } - - SMPMemHeader* pHeader = (SMPMemHeader*)(pChunk->pMemStart + pChunk->offset); - MP_INIT_MEM_HEADER(pHeader, size, false); - - pRes = (void*)(pHeader + 1); - pChunk->offset += totalSize; - - if (pChunk->offset >= (pPool->cfg.chunkSize - pPool->maxDiscardSize)) { - if (NULL == preSrcChunk) { - pSession->srcChunkHead = NULL; - pSession->srcChunkTail = NULL; - } else { - preSrcChunk->list.pNext = pChunk->list.pNext; - } - - pSession->srcChunkNum--; - } - - -_return: - - *ppRes = pRes; - - return code; -} - -int32_t memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { - int32_t code = TSDB_CODE_SUCCESS; - SMPNSChunk* pChunk = NULL; - void* pRes = NULL; - int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer) + alignment; - - MP_ERR_JRET(memPoolNewNSChunk(pPool, &pChunk, totalSize)); - SMPMemHeader* pHeader = (SMPMemHeader*)pChunk->pMemStart; - MP_INIT_MEM_HEADER(pHeader, size, false); - - pRes = (void*)(pHeader + 1); - - pSession->allocChunkNum++; - pSession->allocChunkMemSize += totalSize; - memPoolUpdateAllocMemSize(pPool, pSession, totalSize); - - if (NULL == pSession->inUseNSChunkHead) { - pSession->inUseNSChunkHead = pChunk; - pSession->inUseNSChunkTail = pChunk; - } else { - pSession->inUseNSChunkTail->list.pNext = pChunk; - } - -_return: - - *ppRes = pRes; - - return code; -} - -int32_t memPoolPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { +int32_t mpPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { if (retireLowLevel) { if (0 == atomic_val_compare_exchange_8(&gMPMgmt.msgQueue.lowLevelRetire, 0, 1)) { atomic_store_ptr(&gMPMgmt.msgQueue.pPool, pPool); - tsem2_post(&gMPMgmt.threadSem); + MP_ERR_RET(tsem2_post(&gMPMgmt.threadSem)); } return TSDB_CODE_SUCCESS; @@ -408,129 +240,88 @@ int32_t memPoolPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { if (0 == atomic_val_compare_exchange_8(&gMPMgmt.msgQueue.midLevelRetire, 0, 1)) { atomic_store_ptr(&gMPMgmt.msgQueue.pPool, pPool); - tsem2_post(&gMPMgmt.threadSem); + MP_ERR_RET(tsem2_post(&gMPMgmt.threadSem)); } return TSDB_CODE_SUCCESS; } -int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) { - SMPCollection* pCollection = pSession->pMpCollection; - int64_t cAllocSize = atomic_add_fetch_64(&pCollection->collection.allocMemSize, size); - int64_t quota = atomic_load_64(&pPool->cfg.collectionQuota); +int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) { + SMPJob* pJob = pSession->pJob; + int32_t code = TSDB_CODE_SUCCESS; + int64_t cAllocSize = atomic_add_fetch_64(&pJob->job.allocMemSize, size); + int64_t quota = atomic_load_64(&pPool->cfg.jobQuota); if (quota > 0 && cAllocSize > quota) { - uWarn("collection %" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pCollection->collection.collectionId, cAllocSize, quota); - pPool->cfg.cb.retireFp(pCollection->collection.collectionId); - atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size); - MP_RET(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); + code = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; + uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pJob->job.jobId, cAllocSize, quota); + pPool->cfg.cb.retireJobFp(&pJob->job, code); + (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); + MP_RET(code); } int64_t pAllocSize = atomic_add_fetch_64(&pPool->allocMemSize, size); - quota = atomic_load_64(&pPool->memRetireThreshold[2]); + quota = atomic_load_64(&pPool->retireThreshold[2]); if (pAllocSize >= quota) { - uWarn("pool allocSize %" PRId64 " reaches the high quota %" PRId64, pAllocSize, quota); - pPool->cfg.cb.retireFp(pCollection->collection.collectionId); - atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size); - atomic_sub_fetch_64(&pPool->allocMemSize, size); - MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); + code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; + uWarn("%s pool allocSize %" PRId64 " reaches the high quota %" PRId64, pPool->name, pAllocSize, quota); + pPool->cfg.cb.retireJobFp(&pJob->job, code); + (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); + (void)atomic_sub_fetch_64(&pPool->allocMemSize, size); + MP_RET(code); } - quota = atomic_load_64(&pPool->memRetireThreshold[1]); + quota = atomic_load_64(&pPool->retireThreshold[1]); if (pAllocSize >= quota) { - uInfo("pool allocSize %" PRId64 " reaches the middle quota %" PRId64, pAllocSize, quota); - if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit) / 2) { - uWarn("session retired in middle quota retire choise, sessionAllocSize: %" PRId64 ", collectionSize: %" PRId64, - atomic_load_64(&pSession->allocMemSize), cAllocSize); + uInfo("%s pool allocSize %" PRId64 " reaches the middle quota %" PRId64, pPool->name, pAllocSize, quota); + if (cAllocSize >= atomic_load_64(&pPool->retireUnit) / 2) { + code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; + pPool->cfg.cb.retireJobFp(&pJob->job, code); + (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); + (void)atomic_sub_fetch_64(&pPool->allocMemSize, size); - pPool->cfg.cb.retireFp(pCollection->collection.collectionId); - atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size); - atomic_sub_fetch_64(&pPool->allocMemSize, size); - - MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, false)); - MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); + MP_ERR_RET(mpPutRetireMsgToQueue(pPool, false)); + MP_RET(code); } return TSDB_CODE_SUCCESS; } - quota = atomic_load_64(&pPool->memRetireThreshold[0]); + quota = atomic_load_64(&pPool->retireThreshold[0]); if (pAllocSize >= quota) { - uInfo("pool allocSize %" PRId64 " reaches the low quota %" PRId64, pAllocSize, quota); - if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) { - uWarn("session retired in low quota retire choise, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize)); + uInfo("%s pool allocSize %" PRId64 " reaches the low quota %" PRId64, pPool->name, pAllocSize, quota); + if (cAllocSize >= atomic_load_64(&pPool->retireUnit)) { + code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; + pPool->cfg.cb.retireJobFp(&pJob->job, code); + + (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); + (void)atomic_sub_fetch_64(&pPool->allocMemSize, size); - pPool->cfg.cb.retireFp(pCollection->collection.collectionId); - atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size); - atomic_sub_fetch_64(&pPool->allocMemSize, size); - - MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, true)); - MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); + MP_ERR_RET(mpPutRetireMsgToQueue(pPool, true)); + MP_RET(code); } } return TSDB_CODE_SUCCESS; } -int32_t memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { - int32_t code = TSDB_CODE_SUCCESS; - MP_ERR_RET(memPoolMemQuotaOverflow(pPool, pSession, size)); - - void* res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size); - if (NULL != res) { - memPoolUpdateAllocMemSize(pPool, pSession, size); - } else { - atomic_sub_fetch_64(&pSession->pMpCollection->collection.allocMemSize, size); - atomic_sub_fetch_64(&pPool->allocMemSize, size); - uError("malloc %" PRId64 " alighment %d failed", size, alignment); - - code = TAOS_SYSTEM_ERROR(errno); - } - - MP_RET(code); +int64_t mpGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) { + return (*gMPFps[gMPMgmt.strategy].getSizeFp)(pPool, pSession, ptr); } -int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) { - switch (gMPMgmt.strategy) { - case E_MP_STRATEGY_DIRECT: - return taosMemSize(ptr); - case E_MP_STRATEGY_CHUNK: { - SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1; - return pHeader->size; - } - default: - break; - } - - return 0; +int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { + MP_RET((*gMPFps[gMPMgmt.strategy].allocFp)(pPool, pSession, size, alignment, ppRes)); } -int32_t memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { - int32_t code = TSDB_CODE_SUCCESS; - - switch (gMPMgmt.strategy) { - case E_MP_STRATEGY_DIRECT: - MP_ERR_RET(memPoolAllocDirect(pPool, pSession, size, alignment, ppRes)); - break; - case E_MP_STRATEGY_CHUNK: - MP_ERR_RET((size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size, alignment, ppRes) : memPoolAllocFromChunk(pPool, pSession, size, alignment, ppRes)); - break; - default: - break; - } - - return code; -} - -int32_t memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size, void** ppRes) { +int32_t mpCalloc(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size, void** ppRes) { int32_t code = TSDB_CODE_SUCCESS; int64_t totalSize = num * size; void *res = NULL; - MP_ERR_RET(memPoolMallocImpl(pPool, pSession, totalSize, 0, &res)); + MP_ERR_RET(mpMalloc(pPool, pSession, totalSize, 0, &res)); if (NULL != res) { - memset(res, 0, totalSize); + TAOS_MEMSET(res, 0, totalSize); } _return: @@ -541,7 +332,7 @@ _return: } -void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) { +void mpFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) { if (NULL == ptr) { if (origSize) { *origSize = 0; @@ -550,87 +341,28 @@ void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* return; } - switch (gMPMgmt.strategy) { - case E_MP_STRATEGY_DIRECT: { - int64_t oSize = taosMemSize(ptr); - if (origSize) { - *origSize = oSize; - } - taosMemFree(ptr); - - atomic_sub_fetch_64(&pSession->allocMemSize, oSize); - atomic_sub_fetch_64(&pPool->allocMemSize, oSize); - break; - } - case E_MP_STRATEGY_CHUNK: - if (origSize) { - *origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr); - } - break; - default: - break; - } - - return; + (*gMPFps[gMPMgmt.strategy].freeFp)(pPool, pSession, ptr, origSize); } -int32_t memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) { +int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) { int32_t code = TSDB_CODE_SUCCESS; if (NULL == *pPtr) { *origSize = 0; - MP_RET(memPoolMallocImpl(pPool, pSession, size, 0, pPtr)); + MP_RET(mpMalloc(pPool, pSession, size, 0, pPtr)); } if (0 == size) { - memPoolFreeImpl(pPool, pSession, *pPtr, origSize); + mpFree(pPool, pSession, *pPtr, origSize); return TSDB_CODE_SUCCESS; } - *origSize = memPoolGetMemorySizeImpl(pPool, pSession, *pPtr); + *origSize = mpGetMemorySizeImpl(pPool, pSession, *pPtr); - switch (gMPMgmt.strategy) { - case E_MP_STRATEGY_DIRECT: { - MP_ERR_JRET(memPoolMemQuotaOverflow(pPool, pSession, size)); - - *pPtr = taosMemRealloc(*pPtr, size); - if (NULL != *pPtr) { - memPoolUpdateAllocMemSize(pPool, pSession, size - *origSize); - } else { - MP_ERR_JRET(TAOS_SYSTEM_ERROR(errno)); - } - break; - } - case E_MP_STRATEGY_CHUNK: { - if (*origSize >= size) { - SMPMemHeader* pHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader)); - pHeader->size = size; - return TSDB_CODE_SUCCESS; - } - - void* res = NULL; - MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, 0, &res)); - SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader)); - SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader)); - - memcpy(res, *pPtr, *origSize); - memPoolFreeImpl(pPool, pSession, *pPtr, NULL); - *pPtr = res; - - return TSDB_CODE_SUCCESS; - } - default: - break; - } - -_return: - - memPoolFreeImpl(pPool, pSession, *pPtr, NULL); - - return code; + MP_RET((*gMPFps[gMPMgmt.strategy].reallocFp)(pPool, pSession, pPtr, size, origSize)); } -void memPoolPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName, int64_t maxAllocSize) { +void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName, int64_t maxAllocSize) { if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) { return; } @@ -663,15 +395,15 @@ void memPoolPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* de } -void memPoolPrintFileLineStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) { +void mpPrintFileLineStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) { //TODO } -void memPoolPrintNodeStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) { +void mpPrintNodeStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) { //TODO } -void memPoolPrintSessionStat(SMPCtrlInfo* pCtrl, SMPStatSession* pSessStat, char* detailName) { +void mpPrintSessionStat(SMPCtrlInfo* pCtrl, SMPStatSession* pSessStat, char* detailName) { if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) { return; } @@ -682,42 +414,42 @@ void memPoolPrintSessionStat(SMPCtrlInfo* pCtrl, SMPStatSession* pSessStat, char uInfo("session destroyed num: %" PRId64, pSessStat->destroyNum); } -void memPoolPrintStat(SMemPool* pPool, SMPSession* pSession, char* procName) { +void mpPrintStat(SMemPool* pPool, SMPSession* pSession, char* procName) { char detailName[128]; if (NULL != pSession) { snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session"); detailName[sizeof(detailName) - 1] = 0; - memPoolPrintStatDetail(&pSession->ctrlInfo, &pSession->stat.statDetail, detailName, pSession->maxAllocMemSize); + mpPrintStatDetail(&pSession->ctrlInfo, &pSession->stat.statDetail, detailName, pSession->maxAllocMemSize); snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFile"); detailName[sizeof(detailName) - 1] = 0; - memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName); + mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName); snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFileLine"); detailName[sizeof(detailName) - 1] = 0; - memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName); + mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName); } snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name); detailName[sizeof(detailName) - 1] = 0; - memPoolPrintSessionStat(&pPool->ctrlInfo, &pPool->stat.statSession, detailName); - memPoolPrintStatDetail(&pPool->ctrlInfo, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize); + mpPrintSessionStat(&pPool->ctrlInfo, &pPool->stat.statSession, detailName); + mpPrintStatDetail(&pPool->ctrlInfo, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize); snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode"); detailName[sizeof(detailName) - 1] = 0; - memPoolPrintNodeStat(&pSession->ctrlInfo, pSession->stat.nodeStat, detailName); + mpPrintNodeStat(&pSession->ctrlInfo, pSession->stat.nodeStat, detailName); snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFile"); detailName[sizeof(detailName) - 1] = 0; - memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName); + mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName); snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFileLine"); detailName[sizeof(detailName) - 1] = 0; - memPoolPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName); + mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName); } -void memPoolLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* pInput) { +void mpLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* pInput) { switch (item) { case E_MP_STAT_LOG_MEM_MALLOC: { if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { @@ -807,11 +539,9 @@ void memPoolLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatIn uError("Invalid stat item: %d", item); break; } - - } -void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPStatInput* pInput) { +void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPStatInput* pInput) { switch (item) { case E_MP_STAT_LOG_MEM_MALLOC: case E_MP_STAT_LOG_MEM_CALLOC: @@ -819,10 +549,10 @@ void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, case E_MP_STAT_LOG_MEM_FREE: case E_MP_STAT_LOG_MEM_STRDUP: { if (MP_GET_FLAG(pSession->ctrlInfo.statFlags, MP_STAT_FLAG_LOG_ALL_MEM_STAT)) { - memPoolLogStatDetail(&pSession->stat.statDetail, item, pInput); + mpLogStatDetail(&pSession->stat.statDetail, item, pInput); } if (MP_GET_FLAG(pPool->ctrlInfo.statFlags, MP_STAT_FLAG_LOG_ALL_MEM_STAT)) { - memPoolLogStatDetail(&pPool->stat.statDetail, item, pInput); + mpLogStatDetail(&pPool->stat.statDetail, item, pInput); } break; } @@ -838,7 +568,7 @@ void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, } } -void memPoolCheckUpateCfg(void) { +void mpCheckUpateCfg(void) { taosRLockLatch(&gMPMgmt.poolLock); int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList); for (int32_t i = 0; i < poolNum; ++i) { @@ -850,79 +580,89 @@ void memPoolCheckUpateCfg(void) { taosRUnLockLatch(&gMPMgmt.poolLock); } -void* memPoolMgmtThreadFunc(void* param) { +void* mpMgmtThreadFunc(void* param) { int32_t timeout = 0; while (0 == atomic_load_8(&gMPMgmt.modExit)) { timeout = tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs); if (0 != timeout) { - memPoolCheckUpateCfg(); + mpCheckUpateCfg(); continue; } if (atomic_load_8(&gMPMgmt.msgQueue.midLevelRetire)) { - (*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), false); + (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), false, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); } else if (atomic_load_8(&gMPMgmt.msgQueue.lowLevelRetire)) { - (*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), true); + (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); } - memPoolCheckUpateCfg(); + mpCheckUpateCfg(); } return NULL; } -void taosMemPoolModInit(void) { +void mpModInit(void) { + int32_t code = TSDB_CODE_SUCCESS; + taosInitRWLatch(&gMPMgmt.poolLock); gMPMgmt.poolList = taosArrayInit(10, POINTER_BYTES); if (NULL == gMPMgmt.poolList) { - gMPMgmt.code = TSDB_CODE_OUT_OF_MEMORY; - return; + MP_ERR_JRET(terrno); } gMPMgmt.strategy = E_MP_STRATEGY_DIRECT; gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0); if (TSDB_CODE_SUCCESS != gMPMgmt.code) { - uError("failed to init sem2, error: %x", gMPMgmt.code); + uError("failed to init sem2, error: 0x%x", gMPMgmt.code); return; } gMPMgmt.waitMs = MP_DEFAULT_MEM_CHK_INTERVAL_MS; TdThreadAttr thAttr; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - if (taosThreadCreate(&gMPMgmt.poolMgmtThread, &thAttr, memPoolMgmtThreadFunc, NULL) != 0) { - uError("failed to create memPool mgmt thread since %s", strerror(errno)); - gMPMgmt.code = TSDB_CODE_SYSTEM_ERROR; - return; + MP_ERR_JRET(taosThreadAttrInit(&thAttr)); + MP_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE)); + code = taosThreadCreate(&gMPMgmt.poolMgmtThread, &thAttr, mpMgmtThreadFunc, NULL); + if (code != 0) { + uError("failed to create memPool mgmt thread, error: 0x%x", code); + (void)taosThreadAttrDestroy(&thAttr); + MP_ERR_JRET(code); } - taosThreadAttrDestroy(&thAttr); + MP_ERR_JRET(taosThreadAttrDestroy(&thAttr)); + +_return: + + gMPMgmt.code = code; } int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = NULL; - taosThreadOnce(&gMPoolInit, taosMemPoolModInit); + MP_ERR_JRET(taosThreadOnce(&gMPoolInit, mpModInit)); if (TSDB_CODE_SUCCESS != gMPMgmt.code) { - uError("init memory pool failed"); + uError("init memory pool failed, code: 0x%x", gMPMgmt.code); MP_ERR_JRET(gMPMgmt.code); } pPool = (SMemPool*)taosMemoryCalloc(1, sizeof(SMemPool)); if (NULL == pPool) { - uError("calloc memory pool failed"); - MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + uError("calloc memory pool failed, code: 0x%x", terrno); + MP_ERR_JRET(terrno); } - MP_ERR_JRET(memPoolInit(pPool, poolName, cfg)); + MP_ERR_JRET(mpInit(pPool, poolName, cfg)); taosWLockLatch(&gMPMgmt.poolLock); - taosArrayPush(gMPMgmt.poolList, &pPool); + if (NULL == taosArrayPush(gMPMgmt.poolList, &pPool)) { + taosWUnLockLatch(&gMPMgmt.poolLock); + MP_ERR_JRET(terrno); + } + pPool->slotId = taosArrayGetSize(gMPMgmt.poolList) - 1; taosWUnLockLatch(&gMPMgmt.poolLock); @@ -942,47 +682,54 @@ _return: void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) { SMemPool* pPool = (SMemPool*)poolHandle; - (void)memPoolApplyCfgUpdate(pPool); + (void)mpUpdateCfg(pPool); } void taosMemPoolDestroySession(void* poolHandle, void* session) { SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; + if (NULL == pSession) { + uWarn("null pointer of session"); + return; + } + + (void)atomic_sub_fetch_32(&pSession->pJob->remainSession, 1); + //TODO; - memPoolPrintStat(pPool, pSession, "DestroySession"); + (void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1); - atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1); + mpPrintStat(pPool, pSession, "DestroySession"); + + TAOS_MEMSET(pSession, 0, sizeof(*pSession)); + + mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession); } -int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pCollection) { +int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) { int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = NULL; - SMPChunk* pChunk = NULL; - MP_ERR_JRET(memPoolGetIdleNode(pPool, &pPool->sessionCache, (void**)&pSession)); + MP_ERR_JRET(mpPopIdleNode(pPool, &pPool->sessionCache, (void**)&pSession)); - memcpy(&pSession->ctrlInfo, &pPool->ctrlInfo, sizeof(pSession->ctrlInfo)); + TAOS_MEMCPY(&pSession->ctrlInfo, &pPool->ctrlInfo, sizeof(pSession->ctrlInfo)); - MP_ERR_JRET(memPoolGetChunk(pPool, &pChunk)); - - pSession->allocChunkNum = 1; - pSession->allocChunkMemSize = pPool->cfg.chunkSize; - - MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk); - MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk); - - pSession->pMpCollection = (SMPCollection*)pCollection; + if (gMPFps[gMPMgmt.strategy].initSessionFp) { + MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession)); + } + + pSession->pJob = (SMPJob*)pJob; + (void)atomic_add_fetch_32(&pSession->pJob->remainSession, 1); _return: if (TSDB_CODE_SUCCESS != code) { taosMemPoolDestroySession(poolHandle, pSession); pSession = NULL; - atomic_add_fetch_64(&pPool->stat.statSession.initFail, 1); + (void)atomic_add_fetch_64(&pPool->stat.statSession.initFail, 1); } else { - atomic_add_fetch_64(&pPool->stat.statSession.initSucc, 1); + (void)atomic_add_fetch_64(&pPool->stat.statSession.initSucc, 1); } *ppSession = pSession; @@ -1004,10 +751,10 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fil SMPSession* pSession = (SMPSession*)session; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, 0, &res)); + terrno = mpMalloc(pPool, pSession, size, 0, &res); MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); - memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); + mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); _return: @@ -1029,10 +776,10 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t int64_t totalSize = num * size; SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - MP_ERR_JRET(memPoolCallocImpl(pPool, pSession, num, size, &res)); + terrno = mpCalloc(pPool, pSession, num, size, &res); MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); - memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input); + mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input); _return: @@ -1053,10 +800,10 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz SMPSession* pSession = (SMPSession*)session; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - MP_ERR_JRET(memPoolReallocImpl(pPool, pSession, &ptr, size, &input.origSize)); + terrno = mpRealloc(pPool, pSession, &ptr, size, &input.origSize); MP_SET_FLAG(input.procFlags, ((res || 0 == size) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); - memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input); + mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input); _return: @@ -1078,13 +825,14 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char int64_t size = strlen(ptr) + 1; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, 0, &res)); + terrno = mpMalloc(pPool, pSession, size, 0, &res); if (NULL != res) { - strcpy(res, ptr); + TAOS_STRCPY(res, ptr); + *((char*)res + size - 1) = 0; } MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); - memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_STRDUP, &input); + mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_STRDUP, &input); _return: @@ -1103,10 +851,10 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, SMPSession* pSession = (SMPSession*)session; SMPStatInput input = {.file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - memPoolFreeImpl(pPool, pSession, ptr, &input.size); + mpFree(pPool, pSession, ptr, &input.size); MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC); - memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input); + mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input); _return: @@ -1127,11 +875,11 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - return memPoolGetMemorySizeImpl(pPool, pSession, ptr); + return mpGetMemorySizeImpl(pPool, pSession, ptr); _return: - return -1; + return code; } void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { @@ -1148,10 +896,10 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment SMPSession* pSession = (SMPSession*)session; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, alignment, &res)); + terrno = mpMalloc(pPool, pSession, size, alignment, &res); MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); - memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); + mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); _return: @@ -1159,13 +907,35 @@ _return: } void taosMemPoolClose(void* poolHandle) { + SMemPool* pPool = (SMemPool*)poolHandle; + + taosMemoryFree(pPool->name); + mpDestroyCacheGroup(&pPool->sessionCache); +} + +void taosMemPoolModDestroy(void) { } -void taosMemPoolModDestroy(void) { + +void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo) { } +int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) { + *ppJob = taosMemoryCalloc(1, sizeof(SMPJob)); + if (NULL == *ppJob) { + uError("calloc mp job failed, code: 0x%x", terrno); + return terrno; + } + + SMPJob* pJob = (SMPJob*)*ppJob; + pJob->job.jobId = jobId; + + return TSDB_CODE_SUCCESS; +} + + void taosAutoMemoryFree(void *ptr) { if (NULL != threadPoolHandle) { taosMemPoolFree(threadPoolHandle, threadPoolSession, ptr, __FILE__, __LINE__); @@ -1174,21 +944,4 @@ void taosAutoMemoryFree(void *ptr) { } } -void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo) { - -} - -int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection) { - *ppCollection = taosMemCalloc(1, sizeof(SMPCollection)); - if (NULL == *ppCollection) { - uError("calloc collection failed"); - return TSDB_CODE_OUT_OF_MEMORY; - } - - SMPCollection* pCollection = (SMPCollection*)*ppCollection; - pCollection->collection.collectionId = collectionId; - - return TSDB_CODE_SUCCESS; -} -