diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 083e0afeef..7eca9a53b9 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -78,7 +78,7 @@ typedef struct SMPStatDetail { typedef void (*mpDecConcSessionNum)(void); typedef void (*mpIncConcSessionNum)(void); typedef void (*mpSetConcSessionNum)(int32_t); -typedef void (*mpRetireJobs)(int64_t, bool, int32_t); +typedef void (*mpRetireJobs)(void*, int64_t, bool, int32_t); typedef void (*mpRetireJob)(SMemPoolJob*, int32_t); typedef void (*mpCfgUpdate)(void*, void*); @@ -120,6 +120,9 @@ void taosMemPoolDestroySession(void* poolHandle, void* session); int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob); void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg); void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName); +void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd); +void taosMemPoolGetUsedSizeEnd(void* poolHandle); +bool taosMemPoolNeedRetireJob(void* poolHandle); #define taosMemPoolFreeClear(ptr) \ do { \ @@ -147,7 +150,7 @@ extern threadlocal void* threadPoolSession; #define taosStrndup(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolStrndup(threadPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size))) #define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr))) #define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr))) -#define taosMemoryTrim(_size, _trimed) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size))) +#define taosMemoryTrim(_size, _trimed) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed))) #define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) #else #define taosEnableMemoryPoolUsage(_pool, _session) @@ -159,9 +162,10 @@ extern threadlocal void* threadPoolSession; #define taosMemoryCalloc(_num, _size) taosMemCalloc(_num, _size) #define taosMemoryRealloc(_ptr, _size) taosMemRealloc(_ptr, _size) #define taosStrdup(_ptr) taosStrdupi(_ptr) +#define taosStrndup(_ptr, _size) taosStrndupi(_ptr, _size) #define taosMemoryFree(_ptr) taosMemFree(_ptr) #define taosMemorySize(_ptr) taosMemSize(_ptr) -#define taosMemoryTrim(_size, _trimed) taosMemTrim(_size) +#define taosMemoryTrim(_size, _trimed) taosMemTrim(_size, _trimed) #define taosMemoryMallocAlign(_alignment, _size) taosMemMallocAlign(_alignment, _size) #endif diff --git a/include/os/osMemory.h b/include/os/osMemory.h index b76ca14265..51e57820b9 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -46,7 +46,7 @@ char *taosStrndupi(const char *ptr, int64_t size); void taosMemFree(void *ptr); int64_t taosMemSize(void *ptr); void taosPrintBackTrace(); -void taosMemTrim(int32_t size); +int32_t taosMemTrim(int32_t size, bool* trimed); void *taosMemMallocAlign(uint32_t alignment, int64_t size); #define TAOS_MEMSET(_s, _c, _n) ((void)memset(_s, _c, _n)) diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index 4673ccadd2..e0e5119602 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -1,7 +1,23 @@ #include "qwInt.h" #include "qworker.h" -int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { +int32_t qwGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) { + int64_t freeSize = 0; + int64_t usedSize = 0; + bool needEnd = false; + + taosMemPoolGetUsedSizeBegin(pHandle, &usedSize, &needEnd); + int32_t code = taosGetSysAvailMemory(&freeSize); + if (needEnd) { + taosMemPoolGetUsedSizeEnd(pHandle); + } + + if (TSDB_CODE_SUCCESS != code) { + qError("get system avaiable memory size failed, error: 0x%x", code); + return code; + } + + int64_t totalSize = freeSize + usedSize; int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, QW_MIN_RESERVE_MEM_SIZE); int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL; if (availSize < QW_MIN_MEM_POOL_SIZE) { @@ -9,6 +25,8 @@ int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM; } + uDebug("new pool maxSize:%" PRId64 ", usedSize:%" PRId64 ", freeSize:%" PRId64, availSize, usedSize, freeSize); + *maxSize = availSize; return TSDB_CODE_SUCCESS; @@ -108,11 +126,11 @@ int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) { ctx->pJobInfo = pJob; - QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pJob->memInfo)); - char id[sizeof(tId) + sizeof(eId)] = {0}; QW_SET_TEID(id, tId, eId); + QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pJob->memInfo)); + code = taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES); if (TSDB_CODE_SUCCESS != code) { qError("fail to put session into query session hash, code: 0x%x", code); @@ -136,23 +154,27 @@ void qwRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) { } if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - qwRetireJob(pJob); - - qInfo("QID:0x%" PRIx64 " retired directly, errCode: 0x%x", mpJob->jobId, errCode); + qInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); } else { - qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x", mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode)); + qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); } } -void qwLowLevelRetire(int64_t retireSize, int32_t errCode) { +void qwLowLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) { SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); while (pJob) { + if (!taosMemPoolNeedRetireJob(pHandle)) { + taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob); + return; + } + + uint64_t jobId = pJob->memInfo->jobId; int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { qwRetireJob(pJob); qDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, - pJob->memInfo->jobId, aSize, retireSize); + jobId, aSize, retireSize); taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob); break; @@ -162,7 +184,7 @@ void qwLowLevelRetire(int64_t retireSize, int32_t errCode) { } } -void qwMidLevelRetire(int64_t retireSize, int32_t errCode) { +void qwMidLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) { SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); PriorityQueueNode qNode; while (NULL != pJob) { @@ -175,8 +197,13 @@ void qwMidLevelRetire(int64_t retireSize, int32_t errCode) { } PriorityQueueNode* pNode = NULL; + uint64_t jobId = 0; int64_t retiredSize = 0; while (retiredSize < retireSize) { + if (!taosMemPoolNeedRetireJob(pHandle)) { + break; + } + pNode = taosBQTop(gQueryMgmt.retireCtx.pJobQueue); if (NULL == pNode) { break; @@ -190,11 +217,12 @@ void qwMidLevelRetire(int64_t retireSize, int32_t errCode) { if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); + jobId = pJob->memInfo->jobId; qwRetireJob(pJob); qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, - pJob->memInfo->jobId, aSize, retireSize); + jobId, aSize, retireSize); retiredSize += aSize; } @@ -206,11 +234,11 @@ void qwMidLevelRetire(int64_t retireSize, int32_t errCode) { } -void qwRetireJobsCb(int64_t retireSize, bool lowLevelRetire, int32_t errCode) { - (lowLevelRetire) ? qwLowLevelRetire(retireSize, errCode) : qwMidLevelRetire(retireSize, errCode); +void qwRetireJobsCb(void* pHandle, int64_t retireSize, bool lowLevelRetire, int32_t errCode) { + (lowLevelRetire) ? qwLowLevelRetire(pHandle, retireSize, errCode) : qwMidLevelRetire(pHandle, retireSize, errCode); } -int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { +int32_t qwGetQueryMemPoolMaxSize(void* pHandle, int64_t* pMaxSize, bool* autoMaxSize) { if (tsQueryBufferPoolSize > 0) { *pMaxSize = tsQueryBufferPoolSize * 1048576UL; *autoMaxSize = false; @@ -218,14 +246,7 @@ int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { return TSDB_CODE_SUCCESS; } - int64_t memSize = 0; - int32_t code = taosGetSysAvailMemory(&memSize); - if (TSDB_CODE_SUCCESS != code) { - qError("get system avaiable memory size failed, error: 0x%x", code); - return code; - } - - code = qwGetMemPoolMaxMemSize(memSize, pMaxSize); + int32_t code = qwGetMemPoolMaxMemSize(pHandle, pMaxSize); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -244,7 +265,7 @@ void qwCheckUpateCfgCb(void* pHandle, void* cfg) { int64_t maxSize = 0; bool autoMaxSize = false; - int32_t code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize); + int32_t code = qwGetQueryMemPoolMaxSize(pHandle, &maxSize, &autoMaxSize); if (TSDB_CODE_SUCCESS != code) { pCfg->maxSize = 0; qError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize); @@ -268,6 +289,8 @@ static bool qwJobMemSizeCompFn(void* l, void* r, void* param) { return atomic_load_64(&right->memInfo->allocMemSize) < atomic_load_64(&left->memInfo->allocMemSize); } +void qwDeleteJobQueueData(void* pData) {} + int32_t qwInitQueryPool(void) { int32_t code = TSDB_CODE_SUCCESS; @@ -284,7 +307,7 @@ int32_t qwInitQueryPool(void) { SMemPoolCfg cfg = {0}; int64_t maxSize = 0; bool autoMaxSize = false; - code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize); + code = qwGetQueryMemPoolMaxSize(NULL, &maxSize, &autoMaxSize); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -310,7 +333,7 @@ int32_t qwInitQueryPool(void) { return terrno; } - gQueryMgmt.retireCtx.pJobQueue = createBoundedQueue(QW_MAX_RETIRE_JOB_NUM, qwJobMemSizeCompFn, NULL, NULL); + gQueryMgmt.retireCtx.pJobQueue = createBoundedQueue(QW_MAX_RETIRE_JOB_NUM, qwJobMemSizeCompFn, qwDeleteJobQueueData, NULL); if (NULL == gQueryMgmt.retireCtx.pJobQueue) { qError("init job bounded queue failed, error:0x%x", terrno); return terrno; diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index f1220c4aa3..e46b669663 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -460,12 +460,13 @@ int64_t taosMemSize(void *ptr) { #endif } -void taosMemTrim(int32_t size) { +int32_t taosMemTrim(int32_t size, bool* trimed) { #if defined(WINDOWS) || defined(DARWIN) || defined(_ALPINE) // do nothing - return; + return TSDB_CODE_SUCCESS; #else - (void)malloc_trim(size); + *trimed = malloc_trim(size); + return TSDB_CODE_SUCCESS; #endif } diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index aa0b25802b..f0a02a5917 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -38,6 +38,7 @@ extern "C" { #define MP_RETIRE_LOW_THRESHOLD_PERCENT (0.85) #define MP_RETIRE_UNIT_PERCENT (0.1) #define MP_RETIRE_UNIT_MIN_SIZE (50 * 1048576UL) +#define MP_CFG_UPDATE_MIN_RESERVE_SIZE (50 * 1024 * 1048576UL) // FLAGS AREA @@ -72,6 +73,7 @@ extern "C" { // CTRL FUNC FLAGS #define MP_CTRL_FLAG_PRINT_STAT (1 << 0) +#define MP_CTRL_FLAG_CHECK_STAT (1 << 1) typedef enum EMPStatLogItem { @@ -194,7 +196,7 @@ typedef struct SMPSessionChunk { typedef struct SMPSession { SMPListNode list; - int64_t sessionId; + char* sessionId; SMPJob* pJob; SMPCtrlInfo ctrlInfo; int64_t allocMemSize; @@ -242,6 +244,7 @@ typedef struct SMPChunkMgmt { typedef struct SMemPool { char *name; int16_t slotId; + SRWLatch cfgLock; SMemPoolCfg cfg; int64_t retireThreshold[3]; int64_t retireUnit; diff --git a/source/util/src/mpDirect.c b/source/util/src/mpDirect.c index 03fec4560f..699b621b07 100755 --- a/source/util/src/mpDirect.c +++ b/source/util/src/mpDirect.c @@ -21,9 +21,13 @@ int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { int32_t code = TSDB_CODE_SUCCESS; - MP_ERR_RET(mpChkQuotaOverflow(pPool, pSession, size)); + void* res = NULL; - void* res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size); + taosRLockLatch(&pPool->cfgLock); + + MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, size)); + + res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size); if (NULL != res) { mpUpdateAllocSize(pPool, pSession, size); } else { @@ -35,6 +39,10 @@ int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint3 code = terrno; } +_return: + + taosRUnLockLatch(&pPool->cfgLock); + *ppRes = res; MP_RET(code); @@ -49,17 +57,24 @@ void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* ori if (origSize) { *origSize = oSize; } + + taosRLockLatch(&pPool->cfgLock); // tmp test + taosMemFree(ptr); (void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize); (void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize); (void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize); + + taosRUnLockLatch(&pPool->cfgLock); } int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) { int32_t code = TSDB_CODE_SUCCESS; - MP_ERR_RET(mpChkQuotaOverflow(pPool, pSession, size)); + taosRLockLatch(&pPool->cfgLock); + + MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, size - *origSize)); *pPtr = taosMemRealloc(*pPtr, size); if (NULL != *pPtr) { @@ -68,6 +83,15 @@ int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int6 MP_ERR_RET(terrno); } +_return: + + taosRUnLockLatch(&pPool->cfgLock); + + if (code) { + mpDirectFree(pPool, pSession, *pPtr, NULL); + *pPtr = NULL; + } + return TSDB_CODE_SUCCESS; } diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 0fd8e69ce6..5f5b190e13 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -171,6 +171,12 @@ int32_t mpUpdateCfg(SMemPool* pPool) { MP_ERR_RET((*gMPFps[gMPMgmt.strategy].updateCfgFp)(pPool)); } + uDebug("memPool %s cfg updated, autoMaxSize:%d, maxSize:%" PRId64 + ", jobQuota:%" PRId64 ", threadNum:%d, retireThreshold:%" PRId64 "-%" PRId64 "-%" PRId64 + ", retireUnit:%" PRId64, pPool->name, pPool->cfg.autoMaxSize, pPool->cfg.maxSize, + pPool->cfg.jobQuota, pPool->cfg.threadNum, pPool->retireThreshold[0], pPool->retireThreshold[1], + pPool->retireThreshold[2], pPool->retireUnit); + return TSDB_CODE_SUCCESS; } @@ -188,7 +194,7 @@ int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { MP_ERR_RET(mpUpdateCfg(pPool)); pPool->ctrlInfo.statFlags = MP_STAT_FLAG_LOG_ALL; - pPool->ctrlInfo.funcFlags = MP_CTRL_FLAG_PRINT_STAT; + pPool->ctrlInfo.funcFlags = MP_CTRL_FLAG_PRINT_STAT | MP_CTRL_FLAG_CHECK_STAT; pPool->sessionCache.groupNum = MP_SESSION_CACHE_ALLOC_BATCH_SIZE; pPool->sessionCache.nodeSize = sizeof(SMPSession); @@ -250,6 +256,7 @@ int32_t mpPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) { SMPJob* pJob = pSession->pJob; int32_t code = TSDB_CODE_SUCCESS; + int64_t cAllocSize = atomic_add_fetch_64(&pJob->job.allocMemSize, size); int64_t quota = atomic_load_64(&pPool->cfg.jobQuota); if (quota > 0 && cAllocSize > quota) { @@ -280,8 +287,9 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); (void)atomic_sub_fetch_64(&pPool->allocMemSize, size); - MP_ERR_RET(mpPutRetireMsgToQueue(pPool, false)); MP_RET(code); + } else { + MP_ERR_RET(mpPutRetireMsgToQueue(pPool, false)); } return TSDB_CODE_SUCCESS; @@ -297,8 +305,9 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); (void)atomic_sub_fetch_64(&pPool->allocMemSize, size); - MP_ERR_RET(mpPutRetireMsgToQueue(pPool, true)); MP_RET(code); + } else { + MP_ERR_RET(mpPutRetireMsgToQueue(pPool, true)); } } @@ -354,6 +363,7 @@ int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t si if (0 == size) { mpFree(pPool, pSession, *pPtr, origSize); + *pPtr = NULL; return TSDB_CODE_SUCCESS; } @@ -596,6 +606,49 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt } } +void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) { + SMemPool* pPool = (SMemPool*)poolHandle; + SMPSession* pSession = (SMPSession*)session; + SMPCtrlInfo* pCtrl = NULL; + SMPStatDetail* pDetail = NULL; + + if (NULL != session) { + pCtrl = &pSession->ctrlInfo; + pDetail = &pSession->stat.statDetail; + if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT)) { + int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.strdup.succ + pDetail->bytes.strndup.succ; + int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ; + + if (allocSize != freeSize) { + uError("%s Session in JOB:0x%" PRIx64 " stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, + detailName, pSession->pJob->job.jobId, allocSize, freeSize); + ASSERT(0); + } else { + uDebug("%s Session in JOB:0x%" PRIx64 " stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, + detailName, pSession->pJob->job.jobId, allocSize, freeSize); + } + } + } + + if (NULL != poolHandle) { + pCtrl = &pPool->ctrlInfo; + pDetail = &pPool->stat.statDetail; + int64_t sessInit = pPool->stat.statSession.initFail + pPool->stat.statSession.initSucc; + if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == pPool->stat.statSession.destroyNum) { + int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.strdup.succ + pDetail->bytes.strndup.succ; + int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ; + + if (allocSize != freeSize) { + uError("%s MemPool %s stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize); + ASSERT(0); + } else { + uDebug("%s MemPool %s stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize); + } + } + } +} + + void mpCheckUpateCfg(void) { taosRLockLatch(&gMPMgmt.poolLock); int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList); @@ -618,9 +671,9 @@ void* mpMgmtThreadFunc(void* param) { } if (atomic_load_8(&gMPMgmt.msgQueue.midLevelRetire)) { - (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), false, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); + (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), false, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); } else if (atomic_load_8(&gMPMgmt.msgQueue.lowLevelRetire)) { - (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); + (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); } mpCheckUpateCfg(); @@ -733,6 +786,8 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { taosWUnLockLatch(&gMPMgmt.poolLock); + uInfo("mempool %s opened", poolName); + _return: if (TSDB_CODE_SUCCESS != code) { @@ -767,6 +822,8 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) { taosMemPoolPrintStat(pPool, pSession, "DestroySession"); + mpCheckStatDetail(pPool, pSession, "DestroySession"); + TAOS_MEMSET(pSession, 0, sizeof(*pSession)); mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession); @@ -867,8 +924,17 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz terrno = mpRealloc(pPool, pSession, &ptr, size, &input.origSize); - MP_SET_FLAG(input.procFlags, ((ptr || 0 == size) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); - mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input); + if (ptr || 0 == size) { + MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC); + mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input); + } else { + MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_FAIL); + mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input); + + input.procFlags = 0; + MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC); + mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input); + } _return: @@ -1005,6 +1071,10 @@ _return: void taosMemPoolClose(void* poolHandle) { SMemPool* pPool = (SMemPool*)poolHandle; + taosMemPoolPrintStat(poolHandle, NULL, "PoolClose"); + + mpCheckStatDetail(pPool, NULL, "PoolClose"); + taosMemoryFree(pPool->name); mpDestroyCacheGroup(&pPool->sessionCache); } @@ -1052,6 +1122,28 @@ int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) { return TSDB_CODE_SUCCESS; } +void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) { + SMemPool* pPool = (SMemPool*)poolHandle; + if ((atomic_load_64(&pPool->cfg.maxSize) - atomic_load_64(&pPool->allocMemSize)) <= MP_CFG_UPDATE_MIN_RESERVE_SIZE) { + *needEnd = true; + taosWLockLatch(&pPool->cfgLock); + } else { + *needEnd = false; + } + + *usedSize = atomic_load_64(&pPool->allocMemSize); +} + +void taosMemPoolGetUsedSizeEnd(void* poolHandle) { + SMemPool* pPool = (SMemPool*)poolHandle; + taosWUnLockLatch(&pPool->cfgLock); +} + +bool taosMemPoolNeedRetireJob(void* poolHandle) { + SMemPool* pPool = (SMemPool*)poolHandle; + return atomic_load_64(&pPool->allocMemSize) >= atomic_load_64(&pPool->retireThreshold[0]); +} + void taosAutoMemoryFree(void *ptr) { if (NULL != threadPoolHandle) { diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 845c01728a..4f6acf1bd7 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -42,11 +42,11 @@ namespace { #define MPT_PRINTF (void)printf -#define MPT_MAX_MEM_ACT_TIMES 10000 -#define MPT_MAX_SESSION_NUM 256 -#define MPT_MAX_JOB_NUM 200 +#define MPT_MAX_MEM_ACT_TIMES 1000 +#define MPT_MAX_SESSION_NUM 100 +#define MPT_MAX_JOB_NUM 100 #define MPT_MAX_THREAD_NUM 100 -#define MPT_MAX_JOB_LOOP_TIMES 1000 +#define MPT_MAX_JOB_LOOP_TIMES 100 #define MPT_DEFAULT_RESERVE_MEM_PERCENT 20 #define MPT_MIN_RESERVE_MEM_SIZE (512 * 1048576UL) @@ -73,9 +73,10 @@ threadlocal void* mptThreadPoolSession = NULL; #define mptMemoryCalloc(_num, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolCalloc(mptThreadPoolHandle, mptThreadPoolSession, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) #define mptMemoryRealloc(_ptr, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolRealloc(mptThreadPoolHandle, mptThreadPoolSession, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) #define mptStrdup(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolStrdup(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosStrdupi(_ptr))) +#define mptStrndup(_ptr, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolStrndup(mptThreadPoolHandle, mptThreadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size))) #define mptMemoryFree(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolFree(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemFree(_ptr))) #define mptMemorySize(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolGetMemorySize(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemSize(_ptr))) -#define mptMemoryTrim(_size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolTrim(mptThreadPoolHandle, mptThreadPoolSession, _size, __FILE__, __LINE__)) : (taosMemTrim(_size))) +#define mptMemoryTrim(_size, _trimed) ((NULL != mptThreadPoolHandle) ? (taosMemPoolTrim(mptThreadPoolHandle, mptThreadPoolSession, _size, __FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed))) #define mptMemoryMallocAlign(_alignment, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolMallocAlign(mptThreadPoolHandle, mptThreadPoolSession, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) enum { @@ -101,11 +102,14 @@ typedef struct SMPTJobInfo { typedef struct { - int32_t taskMaxActTimes; + int32_t taskActTimes; int32_t caseLoopTimes; + int32_t jobExecTimes; + int32_t jobNum; + int32_t jobTaskNum; int64_t maxSingleAllocSize; char* pSrcString; - bool printTestInfo; + bool printExecDetail; bool printInputRow; } SMPTestCtrl; @@ -115,6 +119,7 @@ typedef struct { } SMPTestMemInfo; typedef struct { + uint64_t taskId; SRWLatch taskExecLock; bool taskFinished; @@ -124,14 +129,15 @@ typedef struct { SMPStatDetail stat; int32_t memIdx; - SMPTestMemInfo pMemList[MPT_MAX_MEM_ACT_TIMES]; + SMPTestMemInfo* pMemList; int64_t npSize; int32_t npMemIdx; - SMPTestMemInfo npMemList[MPT_MAX_MEM_ACT_TIMES]; + SMPTestMemInfo* npMemList; bool taskFreed; + int32_t lastAct; } SMPTestTaskCtx; typedef struct { @@ -151,12 +157,13 @@ typedef struct { typedef struct { int64_t jobQuota; bool autoPoolSize; - int32_t poolSize; + int64_t poolSize; int32_t threadNum; int32_t randTask; } SMPTestParam; typedef struct { + int32_t idx; TdThread threadFp; bool allJobs; bool autoJob; @@ -164,11 +171,12 @@ typedef struct { typedef struct SMPTestCtx { int64_t qId; + int64_t tId; SHashObj* pJobs; BoundedQueue* pJobQueue; void* memPoolHandle; SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM]; - SMPTestJobCtx jobCtxs[MPT_MAX_JOB_NUM]; + SMPTestJobCtx* jobCtxs; SMPTestParam param; } SMPTestCtx; @@ -206,6 +214,9 @@ void mptInitLogFile() { tsAsyncLog = 0; qDebugFlag = 159; + uDebugFlag = 159; + tsNumOfLogLines = INT32_MAX; + tsLogKeepDays = 10; TAOS_STRCPY(tsLogDir, TD_LOG_DIR_PATH); if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { @@ -223,32 +234,51 @@ static bool mptJobMemSizeCompFn(void* l, void* r, void* param) { return atomic_load_64(&right->memInfo->allocMemSize) < atomic_load_64(&left->memInfo->allocMemSize); } +void mptDeleteJobQueueData(void* pData) { + SMPTJobInfo* pJob = (SMPTJobInfo*)pData; + taosHashRelease(mptCtx.pJobs, pJob); +} void mptInit() { mptInitLogFile(); + + mptCtrl.caseLoopTimes = 1; + mptCtrl.taskActTimes = 50; + mptCtrl.maxSingleAllocSize = 104857600; + mptCtrl.jobNum = 100; + mptCtrl.jobExecTimes = 1; + mptCtrl.jobTaskNum = 1; mptCtx.pJobs = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); ASSERT_TRUE(NULL != mptCtx.pJobs); - mptCtx.pJobQueue = createBoundedQueue(10000, mptJobMemSizeCompFn, NULL, NULL); + mptCtx.pJobQueue = createBoundedQueue(10000, mptJobMemSizeCompFn, mptDeleteJobQueueData, NULL); ASSERT_TRUE(NULL != mptCtx.pJobQueue); - - mptCtrl.caseLoopTimes = 100; - mptCtrl.taskMaxActTimes = 1000; - mptCtrl.maxSingleAllocSize = 104857600; + mptCtx.jobCtxs = (SMPTestJobCtx*)taosMemCalloc(MPT_MAX_JOB_NUM, sizeof(*mptCtx.jobCtxs)); + ASSERT_TRUE(NULL != mptCtx.jobCtxs); + mptCtrl.pSrcString = (char*)taosMemoryMalloc(mptCtrl.maxSingleAllocSize); ASSERT_TRUE(NULL != mptCtrl.pSrcString); memset(mptCtrl.pSrcString, 'P', mptCtrl.maxSingleAllocSize - 1); mptCtrl.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0; } -void mptDestroyTaskCtx(SMPTestTaskCtx* pTask) { +void mptDestroyTaskCtx(SMPTestTaskCtx* pTask, void* pSession) { + assert(mptCtx.memPoolHandle); + assert(pSession); + + mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pSession); for (int32_t i = 0; i < pTask->memIdx; ++i) { - taosMemFreeClear(pTask->pMemList[i].p); + mptMemoryFree(pTask->pMemList[i].p); + pTask->pMemList[i].p = NULL; } + mptDisableMemoryPoolUsage(); + for (int32_t i = 0; i < pTask->npMemIdx; ++i) { taosMemFreeClear(pTask->npMemList[i].p); } + taosMemFreeClear(pTask->pMemList); + taosMemFreeClear(pTask->npMemList); } @@ -332,8 +362,18 @@ _return: -void mptInitTask(int32_t idx, uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJob) { - ASSERT_TRUE(0 == mptInitSession(qId, tId, eId, pJob, &pJob->pSessions[idx])); +void mptInitTask(int32_t idx, int32_t eId, SMPTestJobCtx* pJob) { + pJob->taskCtxs[idx].taskId = atomic_add_fetch_64(&mptCtx.tId, 1); + + ASSERT_TRUE(0 == mptInitSession(pJob->jobId, pJob->taskCtxs[idx].taskId, eId, pJob, &pJob->pSessions[idx])); + + pJob->taskCtxs[idx].pMemList = (SMPTestMemInfo*)taosMemCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].pMemList)); + ASSERT_TRUE(NULL != pJob->taskCtxs[idx].pMemList); + + pJob->taskCtxs[idx].npMemList = (SMPTestMemInfo*)taosMemCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].npMemList)); + ASSERT_TRUE(NULL != pJob->taskCtxs[idx].npMemList); + + uDebug("JOB:0x%x TASK:0x%x idx:%d initialized", pJob->jobId, pJob->taskCtxs[idx].taskId, idx); } void mptInitJob(int32_t idx) { @@ -341,30 +381,41 @@ void mptInitJob(int32_t idx) { pJobCtx->jobIdx = idx; pJobCtx->jobId = atomic_add_fetch_64(&mptCtx.qId, 1); - pJobCtx->taskNum = (taosRand() % MPT_MAX_SESSION_NUM) + 1; + pJobCtx->taskNum = (mptCtrl.jobTaskNum) ? mptCtrl.jobTaskNum : (taosRand() % MPT_MAX_SESSION_NUM) + 1; for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { - mptInitTask(i, pJobCtx->jobId, i, 0, pJobCtx); + mptInitTask(i, 0, pJobCtx); + assert(pJobCtx->pJob); } + + uDebug("JOB:0x%x idx:%d initialized, taskNum:%d", pJobCtx->jobId, idx, pJobCtx->taskNum); } int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { if (taosWTryLockLatch(&pJobCtx->jobExecLock)) { return -1; } - + + uint64_t jobId = pJobCtx->jobId; + for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { + mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]); + taosMemPoolDestroySession(mptCtx.memPoolHandle, pJobCtx->pSessions[i]); + } + + uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobCtx->pJob->errCode); + mptDestroyJobInfo(pJobCtx->pJob); (void)taosHashRemove(mptCtx.pJobs, &pJobCtx->jobId, sizeof(pJobCtx->jobId)); - for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { - taosMemPoolDestroySession(mptCtx.memPoolHandle, pJobCtx->pSessions[i]); - mptDestroyTaskCtx(&pJobCtx->taskCtxs[i]); - } + if (reset) { + int32_t jobIdx = pJobCtx->jobIdx; memset((char*)pJobCtx + sizeof(pJobCtx->jobExecLock), 0, sizeof(SMPTestJobCtx) - sizeof(pJobCtx->jobExecLock)); - mptInitJob(pJobCtx->jobIdx); + mptInitJob(jobIdx); } taosWUnLockLatch(&pJobCtx->jobExecLock); + MPT_PRINTF(" JOB:0x%x retired\n", jobId); + return 0; } @@ -374,9 +425,11 @@ void mptCheckCompareJobInfo(SMPTestJobCtx* pJobCtx) { int32_t mptResetJob(SMPTestJobCtx* pJobCtx) { if (atomic_load_8(&pJobCtx->pJob->retired)) { - if (0 == atomic_load_32(&pJobCtx->taskRunningNum)) { + int32_t taskRunning = atomic_load_32(&pJobCtx->taskRunningNum); + if (0 == taskRunning) { return mptDestroyJob(pJobCtx, true); } else { + uDebug("JOB:0x%x retired but will not destroy cause of task running, num:%d", pJobCtx->jobId, taskRunning); return -1; } } @@ -388,9 +441,27 @@ void mptRetireJob(SMPTJobInfo* pJob) { SMPTestJobCtx* pCtx = (SMPTestJobCtx*)pJob->pCtx; mptCheckCompareJobInfo(pCtx); + + mptResetJob(pCtx); } -int32_t mptGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { +int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) { + int64_t freeSize = 0; + int64_t usedSize = 0; + bool needEnd = false; + + taosMemPoolGetUsedSizeBegin(pHandle, &usedSize, &needEnd); + int32_t code = taosGetSysAvailMemory(&freeSize); + if (needEnd) { + taosMemPoolGetUsedSizeEnd(pHandle); + } + + if (TSDB_CODE_SUCCESS != code) { + uError("get system avaiable memory size failed, error: 0x%x", code); + return code; + } + + int64_t totalSize = freeSize + usedSize; int64_t reserveSize = TMAX(totalSize * MPT_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, MPT_MIN_RESERVE_MEM_SIZE); int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL; if (availSize < MPT_MIN_MEM_POOL_SIZE) { @@ -398,12 +469,14 @@ int32_t mptGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM; } + uDebug("new pool maxSize:%" PRId64 ", usedSize:%" PRId64 ", freeSize:%" PRId64, availSize, usedSize, freeSize); + *maxSize = availSize; return TSDB_CODE_SUCCESS; } -int32_t mptGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { +int32_t mptGetQueryMemPoolMaxSize(void* pHandle, int64_t* pMaxSize, bool* autoMaxSize) { if (!mptCtx.param.autoPoolSize && mptCtx.param.poolSize > 0) { *pMaxSize = mptCtx.param.poolSize * 1048576UL; *autoMaxSize = false; @@ -411,14 +484,7 @@ int32_t mptGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { return TSDB_CODE_SUCCESS; } - int64_t memSize = 0; - int32_t code = taosGetSysAvailMemory(&memSize); - if (TSDB_CODE_SUCCESS != code) { - uError("get system avaiable memory size failed, error: 0x%x", code); - return code; - } - - code = mptGetMemPoolMaxMemSize(memSize, pMaxSize); + int32_t code = mptGetMemPoolMaxMemSize(pHandle, pMaxSize); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -438,7 +504,7 @@ void mptCheckUpateCfgCb(void* pHandle, void* cfg) { int64_t maxSize = 0; bool autoMaxSize = false; - int32_t code = mptGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize); + int32_t code = mptGetQueryMemPoolMaxSize(pHandle, &maxSize, &autoMaxSize); if (TSDB_CODE_SUCCESS != code) { pCfg->maxSize = 0; uError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize); @@ -452,15 +518,21 @@ void mptCheckUpateCfgCb(void* pHandle, void* cfg) { } } -void mptLowLevelRetire(int64_t retireSize, int32_t errCode) { +void mptLowLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) { SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL); while (pJob) { + if (!taosMemPoolNeedRetireJob(pHandle)) { + taosHashCancelIterate(mptCtx.pJobs, pJob); + return; + } + + uint64_t jobId = pJob->memInfo->jobId; int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { mptRetireJob(pJob); uDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, - pJob->memInfo->jobId, aSize, retireSize); + jobId, aSize, retireSize); taosHashCancelIterate(mptCtx.pJobs, pJob); break; @@ -470,13 +542,16 @@ void mptLowLevelRetire(int64_t retireSize, int32_t errCode) { } } -void mptMidLevelRetire(int64_t retireSize, int32_t errCode) { +void mptMidLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) { SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL); PriorityQueueNode qNode; while (NULL != pJob) { if (0 == atomic_load_8(&pJob->retired)) { + assert(pJob == taosHashAcquire(mptCtx.pJobs, &pJob->memInfo->jobId, sizeof(pJob->memInfo->jobId))); qNode.data = pJob; - (void)taosBQPush(mptCtx.pJobQueue, &qNode); + if (NULL == taosBQPush(mptCtx.pJobQueue, &qNode)) { + taosHashRelease(mptCtx.pJobs, pJob); + } } pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, pJob); @@ -485,6 +560,10 @@ void mptMidLevelRetire(int64_t retireSize, int32_t errCode) { PriorityQueueNode* pNode = NULL; int64_t retiredSize = 0; while (retiredSize < retireSize) { + if (!taosMemPoolNeedRetireJob(pHandle)) { + break; + } + pNode = taosBQTop(mptCtx.pJobQueue); if (NULL == pNode) { break; @@ -498,13 +577,14 @@ void mptMidLevelRetire(int64_t retireSize, int32_t errCode) { if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); + uint64_t jobId = pJob->memInfo->jobId; mptRetireJob(pJob); - uDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, - pJob->memInfo->jobId, aSize, retireSize); - retiredSize += aSize; + + uDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64 ", retiredSize:%" PRId64, + jobId, aSize, retireSize, retiredSize); } taosBQPop(mptCtx.pJobQueue); @@ -514,8 +594,8 @@ void mptMidLevelRetire(int64_t retireSize, int32_t errCode) { } -void mptRetireJobsCb(int64_t retireSize, bool lowLevelRetire, int32_t errCode) { - (lowLevelRetire) ? mptLowLevelRetire(retireSize, errCode) : mptMidLevelRetire(retireSize, errCode); +void mptRetireJobsCb(void* pHandle, int64_t retireSize, bool lowLevelRetire, int32_t errCode) { + (lowLevelRetire) ? mptLowLevelRetire(pHandle, retireSize, errCode) : mptMidLevelRetire(pHandle, retireSize, errCode); } @@ -527,11 +607,9 @@ void mptRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) { } if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - mptRetireJob(pJob); - - uInfo("QID:0x%" PRIx64 " retired directly, errCode: 0x%x", mpJob->jobId, errCode); + uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); } else { - uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x", mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode)); + uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); } } @@ -548,34 +626,44 @@ void mptInitPool(void) { } cfg.threadNum = 10; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO + cfg.chunkSize = 1048576; cfg.jobQuota = mptCtx.param.jobQuota; cfg.cb.retireJobsFp = mptRetireJobsCb; cfg.cb.retireJobFp = mptRetireJobCb; cfg.cb.cfgUpdateFp = mptCheckUpateCfgCb; - ASSERT_TRUE(0 == taosMemPoolOpen("SingleThreadTest", &cfg, &mptCtx.memPoolHandle)); + ASSERT_TRUE(0 == taosMemPoolOpen("testQMemPool", &cfg, &mptCtx.memPoolHandle)); } -void mptSimulateAction(SMPTestTaskCtx* pTask) { +void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { int32_t actId = 0; bool actDone = false; int32_t size = taosRand() % mptCtrl.maxSingleAllocSize; while (!actDone) { - actId = taosRand() % 9; + actId = taosRand() % 10; switch (actId) { case 0: { // malloc if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) { break; } - pTask->pMemList[pTask->memIdx].p = taosMemoryMalloc(size); + pTask->pMemList[pTask->memIdx].p = mptMemoryMalloc(size); + pTask->stat.times.memMalloc.exec++; + pTask->stat.bytes.memMalloc.exec+=size; if (NULL == pTask->pMemList[pTask->memIdx].p) { + pTask->stat.bytes.memMalloc.fail+=size; + uError("JOB:0x%x TASK:0x%x mpMalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } + + pTask->stat.bytes.memMalloc.succ+=size; + *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; + *((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26; pTask->pMemList[pTask->memIdx].size = size; pTask->memIdx++; + pTask->lastAct = actId; actDone = true; break; } @@ -584,13 +672,22 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) { break; } - pTask->pMemList[pTask->memIdx].p = taosMemoryCalloc(1, size); + pTask->pMemList[pTask->memIdx].p = mptMemoryCalloc(1, size); + pTask->stat.times.memCalloc.exec++; + pTask->stat.bytes.memCalloc.exec+=size; if (NULL == pTask->pMemList[pTask->memIdx].p) { + pTask->stat.bytes.memCalloc.fail+=size; + uError("JOB:0x%x TASK:0x%x mpCalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } + + pTask->stat.bytes.memCalloc.succ+=size; + *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; + *((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26; pTask->pMemList[pTask->memIdx].size = size; pTask->memIdx++; + pTask->lastAct = actId; actDone = true; break; } @@ -599,13 +696,22 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) { break; } - pTask->pMemList[pTask->memIdx].p = taosMemoryRealloc(NULL, size); + pTask->pMemList[pTask->memIdx].p = mptMemoryRealloc(NULL, size); + pTask->stat.times.memRealloc.exec++; + pTask->stat.bytes.memRealloc.exec+=size; if (NULL == pTask->pMemList[pTask->memIdx].p) { + pTask->stat.bytes.memRealloc.fail+=size; + uError("JOB:0x%x TASK:0x%x new mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } + + pTask->stat.bytes.memRealloc.succ+=size; + *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; + *((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26; pTask->pMemList[pTask->memIdx].size = size; pTask->memIdx++; + pTask->lastAct = actId; actDone = true; break; } @@ -615,12 +721,26 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) { } assert(pTask->pMemList[pTask->memIdx - 1].p); - pTask->pMemList[pTask->memIdx - 1].p = taosMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, size); + size++; + pTask->pMemList[pTask->memIdx - 1].p = mptMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, size); + pTask->stat.times.memRealloc.exec++; + pTask->stat.bytes.memRealloc.exec+=size; if (NULL == pTask->pMemList[pTask->memIdx - 1].p) { + pTask->stat.bytes.memRealloc.fail+=size; + pTask->stat.bytes.memFree.succ+=pTask->pMemList[pTask->memIdx - 1].size; + uError("JOB:0x%x TASK:0x%x real mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); + pTask->memIdx--; + pTask->pMemList[pTask->memIdx].size = 0; return; } + + pTask->stat.bytes.memRealloc.succ+=size; + pTask->stat.bytes.memRealloc.origSucc+=pTask->pMemList[pTask->memIdx - 1].size; + *(char*)pTask->pMemList[pTask->memIdx - 1].p = 'a' + taosRand() % 26; + *((char*)pTask->pMemList[pTask->memIdx - 1].p + size -1) = 'a' + taosRand() % 26; pTask->pMemList[pTask->memIdx - 1].size = size; + pTask->lastAct = actId; actDone = true; break; } @@ -630,12 +750,16 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) { } assert(pTask->pMemList[pTask->memIdx - 1].p); - pTask->pMemList[pTask->memIdx - 1].p = taosMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, 0); - if (NULL != pTask->pMemList[pTask->memIdx - 1].p) { - taosMemoryFreeClear(pTask->pMemList[pTask->memIdx - 1].p); - } - + pTask->pMemList[pTask->memIdx - 1].p = mptMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, 0); + pTask->stat.times.memRealloc.exec++; + pTask->stat.bytes.memRealloc.exec+=size; + assert(NULL == pTask->pMemList[pTask->memIdx - 1].p); + + pTask->stat.bytes.memRealloc.origSucc+=pTask->pMemList[pTask->memIdx - 1].size; + + pTask->pMemList[pTask->memIdx - 1].size = 0; pTask->memIdx--; + pTask->lastAct = actId; actDone = true; break; } @@ -644,15 +768,21 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) { break; } + size /= 10; mptCtrl.pSrcString[size] = 0; - pTask->pMemList[pTask->memIdx].p = taosStrdup(mptCtrl.pSrcString); + pTask->pMemList[pTask->memIdx].p = mptStrdup(mptCtrl.pSrcString); mptCtrl.pSrcString[size] = 'W'; if (NULL == pTask->pMemList[pTask->memIdx].p) { + uError("JOB:0x%x TASK:0x%x mpStrdup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } + + *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; + *((char*)pTask->pMemList[pTask->memIdx].p + size - 1) = 'a' + taosRand() % 26; pTask->pMemList[pTask->memIdx].size = size + 1; pTask->memIdx++; + pTask->lastAct = actId; actDone = true; break; } @@ -661,13 +791,19 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) { break; } - pTask->pMemList[pTask->memIdx].p = taosStrndup(mptCtrl.pSrcString, size); + size /= 10; + pTask->pMemList[pTask->memIdx].p = mptStrndup(mptCtrl.pSrcString, size); if (NULL == pTask->pMemList[pTask->memIdx].p) { + uError("JOB:0x%x TASK:0x%x mpStrndup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } + + *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; + *((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26; pTask->pMemList[pTask->memIdx].size = size + 1; pTask->memIdx++; + pTask->lastAct = actId; actDone = true; break; } @@ -677,14 +813,37 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) { } assert(pTask->pMemList[pTask->memIdx - 1].p); - taosMemoryFreeClear(pTask->pMemList[pTask->memIdx - 1].p); + mptMemoryFree(pTask->pMemList[pTask->memIdx - 1].p); + pTask->pMemList[pTask->memIdx - 1].p = NULL; pTask->memIdx--; + pTask->lastAct = actId; actDone = true; break; } case 8:{ // trim - taosMemoryTrim(0, NULL); + mptMemoryTrim(0, NULL); + pTask->lastAct = actId; + actDone = true; + break; + } + case 9: { // malloc_align + if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) { + break; + } + + pTask->pMemList[pTask->memIdx].p = mptMemoryMallocAlign(8, size); + if (NULL == pTask->pMemList[pTask->memIdx].p) { + uError("JOB:0x%x TASK:0x%x mpMallocAlign %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); + return; + } + + *(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26; + *((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26; + + pTask->pMemList[pTask->memIdx].size = size; + pTask->memIdx++; + pTask->lastAct = actId; actDone = true; break; } @@ -696,13 +855,16 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) { } void mptSimulateTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { - int32_t actTimes = taosRand() % mptCtrl.taskMaxActTimes; + int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : taosRand() % MPT_MAX_MEM_ACT_TIMES; + uDebug("JOB:0x%x TASK:0x%x will start total %d actions", pJobCtx->jobId, pTask->taskId, actTimes); + for (int32_t i = 0; i < actTimes; ++i) { if (atomic_load_8(&pJobCtx->pJob->retired)) { + uDebug("JOB:0x%x TASK:0x%x stop running cause of job already retired", pJobCtx->jobId, pTask->taskId); return; } - mptSimulateAction(pTask); + mptSimulateAction(pJobCtx, pTask); } } @@ -718,18 +880,31 @@ void mptSimulateOutTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { if (pTask->npMemIdx >= MPT_MAX_MEM_ACT_TIMES) { return; } + + pTask->npMemList[pTask->npMemIdx].size = taosRand() % mptCtrl.maxSingleAllocSize; + pTask->npMemList[pTask->npMemIdx].p = taosMemMalloc(pTask->npMemList[pTask->npMemIdx].size); + if (NULL == pTask->npMemList[pTask->npMemIdx].p) { + uError("JOB:0x%x TASK:0x%x out malloc %" PRId64 " failed", pJobCtx->jobId, pTask->taskId, pTask->npMemList[pTask->npMemIdx].size); + pTask->npMemList[pTask->npMemIdx].size = 0; + return; + } + + uDebug("JOB:0x%x TASK:0x%x out malloced, size:%" PRId64 ", mIdx:%d", pJobCtx->jobId, pTask->taskId, pTask->npMemList[pTask->npMemIdx].size, pTask->npMemIdx); - pTask->npMemList[pTask->npMemIdx].p = taosMemoryMalloc(taosRand() % mptCtrl.maxSingleAllocSize); - pTask->npMemIdx++; + pTask->npMemIdx++; } void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx) { + uDebug("JOB:0x%x TASK:0x%x start running", pJobCtx->jobId, pCtx->taskId); + if (atomic_load_8(&pJobCtx->pJob->retired)) { + uDebug("JOB:0x%x TASK:0x%x stop running cause of job already retired", pJobCtx->jobId, pCtx->taskId); return; } if (taosWTryLockLatch(&pCtx->taskExecLock)) { + uDebug("JOB:0x%x TASK:0x%x stop running cause of task already running", pJobCtx->jobId, pCtx->taskId); return; } @@ -739,38 +914,48 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx) { mptSimulateTask(pJobCtx, pCtx); mptDisableMemoryPoolUsage(); - mptSimulateOutTask(pJobCtx, pCtx); - + if (!atomic_load_8(&pJobCtx->pJob->retired)) { + mptSimulateOutTask(pJobCtx, pCtx); + } + taosWUnLockLatch(&pCtx->taskExecLock); atomic_sub_fetch_32(&pJobCtx->taskRunningNum, 1); + + uDebug("JOB:0x%x TASK:0x%x end running", pJobCtx->jobId, pCtx->taskId); } void mptInitJobs() { - for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) { + int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM; + + memset(mptCtx.jobCtxs, 0, sizeof(*mptCtx.jobCtxs) * jobNum); + + for (int32_t i = 0; i < jobNum; ++i) { mptInitJob(i); } } void* mptThreadFunc(void* param) { SMPTestThread* pThread = (SMPTestThread*)param; + int32_t jobExecTimes = (mptCtrl.jobExecTimes) ? mptCtrl.jobExecTimes : taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1; + int32_t jobNum = (mptCtrl.jobNum) ? mptCtrl.jobNum : MPT_MAX_JOB_NUM; - for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) { - SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; - if (mptResetJob(pJobCtx)) { - continue; - } + for (int32_t n = 0; n < jobExecTimes; ++n) { + MPT_PRINTF("Thread %d start the %d:%d job loops\n", pThread->idx, n, jobExecTimes); - if (taosRTryLockLatch(&pJobCtx->jobExecLock)) { - continue; - } + for (int32_t i = 0; i < jobNum; ++i) { + SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; + MPT_PRINTF(" Thread %d start %dth JOB 0x%x exec\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId); - int32_t jobExecTimes = taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1; - for (int32_t n = 0; n < jobExecTimes; ++n) { - if (atomic_load_8(&pJobCtx->pJob->retired)) { - break; + if (mptResetJob(pJobCtx)) { + continue; } + + if (taosRTryLockLatch(&pJobCtx->jobExecLock)) { + continue; + } + if (mptCtx.param.randTask) { int32_t taskIdx = taosRand() % pJobCtx->taskNum; mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx); @@ -783,9 +968,15 @@ void* mptThreadFunc(void* param) { } mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m); } + + taosRUnLockLatch(&pJobCtx->jobExecLock); + + MPT_PRINTF(" Thread %d end %dth JOB 0x%x exec, retired:%d\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId, pJobCtx->pJob->retired); + + mptResetJob(pJobCtx); } - taosRUnLockLatch(&pJobCtx->jobExecLock); + MPT_PRINTF("Thread %d finish the %dth job loops\n", pThread->idx, n); } return NULL; @@ -795,18 +986,23 @@ void mptStartThreadTest(int32_t threadIdx) { TdThreadAttr thattr; ASSERT_EQ(0, taosThreadAttrInit(&thattr)); ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE)); + mptCtx.threadCtxs[threadIdx].idx = threadIdx; ASSERT_EQ(0, taosThreadCreate(&mptCtx.threadCtxs[threadIdx].threadFp, &thattr, mptThreadFunc, &mptCtx.threadCtxs[threadIdx])); ASSERT_EQ(0, taosThreadAttrDestroy(&thattr)); } void mptDestroyJobs() { - for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) { + int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM; + + for (int32_t i = 0; i < jobNum; ++i) { mptDestroyJob(&mptCtx.jobCtxs[i], false); } } -void mptRunCase(SMPTestParam* param) { +void mptRunCase(SMPTestParam* param, int32_t times) { + MPT_PRINTF("\t case start the %dth running\n", times); + memcpy(&mptCtx.param, param, sizeof(SMPTestParam)); mptInitPool(); @@ -822,27 +1018,43 @@ void mptRunCase(SMPTestParam* param) { } mptDestroyJobs(); + + MPT_PRINTF("\t case end the %dth running\n", times); +} + +void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) { + MPT_PRINTF("Case [%s] begins:\n", caseName); + MPT_PRINTF("\t case loop times: %d\n", mptCtrl.caseLoopTimes); + MPT_PRINTF("\t task max act times: %d\n", mptCtrl.taskActTimes ? mptCtrl.taskActTimes : MPT_MAX_MEM_ACT_TIMES); + MPT_PRINTF("\t max single alloc size: %" PRId64 "\n", mptCtrl.maxSingleAllocSize); + MPT_PRINTF("\t job quota size: %" PRId64 "\n", param->jobQuota); + MPT_PRINTF("\t auto pool size: %d\n", param->autoPoolSize); + MPT_PRINTF("\t pool max size: %" PRId64 "\n", param->poolSize); + MPT_PRINTF("\t test thread num: %d\n", param->threadNum); + MPT_PRINTF("\t random exec task: %d\n", param->randTask); } } // namespace #if 1 #if 1 -TEST(FuncTest, SingleJobTest) { +TEST(FuncTest, SingleThreadTest) { char* caseName = "FuncTest:SingleThreadTest"; SMPTestParam param = {0}; param.autoPoolSize = true; - param.threadNum = 10; + param.threadNum = 1; + + mptPrintTestBeginInfo(caseName, ¶m); for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) { - mptRunCase(¶m); + mptRunCase(¶m, i); } } #endif #if 0 -TEST(FuncTest, MultiJobsTest) { - char* caseName = "FuncTest:SingleThreadTest"; +TEST(FuncTest, MultiThreadsTest) { + char* caseName = "FuncTest:MultiThreadsTest"; SMPTestParam param = {0}; for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) {