diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 7eca9a53b9..b40a1e8db9 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -94,7 +94,9 @@ typedef struct SMemPoolCallBack { typedef struct SMemPoolCfg { bool autoMaxSize; - int64_t maxSize; + int64_t reserveSize; + int64_t retireUnitSize; + int64_t freeSize; int64_t jobQuota; int32_t chunkSize; int32_t threadNum; @@ -102,6 +104,11 @@ typedef struct SMemPoolCfg { SMemPoolCallBack cb; } SMemPoolCfg; +#define MEMPOOL_GET_ALLOC_SIZE(_dstat) ((_dstat)->bytes.memMalloc.succ + (_dstat)->bytes.memCalloc.succ + (_dstat)->bytes.memRealloc.succ + (_dstat)->bytes.strdup.succ + (_dstat)->bytes.strndup.succ) +#define MEMPOOL_GET_FREE_SIZE(_dstat) ((_dstat)->bytes.memRealloc.origSucc + (_dstat)->bytes.memFree.succ) +#define MEMPOOL_GET_USED_SIZE(_dstat) (MEMPOOL_GET_ALLOC_SIZE(_dstat) - MEMPOOL_GET_FREE_SIZE(_dstat)) + + int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle); void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo); void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo); @@ -123,6 +130,8 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName); void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd); void taosMemPoolGetUsedSizeEnd(void* poolHandle); bool taosMemPoolNeedRetireJob(void* poolHandle); +int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize); + #define taosMemPoolFreeClear(ptr) \ do { \ diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 3e22839db9..81eb78e2c0 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -253,6 +253,7 @@ typedef struct SQueryMgmt { int32_t concTaskLevel; SHashObj* pJobInfo; void* memPoolHandle; + int8_t memPoolInited; SQWRetireCtx retireCtx; } SQueryMgmt; diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index e0e5119602..0ce3935d95 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -18,7 +18,7 @@ int32_t qwGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) { } int64_t totalSize = freeSize + usedSize; - int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, QW_MIN_RESERVE_MEM_SIZE); + int64_t reserveSize = TMAX(tsTotalMemoryKB * 1024 * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, QW_MIN_RESERVE_MEM_SIZE); int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL; if (availSize < QW_MIN_MEM_POOL_SIZE) { qError("too little available query memory, totalAvailable: %" PRId64 ", reserveSize: %" PRId64, totalSize, reserveSize); @@ -238,15 +238,15 @@ void qwRetireJobsCb(void* pHandle, int64_t retireSize, bool lowLevelRetire, int3 (lowLevelRetire) ? qwLowLevelRetire(pHandle, retireSize, errCode) : qwMidLevelRetire(pHandle, retireSize, errCode); } -int32_t qwGetQueryMemPoolMaxSize(void* pHandle, int64_t* pMaxSize, bool* autoMaxSize) { +int32_t qwUpdateQueryMemPoolCfg(void* pHandle, int64_t* pFreeSize, bool* autoMaxSize, int64_t* pReserveSize, int64_t* pRetireUnitSize) { if (tsQueryBufferPoolSize > 0) { - *pMaxSize = tsQueryBufferPoolSize * 1048576UL; + *pFreeSize = tsQueryBufferPoolSize * 1048576UL; *autoMaxSize = false; return TSDB_CODE_SUCCESS; } - int32_t code = qwGetMemPoolMaxMemSize(pHandle, pMaxSize); + int32_t code = qwGetMemPoolMaxMemSize(pHandle, pFreeSize); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -263,18 +263,17 @@ void qwCheckUpateCfgCb(void* pHandle, void* cfg) { atomic_store_64(&pCfg->jobQuota, newJobQuota); } - int64_t maxSize = 0; + int64_t freeSize = 0, reserveSize = 0, retireUnitSize = 0; bool autoMaxSize = false; - int32_t code = qwGetQueryMemPoolMaxSize(pHandle, &maxSize, &autoMaxSize); + int32_t code = qwUpdateQueryMemPoolCfg(pHandle, &freeSize, &autoMaxSize, &reserveSize, &retireUnitSize); if (TSDB_CODE_SUCCESS != code) { - pCfg->maxSize = 0; - qError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize); - return; + pCfg->freeSize = 0; + qError("get query memPool freeSize failed, reset freeSize to %" PRId64, pCfg->freeSize); } - if (pCfg->autoMaxSize != autoMaxSize || pCfg->maxSize != maxSize) { + if (pCfg->autoMaxSize != autoMaxSize || pCfg->freeSize != freeSize) { pCfg->autoMaxSize = autoMaxSize; - atomic_store_64(&pCfg->maxSize, maxSize); + atomic_store_64(&pCfg->freeSize, freeSize); taosMemPoolCfgUpdate(pHandle, pCfg); } } @@ -304,10 +303,10 @@ int32_t qwInitQueryPool(void) { } #endif + taosGetTotalMemory(&tsTotalMemoryKB); + SMemPoolCfg cfg = {0}; - int64_t maxSize = 0; - bool autoMaxSize = false; - code = qwGetQueryMemPoolMaxSize(NULL, &maxSize, &autoMaxSize); + code = qwUpdateQueryMemPoolCfg(NULL, &cfg.freeSize, &cfg.autoMaxSize, &cfg.reserveSize, &cfg.retireUnitSize); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -322,11 +321,6 @@ int32_t qwInitQueryPool(void) { cfg.cb.retireJobFp = qwRetireJobCb; cfg.cb.cfgUpdateFp = qwCheckUpateCfgCb; - code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - gQueryMgmt.pJobInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (NULL == gQueryMgmt.pJobInfo) { qError("init job hash failed, error:0x%x", terrno); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ada47403ec..2e6289acd2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1345,7 +1345,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } - if (NULL == gQueryMgmt.memPoolHandle) { + if (0 == atomic_val_compare_exchange_8(&gQueryMgmt.memPoolInited, 0, 1)) { QW_ERR_RET(qwInitQueryPool()); } diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index f0a02a5917..15e5362b93 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -283,10 +283,10 @@ typedef struct SMemPoolMgmt { int32_t code; } SMemPoolMgmt; -typedef int32_t (*mpAllocFunc)(SMemPool*, SMPSession*, int64_t, uint32_t, void**); +typedef int32_t (*mpAllocFunc)(SMemPool*, SMPSession*, int64_t*, uint32_t, void**); typedef void (*mpFreeFunc)(SMemPool*, SMPSession*, void *, int64_t*); typedef int64_t (*mpGetSizeFunc)(SMemPool*, SMPSession*, void*); -typedef int32_t (*mpReallocFunc)(SMemPool*, SMPSession*, void **, int64_t, int64_t*); +typedef int32_t (*mpReallocFunc)(SMemPool*, SMPSession*, void **, int64_t*, int64_t*); typedef int32_t (*mpInitSessionFunc)(SMemPool*, SMPSession*); typedef int32_t (*mpInitFunc)(SMemPool*, char*, SMemPoolCfg*); typedef int32_t (*mpUpdateCfgFunc)(SMemPool*); @@ -402,26 +402,27 @@ enum { } while (0) // direct -int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes); +int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes); int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr); void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize); -int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize); +int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize); +int32_t mpDirectTrim(SMemPool* pPool, SMPSession* pSession, int32_t size, bool* trimed); // chunk int32_t mpChunkInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg); int64_t mpChunkGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr); -int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes); +int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes); void mpChunkFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize); -int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize); +int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize); int32_t mpChunkInitSession(SMemPool* pPool, SMPSession* pSession); int32_t mpChunkUpdateCfg(SMemPool* pPool); int32_t mpPopIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes); int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size); -void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size); +void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize); int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead); -int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes); +int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes); diff --git a/source/util/src/mpChunk.c b/source/util/src/mpChunk.c index 53f7aa0206..5b0b58441c 100755 --- a/source/util/src/mpChunk.c +++ b/source/util/src/mpChunk.c @@ -153,7 +153,7 @@ int32_t mpChunkAllocMem(SMemPool* pPool, SMPSession* pSession, int64_t size, uin pSession->chunk.allocChunkNum++; pSession->chunk.allocChunkMemSize += pPool->cfg.chunkSize; - mpUpdateAllocSize(pPool, pSession, totalSize); + mpUpdateAllocSize(pPool, pSession, totalSize, 0); MP_ADD_TO_CHUNK_LIST(pSession->chunk.srcChunkHead, pSession->chunk.srcChunkTail, pSession->chunk.srcChunkNum, pChunk); MP_ADD_TO_CHUNK_LIST(pSession->chunk.inUseChunkHead, pSession->chunk.inUseChunkTail, pSession->chunk.inUseChunkNum, pChunk); @@ -198,7 +198,7 @@ int32_t mpChunkNSAllocMem(SMemPool* pPool, SMPSession* pSession, int64_t size, u pSession->chunk.allocChunkNum++; pSession->chunk.allocChunkMemSize += totalSize; - mpUpdateAllocSize(pPool, pSession, totalSize); + mpUpdateAllocSize(pPool, pSession, totalSize, 0); if (NULL == pSession->chunk.inUseNSChunkHead) { pSession->chunk.inUseNSChunkHead = pChunk; @@ -240,8 +240,8 @@ int64_t mpChunkGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr) { return pHeader->size; } -int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { - MP_RET((size > pPool->cfg.chunkSize) ? mpChunkNSAllocMem(pPool, pSession, size, alignment, ppRes) : mpChunkAllocMem(pPool, pSession, size, alignment, ppRes)); +int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) { + MP_RET((*size > pPool->cfg.chunkSize) ? mpChunkNSAllocMem(pPool, pSession, *size, alignment, ppRes) : mpChunkAllocMem(pPool, pSession, *size, alignment, ppRes)); } @@ -258,12 +258,12 @@ void mpChunkFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* orig } -int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) { +int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) { int32_t code = TSDB_CODE_SUCCESS; - if (*origSize >= size) { + if (*origSize >= *size) { SMPMemHeader* pHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader)); - pHeader->size = size; + pHeader->size = *size; return TSDB_CODE_SUCCESS; } @@ -301,9 +301,9 @@ int32_t mpChunkInitSession(SMemPool* pPool, SMPSession* pSession) { } int32_t mpChunkUpdateCfg(SMemPool* pPool) { - pPool->chunk.maxChunkNum = pPool->cfg.maxSize / pPool->cfg.chunkSize; + pPool->chunk.maxChunkNum = pPool->cfg.freeSize / pPool->cfg.chunkSize; if (pPool->chunk.maxChunkNum <= 0) { - uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", pPool->cfg.maxSize, pPool->cfg.chunkSize); + uError("invalid memory pool max chunk num, freeSize:%" PRId64 ", chunkSize:%d", pPool->cfg.freeSize, pPool->cfg.chunkSize); return TSDB_CODE_INVALID_MEM_POOL_PARAM; } diff --git a/source/util/src/mpDirect.c b/source/util/src/mpDirect.c index 699b621b07..5b47d7b01a 100755 --- a/source/util/src/mpDirect.c +++ b/source/util/src/mpDirect.c @@ -19,22 +19,24 @@ #include "tlog.h" #include "tutil.h" -int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { +int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) { int32_t code = TSDB_CODE_SUCCESS; void* res = NULL; + int64_t nSize = *size; taosRLockLatch(&pPool->cfgLock); - MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, size)); + MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size)); - res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size); + res = alignment ? taosMemMallocAlign(alignment, *size) : taosMemMalloc(*size); if (NULL != res) { - mpUpdateAllocSize(pPool, pSession, size); + nSize = taosMemSize(res); + mpUpdateAllocSize(pPool, pSession, nSize, nSize - *size); } else { - (void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, size); - (void)atomic_sub_fetch_64(&pPool->allocMemSize, size); + (void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, *size); + (void)atomic_sub_fetch_64(&pPool->allocMemSize, *size); - uError("malloc %" PRId64 " alignment %d failed, code: 0x%x", size, alignment, terrno); + uError("malloc %" PRId64 " alignment %d failed, code: 0x%x", *size, alignment, terrno); code = terrno; } @@ -44,6 +46,7 @@ _return: taosRUnLockLatch(&pPool->cfgLock); *ppRes = res; + *size = nSize; MP_RET(code); } @@ -69,16 +72,18 @@ void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* ori taosRUnLockLatch(&pPool->cfgLock); } -int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) { +int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) { int32_t code = TSDB_CODE_SUCCESS; + int64_t nSize = *size; taosRLockLatch(&pPool->cfgLock); - MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, size - *origSize)); + MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size - *origSize)); - *pPtr = taosMemRealloc(*pPtr, size); + *pPtr = taosMemRealloc(*pPtr, *size); if (NULL != *pPtr) { - mpUpdateAllocSize(pPool, pSession, size - *origSize); + nSize = taosMemSize(*pPtr); + mpUpdateAllocSize(pPool, pSession, nSize - *origSize, nSize - *size + *origSize); } else { MP_ERR_RET(terrno); } @@ -88,11 +93,17 @@ _return: taosRUnLockLatch(&pPool->cfgLock); if (code) { - mpDirectFree(pPool, pSession, *pPtr, NULL); + mpDirectFree(pPool, pSession, *pPtr, origSize); *pPtr = NULL; } + + *size = nSize; return TSDB_CODE_SUCCESS; } +int32_t mpDirectTrim(SMemPool* pPool, SMPSession* pSession, int32_t size, bool* trimed) { + return taosMemTrim(size, trimed); +} + diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 5f5b190e13..ead6ce9f3c 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -25,7 +25,7 @@ threadlocal void* threadPoolSession = NULL; SMemPoolMgmt gMPMgmt = {0}; SMPStrategyFp gMPFps[] = { {NULL}, - {NULL, mpDirectAlloc, mpDirectFree, mpDirectGetMemSize, mpDirectRealloc, NULL, NULL, NULL}, + {NULL, mpDirectAlloc, mpDirectFree, mpDirectGetMemSize, mpDirectRealloc, NULL, NULL, mpDirectTrim}, {mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg, NULL} }; @@ -161,21 +161,19 @@ void mpPushIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPListNode* pNod int32_t mpUpdateCfg(SMemPool* pPool) { - atomic_store_64(&pPool->retireThreshold[0], pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT); - atomic_store_64(&pPool->retireThreshold[1], pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT); - atomic_store_64(&pPool->retireThreshold[2], pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT); - - atomic_store_64(&pPool->retireUnit, TMAX(pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT, MP_RETIRE_UNIT_MIN_SIZE)); + atomic_store_64(&pPool->retireThreshold[0], pPool->cfg.freeSize * MP_RETIRE_LOW_THRESHOLD_PERCENT); + atomic_store_64(&pPool->retireThreshold[1], pPool->cfg.freeSize * MP_RETIRE_MID_THRESHOLD_PERCENT); + atomic_store_64(&pPool->retireThreshold[2], pPool->cfg.freeSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT); if (gMPFps[gMPMgmt.strategy].updateCfgFp) { MP_ERR_RET((*gMPFps[gMPMgmt.strategy].updateCfgFp)(pPool)); } - uDebug("memPool %s cfg updated, autoMaxSize:%d, maxSize:%" PRId64 + uDebug("memPool %s cfg updated, autoMaxSize:%d, freeSize:%" PRId64 ", jobQuota:%" PRId64 ", threadNum:%d, retireThreshold:%" PRId64 "-%" PRId64 "-%" PRId64 - ", retireUnit:%" PRId64, pPool->name, pPool->cfg.autoMaxSize, pPool->cfg.maxSize, + ", retireUnit:%" PRId64, pPool->name, pPool->cfg.autoMaxSize, pPool->cfg.freeSize, pPool->cfg.jobQuota, pPool->cfg.threadNum, pPool->retireThreshold[0], pPool->retireThreshold[1], - pPool->retireThreshold[2], pPool->retireUnit); + pPool->retireThreshold[2], pPool->cfg.retireUnitSize); return TSDB_CODE_SUCCESS; } @@ -223,7 +221,12 @@ FORCE_INLINE void mpUpdateMaxAllocSize(int64_t* pMaxAllocMemSize, int64_t newSiz } } -void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size) { +void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize) { + if (addSize) { + atomic_add_fetch_64(&pSession->pJob->job.allocMemSize, addSize); + atomic_add_fetch_64(&pPool->allocMemSize, addSize); + } + int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size); mpUpdateMaxAllocSize(&pSession->maxAllocMemSize, allocMemSize); @@ -281,7 +284,7 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) quota = atomic_load_64(&pPool->retireThreshold[1]); if (pAllocSize >= quota) { uInfo("%s pool allocSize %" PRId64 " reaches the middle quota %" PRId64, pPool->name, pAllocSize, quota); - if (cAllocSize >= atomic_load_64(&pPool->retireUnit) / 2) { + if (cAllocSize >= atomic_load_64(&pPool->cfg.retireUnitSize) / 2) { code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; pPool->cfg.cb.retireJobFp(&pJob->job, code); (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); @@ -298,7 +301,7 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) quota = atomic_load_64(&pPool->retireThreshold[0]); if (pAllocSize >= quota) { uInfo("%s pool allocSize %" PRId64 " reaches the low quota %" PRId64, pPool->name, pAllocSize, quota); - if (cAllocSize >= atomic_load_64(&pPool->retireUnit)) { + if (cAllocSize >= atomic_load_64(&pPool->cfg.retireUnitSize)) { code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; pPool->cfg.cb.retireJobFp(&pJob->job, code); @@ -318,19 +321,18 @@ int64_t mpGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) { return (*gMPFps[gMPMgmt.strategy].getSizeFp)(pPool, pSession, ptr); } -int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { +int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) { MP_RET((*gMPFps[gMPMgmt.strategy].allocFp)(pPool, pSession, size, alignment, ppRes)); } -int32_t mpCalloc(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size, void** ppRes) { +int32_t mpCalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, void** ppRes) { int32_t code = TSDB_CODE_SUCCESS; - int64_t totalSize = num * size; void *res = NULL; - MP_ERR_RET(mpMalloc(pPool, pSession, totalSize, 0, &res)); + MP_ERR_RET(mpMalloc(pPool, pSession, size, 0, &res)); if (NULL != res) { - TAOS_MEMSET(res, 0, totalSize); + TAOS_MEMSET(res, 0, *size); } _return: @@ -353,7 +355,7 @@ void mpFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) (*gMPFps[gMPMgmt.strategy].freeFp)(pPool, pSession, ptr, origSize); } -int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) { +int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) { int32_t code = TSDB_CODE_SUCCESS; if (NULL == *pPtr) { @@ -361,7 +363,7 @@ int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t si MP_RET(mpMalloc(pPool, pSession, size, 0, pPtr)); } - if (0 == size) { + if (0 == *size) { mpFree(pPool, pSession, *pPtr, origSize); *pPtr = NULL; return TSDB_CODE_SUCCESS; @@ -554,7 +556,6 @@ void mpLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* case E_MP_STAT_LOG_MEM_TRIM: { if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { atomic_add_fetch_64(&pDetail->times.memTrim.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.memTrim.exec, pInput->origSize); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { atomic_add_fetch_64(&pDetail->times.memTrim.succ, 1); @@ -616,8 +617,8 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) { pCtrl = &pSession->ctrlInfo; pDetail = &pSession->stat.statDetail; if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT)) { - int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.strdup.succ + pDetail->bytes.strndup.succ; - int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ; + int64_t allocSize = MEMPOOL_GET_ALLOC_SIZE(pDetail); + int64_t freeSize = MEMPOOL_GET_FREE_SIZE(pDetail); if (allocSize != freeSize) { uError("%s Session in JOB:0x%" PRIx64 " stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, @@ -671,9 +672,9 @@ void* mpMgmtThreadFunc(void* param) { } if (atomic_load_8(&gMPMgmt.msgQueue.midLevelRetire)) { - (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), false, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); + (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->cfg.retireUnitSize), false, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); } else if (atomic_load_8(&gMPMgmt.msgQueue.lowLevelRetire)) { - (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); + (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->cfg.retireUnitSize), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); } mpCheckUpateCfg(); @@ -874,7 +875,7 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fil SMPSession* pSession = (SMPSession*)session; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - terrno = mpMalloc(pPool, pSession, size, 0, &res); + terrno = mpMalloc(pPool, pSession, &input.size, 0, &res); MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); @@ -899,7 +900,7 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t int64_t totalSize = num * size; SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - terrno = mpCalloc(pPool, pSession, num, size, &res); + terrno = mpCalloc(pPool, pSession, &input.size, &res); MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input); @@ -920,9 +921,9 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; + SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .origSize = 0}; - terrno = mpRealloc(pPool, pSession, &ptr, size, &input.origSize); + terrno = mpRealloc(pPool, pSession, &ptr, &input.size, &input.origSize); if (ptr || 0 == size) { MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC); @@ -956,7 +957,7 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* int64_t size = strlen(ptr) + 1; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - terrno = mpMalloc(pPool, pSession, size, 0, &res); + terrno = mpMalloc(pPool, pSession, &input.size, 0, &res); if (NULL != res) { TAOS_STRCPY(res, ptr); *((char*)res + size - 1) = 0; @@ -986,7 +987,7 @@ char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64 size = TMIN(size, origSize) + 1; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - terrno = mpMalloc(pPool, pSession, size, 0, &res); + terrno = mpMalloc(pPool, pSession, &input.size, 0, &res); if (NULL != res) { TAOS_MEMCPY(res, ptr, size - 1); *((char*)res + size - 1) = 0; @@ -1058,7 +1059,7 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment SMPSession* pSession = (SMPSession*)session; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - terrno = mpMalloc(pPool, pSession, size, alignment, &res); + terrno = mpMalloc(pPool, pSession, &input.size, alignment, &res); MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); @@ -1095,11 +1096,11 @@ int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - SMPStatInput input = {.origSize = 1, .size = 0, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; + SMPStatInput input = {.size = 0, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; code = mpTrim(pPool, pSession, size, trimed); - input.size = (trimed) ? 1 : 0; + input.size = (trimed) ? (*trimed) : 0; MP_SET_FLAG(input.procFlags, ((0 == code) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_TRIM, &input); @@ -1123,7 +1124,14 @@ int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) { } void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) { + if (NULL == poolHandle) { + *usedSize = 0; + *needEnd = false; + return; + } + SMemPool* pPool = (SMemPool*)poolHandle; +#if 0 if ((atomic_load_64(&pPool->cfg.maxSize) - atomic_load_64(&pPool->allocMemSize)) <= MP_CFG_UPDATE_MIN_RESERVE_SIZE) { *needEnd = true; taosWLockLatch(&pPool->cfgLock); @@ -1132,6 +1140,11 @@ void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* need } *usedSize = atomic_load_64(&pPool->allocMemSize); +#else + taosWLockLatch(&pPool->cfgLock); + *needEnd = true; + *usedSize = atomic_load_64(&pPool->allocMemSize); +#endif } void taosMemPoolGetUsedSizeEnd(void* poolHandle) { @@ -1144,6 +1157,26 @@ bool taosMemPoolNeedRetireJob(void* poolHandle) { return atomic_load_64(&pPool->allocMemSize) >= atomic_load_64(&pPool->retireThreshold[0]); } +int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize) { + if (NULL == session || (NULL == ppStat && NULL == allocSize && NULL == maxAllocSize)) { + uError("%s invalid input param, session:%p, ppStat:%p, allocSize:%p, maxAllocSize:%p", __FUNCTION__, session, ppStat, allocSize, maxAllocSize); + MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + } + + SMPSession* pSession = (SMPSession*)session; + + if (ppStat) { + *ppStat = &pSession->stat.statDetail; + } + if (allocSize) { + *allocSize = atomic_load_64(&pSession->allocMemSize); + } + if (maxAllocSize) { + *maxAllocSize = atomic_load_64(&pSession->maxAllocMemSize); + } + + return TSDB_CODE_SUCCESS; +} void taosAutoMemoryFree(void *ptr) { if (NULL != threadPoolHandle) { diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 4f6acf1bd7..956ec06b6a 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -42,7 +42,7 @@ namespace { #define MPT_PRINTF (void)printf -#define MPT_MAX_MEM_ACT_TIMES 1000 +#define MPT_MAX_MEM_ACT_TIMES 300 #define MPT_MAX_SESSION_NUM 100 #define MPT_MAX_JOB_NUM 100 #define MPT_MAX_THREAD_NUM 100 @@ -243,11 +243,11 @@ void mptInit() { mptInitLogFile(); mptCtrl.caseLoopTimes = 1; - mptCtrl.taskActTimes = 50; + mptCtrl.taskActTimes = 0; mptCtrl.maxSingleAllocSize = 104857600; mptCtrl.jobNum = 100; mptCtrl.jobExecTimes = 1; - mptCtrl.jobTaskNum = 1; + mptCtrl.jobTaskNum = 0; mptCtx.pJobs = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); ASSERT_TRUE(NULL != mptCtx.pJobs); @@ -269,6 +269,9 @@ void mptDestroyTaskCtx(SMPTestTaskCtx* pTask, void* pSession) { mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pSession); for (int32_t i = 0; i < pTask->memIdx; ++i) { + pTask->stat.times.memFree.exec++; + pTask->stat.bytes.memFree.exec+=mptMemorySize(pTask->pMemList[i].p); + pTask->stat.bytes.memFree.succ+=mptMemorySize(pTask->pMemList[i].p); mptMemoryFree(pTask->pMemList[i].p); pTask->pMemList[i].p = NULL; } @@ -397,6 +400,14 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { uint64_t jobId = pJobCtx->jobId; for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { + SMPStatDetail* pStat = NULL; + int64_t allocSize = 0; + taosMemPoolGetSessionStat(pJobCtx->pSessions[i], &pStat, &allocSize, NULL); + int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat); + + assert(allocSize == usedSize); + assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat))); + mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]); taosMemPoolDestroySession(mptCtx.memPoolHandle, pJobCtx->pSessions[i]); } @@ -502,18 +513,18 @@ void mptCheckUpateCfgCb(void* pHandle, void* cfg) { atomic_store_64(&pCfg->jobQuota, newJobQuota); } - int64_t maxSize = 0; + int64_t freeSize = 0; bool autoMaxSize = false; - int32_t code = mptGetQueryMemPoolMaxSize(pHandle, &maxSize, &autoMaxSize); + int32_t code = mptGetQueryMemPoolMaxSize(pHandle, &freeSize, &autoMaxSize); if (TSDB_CODE_SUCCESS != code) { - pCfg->maxSize = 0; - uError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize); + pCfg->freeSize = 0; + uError("get query memPool freeSize failed, reset freeSize to %" PRId64, pCfg->freeSize); return; } - if (pCfg->autoMaxSize != autoMaxSize || pCfg->maxSize != maxSize) { + if (pCfg->autoMaxSize != autoMaxSize || pCfg->freeSize != freeSize) { pCfg->autoMaxSize = autoMaxSize; - atomic_store_64(&pCfg->maxSize, maxSize); + atomic_store_64(&pCfg->freeSize, freeSize); taosMemPoolCfgUpdate(pHandle, pCfg); } } @@ -618,11 +629,11 @@ void mptInitPool(void) { cfg.autoMaxSize = mptCtx.param.autoPoolSize; if (!mptCtx.param.autoPoolSize) { - cfg.maxSize = mptCtx.param.poolSize; + cfg.freeSize = mptCtx.param.poolSize; } else { int64_t memSize = 0; ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize)); - cfg.maxSize = memSize * 0.8; + cfg.freeSize = memSize * 0.8; } cfg.threadNum = 10; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO @@ -635,10 +646,20 @@ void mptInitPool(void) { ASSERT_TRUE(0 == taosMemPoolOpen("testQMemPool", &cfg, &mptCtx.memPoolHandle)); } +void mptWriteMem(void* pStart, int32_t size) { + char* pEnd = (char*)pStart + size - 1; + char* p = (char*)pStart; + while (p <= pEnd) { + *p = 'a' + taosRand() % 26; + p += 4096; + } +} + void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { int32_t actId = 0; bool actDone = false; int32_t size = taosRand() % mptCtrl.maxSingleAllocSize; + int32_t osize = 0, nsize = 0; while (!actDone) { actId = taosRand() % 10; @@ -649,19 +670,23 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { } pTask->pMemList[pTask->memIdx].p = mptMemoryMalloc(size); - pTask->stat.times.memMalloc.exec++; - pTask->stat.bytes.memMalloc.exec+=size; if (NULL == pTask->pMemList[pTask->memIdx].p) { + pTask->stat.times.memMalloc.exec++; + pTask->stat.bytes.memMalloc.exec+=size; + pTask->stat.times.memMalloc.fail++; pTask->stat.bytes.memMalloc.fail+=size; uError("JOB:0x%x TASK:0x%x mpMalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } - pTask->stat.bytes.memMalloc.succ+=size; - *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; - *((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26; + nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p); + pTask->stat.times.memMalloc.exec++; + pTask->stat.bytes.memMalloc.exec+=nsize; + pTask->stat.bytes.memMalloc.succ+=nsize; + pTask->stat.times.memMalloc.succ++; + + mptWriteMem(pTask->pMemList[pTask->memIdx].p, size); - pTask->pMemList[pTask->memIdx].size = size; pTask->memIdx++; pTask->lastAct = actId; actDone = true; @@ -673,73 +698,91 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { } pTask->pMemList[pTask->memIdx].p = mptMemoryCalloc(1, size); - pTask->stat.times.memCalloc.exec++; - pTask->stat.bytes.memCalloc.exec+=size; if (NULL == pTask->pMemList[pTask->memIdx].p) { + pTask->stat.times.memCalloc.exec++; + pTask->stat.bytes.memCalloc.exec+=size; + pTask->stat.times.memCalloc.fail++; pTask->stat.bytes.memCalloc.fail+=size; uError("JOB:0x%x TASK:0x%x mpCalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } - pTask->stat.bytes.memCalloc.succ+=size; - *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; - *((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26; + nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p); + + pTask->stat.times.memCalloc.exec++; + pTask->stat.bytes.memCalloc.exec+=nsize; + pTask->stat.times.memCalloc.succ++; + pTask->stat.bytes.memCalloc.succ+=nsize; + + mptWriteMem(pTask->pMemList[pTask->memIdx].p, size); - pTask->pMemList[pTask->memIdx].size = size; pTask->memIdx++; pTask->lastAct = actId; actDone = true; break; } case 2:{ // new realloc + break; if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) { break; } pTask->pMemList[pTask->memIdx].p = mptMemoryRealloc(NULL, size); - pTask->stat.times.memRealloc.exec++; - pTask->stat.bytes.memRealloc.exec+=size; if (NULL == pTask->pMemList[pTask->memIdx].p) { + pTask->stat.times.memRealloc.exec++; + pTask->stat.bytes.memRealloc.exec+=size; + pTask->stat.times.memRealloc.fail++; pTask->stat.bytes.memRealloc.fail+=size; uError("JOB:0x%x TASK:0x%x new mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } - pTask->stat.bytes.memRealloc.succ+=size; - *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; - *((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26; + nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p); + + pTask->stat.times.memRealloc.exec++; + pTask->stat.bytes.memRealloc.exec+=nsize; + pTask->stat.bytes.memRealloc.succ+=nsize; + pTask->stat.times.memRealloc.succ++; + + mptWriteMem(pTask->pMemList[pTask->memIdx].p, size); - pTask->pMemList[pTask->memIdx].size = size; pTask->memIdx++; pTask->lastAct = actId; actDone = true; break; } case 3:{ // real realloc + break; if (pTask->memIdx <= 0) { break; } assert(pTask->pMemList[pTask->memIdx - 1].p); + osize = mptMemorySize(pTask->pMemList[pTask->memIdx - 1].p); size++; pTask->pMemList[pTask->memIdx - 1].p = mptMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, size); - pTask->stat.times.memRealloc.exec++; - pTask->stat.bytes.memRealloc.exec+=size; if (NULL == pTask->pMemList[pTask->memIdx - 1].p) { + pTask->stat.times.memRealloc.exec++; + pTask->stat.bytes.memRealloc.exec+=size; + pTask->stat.bytes.memRealloc.origExec+=osize; + pTask->stat.times.memRealloc.fail++; pTask->stat.bytes.memRealloc.fail+=size; - pTask->stat.bytes.memFree.succ+=pTask->pMemList[pTask->memIdx - 1].size; + pTask->stat.bytes.memFree.succ+=osize; uError("JOB:0x%x TASK:0x%x real mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); pTask->memIdx--; - pTask->pMemList[pTask->memIdx].size = 0; return; } - pTask->stat.bytes.memRealloc.succ+=size; - pTask->stat.bytes.memRealloc.origSucc+=pTask->pMemList[pTask->memIdx - 1].size; - *(char*)pTask->pMemList[pTask->memIdx - 1].p = 'a' + taosRand() % 26; - *((char*)pTask->pMemList[pTask->memIdx - 1].p + size -1) = 'a' + taosRand() % 26; + nsize = mptMemorySize(pTask->pMemList[pTask->memIdx - 1].p); + pTask->stat.times.memRealloc.exec++; + pTask->stat.bytes.memRealloc.exec+=nsize; + pTask->stat.bytes.memRealloc.origExec+=osize; + pTask->stat.bytes.memRealloc.origSucc+=osize; + pTask->stat.times.memRealloc.succ++; + pTask->stat.bytes.memRealloc.succ+=nsize; + + mptWriteMem(pTask->pMemList[pTask->memIdx - 1].p, size); - pTask->pMemList[pTask->memIdx - 1].size = size; pTask->lastAct = actId; actDone = true; break; @@ -750,14 +793,16 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { } assert(pTask->pMemList[pTask->memIdx - 1].p); + osize = mptMemorySize(pTask->pMemList[pTask->memIdx - 1].p); + pTask->pMemList[pTask->memIdx - 1].p = mptMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, 0); pTask->stat.times.memRealloc.exec++; - pTask->stat.bytes.memRealloc.exec+=size; + pTask->stat.bytes.memRealloc.origExec+=osize; assert(NULL == pTask->pMemList[pTask->memIdx - 1].p); - pTask->stat.bytes.memRealloc.origSucc+=pTask->pMemList[pTask->memIdx - 1].size; + pTask->stat.times.memRealloc.succ++; + pTask->stat.bytes.memRealloc.origSucc+=osize; - pTask->pMemList[pTask->memIdx - 1].size = 0; pTask->memIdx--; pTask->lastAct = actId; actDone = true; @@ -772,15 +817,25 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { mptCtrl.pSrcString[size] = 0; pTask->pMemList[pTask->memIdx].p = mptStrdup(mptCtrl.pSrcString); mptCtrl.pSrcString[size] = 'W'; + if (NULL == pTask->pMemList[pTask->memIdx].p) { + pTask->stat.times.strdup.exec++; + pTask->stat.bytes.strdup.exec+=size + 1; + pTask->stat.times.strdup.fail++; + pTask->stat.bytes.strdup.fail+=size + 1; uError("JOB:0x%x TASK:0x%x mpStrdup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } - *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; - *((char*)pTask->pMemList[pTask->memIdx].p + size - 1) = 'a' + taosRand() % 26; + nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p); + pTask->stat.times.strdup.exec++; + pTask->stat.bytes.strdup.exec+= nsize; + + pTask->stat.times.strdup.succ++; + pTask->stat.bytes.strdup.succ+=nsize; + + mptWriteMem(pTask->pMemList[pTask->memIdx].p, size); - pTask->pMemList[pTask->memIdx].size = size + 1; pTask->memIdx++; pTask->lastAct = actId; actDone = true; @@ -792,16 +847,28 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { } size /= 10; + assert(strlen(mptCtrl.pSrcString) > size); pTask->pMemList[pTask->memIdx].p = mptStrndup(mptCtrl.pSrcString, size); + if (NULL == pTask->pMemList[pTask->memIdx].p) { + pTask->stat.times.strndup.exec++; + pTask->stat.bytes.strndup.exec+=size + 1; + pTask->stat.times.strndup.fail++; + pTask->stat.bytes.strndup.fail+=size + 1; uError("JOB:0x%x TASK:0x%x mpStrndup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } - *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; - *((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26; + assert(strlen((char*)pTask->pMemList[pTask->memIdx].p) == size); + nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p); + + pTask->stat.times.strndup.exec++; + pTask->stat.bytes.strndup.exec+=nsize; + pTask->stat.times.strndup.succ++; + pTask->stat.bytes.strndup.succ+=nsize; + + mptWriteMem(pTask->pMemList[pTask->memIdx].p, size); - pTask->pMemList[pTask->memIdx].size = size + 1; pTask->memIdx++; pTask->lastAct = actId; actDone = true; @@ -813,7 +880,12 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { } assert(pTask->pMemList[pTask->memIdx - 1].p); + osize = mptMemorySize(pTask->pMemList[pTask->memIdx - 1].p); mptMemoryFree(pTask->pMemList[pTask->memIdx - 1].p); + pTask->stat.times.memFree.exec++; + pTask->stat.times.memFree.succ++; + pTask->stat.bytes.memFree.exec+=osize; + pTask->stat.bytes.memFree.succ+=osize; pTask->pMemList[pTask->memIdx - 1].p = NULL; pTask->memIdx--; @@ -822,7 +894,17 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { break; } case 8:{ // trim - mptMemoryTrim(0, NULL); + bool trimed = false; + int32_t code = mptMemoryTrim(0, &trimed); + pTask->stat.times.memTrim.exec++; + if (code) { + pTask->stat.times.memTrim.fail++; + } else { + pTask->stat.times.memTrim.succ++; + if (trimed) { + pTask->stat.bytes.memTrim.succ++; + } + } pTask->lastAct = actId; actDone = true; break; @@ -834,14 +916,23 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { pTask->pMemList[pTask->memIdx].p = mptMemoryMallocAlign(8, size); if (NULL == pTask->pMemList[pTask->memIdx].p) { + pTask->stat.times.memMalloc.exec++; + pTask->stat.bytes.memMalloc.exec+=size; + pTask->stat.times.memMalloc.fail++; + pTask->stat.bytes.memMalloc.fail+=size; uError("JOB:0x%x TASK:0x%x mpMallocAlign %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } + + nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p); - *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; - *((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26; + mptWriteMem(pTask->pMemList[pTask->memIdx].p, size); + + pTask->stat.times.memMalloc.exec++; + pTask->stat.bytes.memMalloc.exec+=nsize; - pTask->pMemList[pTask->memIdx].size = size; + pTask->stat.times.memMalloc.succ++; + pTask->stat.bytes.memMalloc.succ+=nsize; pTask->memIdx++; pTask->lastAct = actId; actDone = true; @@ -936,6 +1027,37 @@ void mptInitJobs() { } } +void mptCheckPoolUsedSize(int32_t jobNum) { + int64_t usedSize = 0; + bool needEnd = false; + int64_t poolUsedSize = 0; + + taosMemPoolGetUsedSizeBegin(mptCtx.memPoolHandle, &usedSize, &needEnd); + for (int32_t i = 0; i < jobNum; ++i) { + SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; + int64_t jobUsedSize = 0; + for (int32_t m = 0; m < pJobCtx->taskNum; ++m) { + SMPStatDetail* pStat = NULL; + int64_t allocSize = 0; + taosMemPoolGetSessionStat(pJobCtx->pSessions[m], &pStat, &allocSize, NULL); + int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat); + + assert(allocSize == usedSize); + assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[m].stat, sizeof(*pStat))); + + jobUsedSize += allocSize; + } + + assert(pJobCtx->pJob->memInfo->allocMemSize == jobUsedSize); + + poolUsedSize += jobUsedSize; + } + + assert(poolUsedSize == usedSize); + + taosMemPoolGetUsedSizeEnd(mptCtx.memPoolHandle); +} + void* mptThreadFunc(void* param) { SMPTestThread* pThread = (SMPTestThread*)param; int32_t jobExecTimes = (mptCtrl.jobExecTimes) ? mptCtrl.jobExecTimes : taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1; @@ -977,6 +1099,8 @@ void* mptThreadFunc(void* param) { } MPT_PRINTF("Thread %d finish the %dth job loops\n", pThread->idx, n); + + mptCheckPoolUsedSize(jobNum); } return NULL; @@ -1037,7 +1161,34 @@ void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) { } // namespace #if 1 + #if 1 +TEST(FuncTest, SysMemoryPerfTest) { + char* caseName = "FuncTest:SingleThreadTest"; + int32_t code = 0; + + int64_t msize = 1048576UL*10240; + char* p = (char*)taosMemMalloc(msize); + int64_t st = taosGetTimestampUs(); + memset(p, 0, msize); + int64_t totalUs = taosGetTimestampUs() - st; + printf("memset %" PRId64 " used time:%" PRId64 "us\n", msize, totalUs, totalUs); + + int64_t freeSize = 0; + int32_t loopTimes = 1000000; + st = taosGetTimestampUs(); + int64_t lt = st; + for (int32_t i = 0; i < loopTimes; ++i) { + code = taosGetSysAvailMemory(&freeSize); + assert(0 == code); + } + totalUs = taosGetTimestampUs() - st; + + printf("%d times getSysMemory total time:%" PRId64 "us, avg:%dus\n", loopTimes, totalUs, totalUs/loopTimes); +} +#endif + +#if 0 TEST(FuncTest, SingleThreadTest) { char* caseName = "FuncTest:SingleThreadTest"; SMPTestParam param = {0};