fix: add ut case

This commit is contained in:
dapan1121 2024-08-16 18:22:31 +08:00
parent 40af7fb836
commit b95ef255d0
8 changed files with 498 additions and 139 deletions

View File

@ -78,7 +78,7 @@ typedef struct SMPStatDetail {
typedef void (*mpDecConcSessionNum)(void);
typedef void (*mpIncConcSessionNum)(void);
typedef void (*mpSetConcSessionNum)(int32_t);
typedef void (*mpRetireJobs)(int64_t, bool, int32_t);
typedef void (*mpRetireJobs)(void*, int64_t, bool, int32_t);
typedef void (*mpRetireJob)(SMemPoolJob*, int32_t);
typedef void (*mpCfgUpdate)(void*, void*);
@ -120,6 +120,9 @@ void taosMemPoolDestroySession(void* poolHandle, void* session);
int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob);
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg);
void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName);
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd);
void taosMemPoolGetUsedSizeEnd(void* poolHandle);
bool taosMemPoolNeedRetireJob(void* poolHandle);
#define taosMemPoolFreeClear(ptr) \
do { \
@ -147,7 +150,7 @@ extern threadlocal void* threadPoolSession;
#define taosStrndup(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolStrndup(threadPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size)))
#define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr)))
#define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr)))
#define taosMemoryTrim(_size, _trimed) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size)))
#define taosMemoryTrim(_size, _trimed) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed)))
#define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
#else
#define taosEnableMemoryPoolUsage(_pool, _session)
@ -159,9 +162,10 @@ extern threadlocal void* threadPoolSession;
#define taosMemoryCalloc(_num, _size) taosMemCalloc(_num, _size)
#define taosMemoryRealloc(_ptr, _size) taosMemRealloc(_ptr, _size)
#define taosStrdup(_ptr) taosStrdupi(_ptr)
#define taosStrndup(_ptr, _size) taosStrndupi(_ptr, _size)
#define taosMemoryFree(_ptr) taosMemFree(_ptr)
#define taosMemorySize(_ptr) taosMemSize(_ptr)
#define taosMemoryTrim(_size, _trimed) taosMemTrim(_size)
#define taosMemoryTrim(_size, _trimed) taosMemTrim(_size, _trimed)
#define taosMemoryMallocAlign(_alignment, _size) taosMemMallocAlign(_alignment, _size)
#endif

View File

@ -46,7 +46,7 @@ char *taosStrndupi(const char *ptr, int64_t size);
void taosMemFree(void *ptr);
int64_t taosMemSize(void *ptr);
void taosPrintBackTrace();
void taosMemTrim(int32_t size);
int32_t taosMemTrim(int32_t size, bool* trimed);
void *taosMemMallocAlign(uint32_t alignment, int64_t size);
#define TAOS_MEMSET(_s, _c, _n) ((void)memset(_s, _c, _n))

View File

@ -1,7 +1,23 @@
#include "qwInt.h"
#include "qworker.h"
int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) {
int32_t qwGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) {
int64_t freeSize = 0;
int64_t usedSize = 0;
bool needEnd = false;
taosMemPoolGetUsedSizeBegin(pHandle, &usedSize, &needEnd);
int32_t code = taosGetSysAvailMemory(&freeSize);
if (needEnd) {
taosMemPoolGetUsedSizeEnd(pHandle);
}
if (TSDB_CODE_SUCCESS != code) {
qError("get system avaiable memory size failed, error: 0x%x", code);
return code;
}
int64_t totalSize = freeSize + usedSize;
int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, QW_MIN_RESERVE_MEM_SIZE);
int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL;
if (availSize < QW_MIN_MEM_POOL_SIZE) {
@ -9,6 +25,8 @@ int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) {
return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM;
}
uDebug("new pool maxSize:%" PRId64 ", usedSize:%" PRId64 ", freeSize:%" PRId64, availSize, usedSize, freeSize);
*maxSize = availSize;
return TSDB_CODE_SUCCESS;
@ -108,11 +126,11 @@ int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) {
ctx->pJobInfo = pJob;
QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pJob->memInfo));
char id[sizeof(tId) + sizeof(eId)] = {0};
QW_SET_TEID(id, tId, eId);
QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pJob->memInfo));
code = taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {
qError("fail to put session into query session hash, code: 0x%x", code);
@ -136,23 +154,27 @@ void qwRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) {
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
qwRetireJob(pJob);
qInfo("QID:0x%" PRIx64 " retired directly, errCode: 0x%x", mpJob->jobId, errCode);
qInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
} else {
qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x", mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode));
qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
}
}
void qwLowLevelRetire(int64_t retireSize, int32_t errCode) {
void qwLowLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
while (pJob) {
if (!taosMemPoolNeedRetireJob(pHandle)) {
taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob);
return;
}
uint64_t jobId = pJob->memInfo->jobId;
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
qwRetireJob(pJob);
qDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
pJob->memInfo->jobId, aSize, retireSize);
jobId, aSize, retireSize);
taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob);
break;
@ -162,7 +184,7 @@ void qwLowLevelRetire(int64_t retireSize, int32_t errCode) {
}
}
void qwMidLevelRetire(int64_t retireSize, int32_t errCode) {
void qwMidLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
PriorityQueueNode qNode;
while (NULL != pJob) {
@ -175,8 +197,13 @@ void qwMidLevelRetire(int64_t retireSize, int32_t errCode) {
}
PriorityQueueNode* pNode = NULL;
uint64_t jobId = 0;
int64_t retiredSize = 0;
while (retiredSize < retireSize) {
if (!taosMemPoolNeedRetireJob(pHandle)) {
break;
}
pNode = taosBQTop(gQueryMgmt.retireCtx.pJobQueue);
if (NULL == pNode) {
break;
@ -190,11 +217,12 @@ void qwMidLevelRetire(int64_t retireSize, int32_t errCode) {
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
jobId = pJob->memInfo->jobId;
qwRetireJob(pJob);
qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
pJob->memInfo->jobId, aSize, retireSize);
jobId, aSize, retireSize);
retiredSize += aSize;
}
@ -206,11 +234,11 @@ void qwMidLevelRetire(int64_t retireSize, int32_t errCode) {
}
void qwRetireJobsCb(int64_t retireSize, bool lowLevelRetire, int32_t errCode) {
(lowLevelRetire) ? qwLowLevelRetire(retireSize, errCode) : qwMidLevelRetire(retireSize, errCode);
void qwRetireJobsCb(void* pHandle, int64_t retireSize, bool lowLevelRetire, int32_t errCode) {
(lowLevelRetire) ? qwLowLevelRetire(pHandle, retireSize, errCode) : qwMidLevelRetire(pHandle, retireSize, errCode);
}
int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
int32_t qwGetQueryMemPoolMaxSize(void* pHandle, int64_t* pMaxSize, bool* autoMaxSize) {
if (tsQueryBufferPoolSize > 0) {
*pMaxSize = tsQueryBufferPoolSize * 1048576UL;
*autoMaxSize = false;
@ -218,14 +246,7 @@ int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
return TSDB_CODE_SUCCESS;
}
int64_t memSize = 0;
int32_t code = taosGetSysAvailMemory(&memSize);
if (TSDB_CODE_SUCCESS != code) {
qError("get system avaiable memory size failed, error: 0x%x", code);
return code;
}
code = qwGetMemPoolMaxMemSize(memSize, pMaxSize);
int32_t code = qwGetMemPoolMaxMemSize(pHandle, pMaxSize);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
@ -244,7 +265,7 @@ void qwCheckUpateCfgCb(void* pHandle, void* cfg) {
int64_t maxSize = 0;
bool autoMaxSize = false;
int32_t code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize);
int32_t code = qwGetQueryMemPoolMaxSize(pHandle, &maxSize, &autoMaxSize);
if (TSDB_CODE_SUCCESS != code) {
pCfg->maxSize = 0;
qError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize);
@ -268,6 +289,8 @@ static bool qwJobMemSizeCompFn(void* l, void* r, void* param) {
return atomic_load_64(&right->memInfo->allocMemSize) < atomic_load_64(&left->memInfo->allocMemSize);
}
void qwDeleteJobQueueData(void* pData) {}
int32_t qwInitQueryPool(void) {
int32_t code = TSDB_CODE_SUCCESS;
@ -284,7 +307,7 @@ int32_t qwInitQueryPool(void) {
SMemPoolCfg cfg = {0};
int64_t maxSize = 0;
bool autoMaxSize = false;
code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize);
code = qwGetQueryMemPoolMaxSize(NULL, &maxSize, &autoMaxSize);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
@ -310,7 +333,7 @@ int32_t qwInitQueryPool(void) {
return terrno;
}
gQueryMgmt.retireCtx.pJobQueue = createBoundedQueue(QW_MAX_RETIRE_JOB_NUM, qwJobMemSizeCompFn, NULL, NULL);
gQueryMgmt.retireCtx.pJobQueue = createBoundedQueue(QW_MAX_RETIRE_JOB_NUM, qwJobMemSizeCompFn, qwDeleteJobQueueData, NULL);
if (NULL == gQueryMgmt.retireCtx.pJobQueue) {
qError("init job bounded queue failed, error:0x%x", terrno);
return terrno;

View File

@ -460,12 +460,13 @@ int64_t taosMemSize(void *ptr) {
#endif
}
void taosMemTrim(int32_t size) {
int32_t taosMemTrim(int32_t size, bool* trimed) {
#if defined(WINDOWS) || defined(DARWIN) || defined(_ALPINE)
// do nothing
return;
return TSDB_CODE_SUCCESS;
#else
(void)malloc_trim(size);
*trimed = malloc_trim(size);
return TSDB_CODE_SUCCESS;
#endif
}

View File

@ -38,6 +38,7 @@ extern "C" {
#define MP_RETIRE_LOW_THRESHOLD_PERCENT (0.85)
#define MP_RETIRE_UNIT_PERCENT (0.1)
#define MP_RETIRE_UNIT_MIN_SIZE (50 * 1048576UL)
#define MP_CFG_UPDATE_MIN_RESERVE_SIZE (50 * 1024 * 1048576UL)
// FLAGS AREA
@ -72,6 +73,7 @@ extern "C" {
// CTRL FUNC FLAGS
#define MP_CTRL_FLAG_PRINT_STAT (1 << 0)
#define MP_CTRL_FLAG_CHECK_STAT (1 << 1)
typedef enum EMPStatLogItem {
@ -194,7 +196,7 @@ typedef struct SMPSessionChunk {
typedef struct SMPSession {
SMPListNode list;
int64_t sessionId;
char* sessionId;
SMPJob* pJob;
SMPCtrlInfo ctrlInfo;
int64_t allocMemSize;
@ -242,6 +244,7 @@ typedef struct SMPChunkMgmt {
typedef struct SMemPool {
char *name;
int16_t slotId;
SRWLatch cfgLock;
SMemPoolCfg cfg;
int64_t retireThreshold[3];
int64_t retireUnit;

View File

@ -21,9 +21,13 @@
int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
MP_ERR_RET(mpChkQuotaOverflow(pPool, pSession, size));
void* res = NULL;
void* res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size);
taosRLockLatch(&pPool->cfgLock);
MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, size));
res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size);
if (NULL != res) {
mpUpdateAllocSize(pPool, pSession, size);
} else {
@ -35,6 +39,10 @@ int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint3
code = terrno;
}
_return:
taosRUnLockLatch(&pPool->cfgLock);
*ppRes = res;
MP_RET(code);
@ -49,17 +57,24 @@ void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* ori
if (origSize) {
*origSize = oSize;
}
taosRLockLatch(&pPool->cfgLock); // tmp test
taosMemFree(ptr);
(void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize);
(void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize);
(void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize);
taosRUnLockLatch(&pPool->cfgLock);
}
int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) {
int32_t code = TSDB_CODE_SUCCESS;
MP_ERR_RET(mpChkQuotaOverflow(pPool, pSession, size));
taosRLockLatch(&pPool->cfgLock);
MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, size - *origSize));
*pPtr = taosMemRealloc(*pPtr, size);
if (NULL != *pPtr) {
@ -68,6 +83,15 @@ int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int6
MP_ERR_RET(terrno);
}
_return:
taosRUnLockLatch(&pPool->cfgLock);
if (code) {
mpDirectFree(pPool, pSession, *pPtr, NULL);
*pPtr = NULL;
}
return TSDB_CODE_SUCCESS;
}

View File

@ -171,6 +171,12 @@ int32_t mpUpdateCfg(SMemPool* pPool) {
MP_ERR_RET((*gMPFps[gMPMgmt.strategy].updateCfgFp)(pPool));
}
uDebug("memPool %s cfg updated, autoMaxSize:%d, maxSize:%" PRId64
", jobQuota:%" PRId64 ", threadNum:%d, retireThreshold:%" PRId64 "-%" PRId64 "-%" PRId64
", retireUnit:%" PRId64, pPool->name, pPool->cfg.autoMaxSize, pPool->cfg.maxSize,
pPool->cfg.jobQuota, pPool->cfg.threadNum, pPool->retireThreshold[0], pPool->retireThreshold[1],
pPool->retireThreshold[2], pPool->retireUnit);
return TSDB_CODE_SUCCESS;
}
@ -188,7 +194,7 @@ int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET(mpUpdateCfg(pPool));
pPool->ctrlInfo.statFlags = MP_STAT_FLAG_LOG_ALL;
pPool->ctrlInfo.funcFlags = MP_CTRL_FLAG_PRINT_STAT;
pPool->ctrlInfo.funcFlags = MP_CTRL_FLAG_PRINT_STAT | MP_CTRL_FLAG_CHECK_STAT;
pPool->sessionCache.groupNum = MP_SESSION_CACHE_ALLOC_BATCH_SIZE;
pPool->sessionCache.nodeSize = sizeof(SMPSession);
@ -250,6 +256,7 @@ int32_t mpPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) {
int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) {
SMPJob* pJob = pSession->pJob;
int32_t code = TSDB_CODE_SUCCESS;
int64_t cAllocSize = atomic_add_fetch_64(&pJob->job.allocMemSize, size);
int64_t quota = atomic_load_64(&pPool->cfg.jobQuota);
if (quota > 0 && cAllocSize > quota) {
@ -280,8 +287,9 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size)
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
(void)atomic_sub_fetch_64(&pPool->allocMemSize, size);
MP_ERR_RET(mpPutRetireMsgToQueue(pPool, false));
MP_RET(code);
} else {
MP_ERR_RET(mpPutRetireMsgToQueue(pPool, false));
}
return TSDB_CODE_SUCCESS;
@ -297,8 +305,9 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size)
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
(void)atomic_sub_fetch_64(&pPool->allocMemSize, size);
MP_ERR_RET(mpPutRetireMsgToQueue(pPool, true));
MP_RET(code);
} else {
MP_ERR_RET(mpPutRetireMsgToQueue(pPool, true));
}
}
@ -354,6 +363,7 @@ int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t si
if (0 == size) {
mpFree(pPool, pSession, *pPtr, origSize);
*pPtr = NULL;
return TSDB_CODE_SUCCESS;
}
@ -596,6 +606,49 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt
}
}
void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) {
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
SMPCtrlInfo* pCtrl = NULL;
SMPStatDetail* pDetail = NULL;
if (NULL != session) {
pCtrl = &pSession->ctrlInfo;
pDetail = &pSession->stat.statDetail;
if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT)) {
int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.strdup.succ + pDetail->bytes.strndup.succ;
int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ;
if (allocSize != freeSize) {
uError("%s Session in JOB:0x%" PRIx64 " stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64,
detailName, pSession->pJob->job.jobId, allocSize, freeSize);
ASSERT(0);
} else {
uDebug("%s Session in JOB:0x%" PRIx64 " stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64,
detailName, pSession->pJob->job.jobId, allocSize, freeSize);
}
}
}
if (NULL != poolHandle) {
pCtrl = &pPool->ctrlInfo;
pDetail = &pPool->stat.statDetail;
int64_t sessInit = pPool->stat.statSession.initFail + pPool->stat.statSession.initSucc;
if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == pPool->stat.statSession.destroyNum) {
int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.strdup.succ + pDetail->bytes.strndup.succ;
int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ;
if (allocSize != freeSize) {
uError("%s MemPool %s stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize);
ASSERT(0);
} else {
uDebug("%s MemPool %s stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize);
}
}
}
}
void mpCheckUpateCfg(void) {
taosRLockLatch(&gMPMgmt.poolLock);
int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList);
@ -618,9 +671,9 @@ void* mpMgmtThreadFunc(void* param) {
}
if (atomic_load_8(&gMPMgmt.msgQueue.midLevelRetire)) {
(*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), false, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
(*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), false, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
} else if (atomic_load_8(&gMPMgmt.msgQueue.lowLevelRetire)) {
(*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
(*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
}
mpCheckUpateCfg();
@ -733,6 +786,8 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
taosWUnLockLatch(&gMPMgmt.poolLock);
uInfo("mempool %s opened", poolName);
_return:
if (TSDB_CODE_SUCCESS != code) {
@ -767,6 +822,8 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) {
taosMemPoolPrintStat(pPool, pSession, "DestroySession");
mpCheckStatDetail(pPool, pSession, "DestroySession");
TAOS_MEMSET(pSession, 0, sizeof(*pSession));
mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession);
@ -867,8 +924,17 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz
terrno = mpRealloc(pPool, pSession, &ptr, size, &input.origSize);
MP_SET_FLAG(input.procFlags, ((ptr || 0 == size) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input);
if (ptr || 0 == size) {
MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input);
} else {
MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_FAIL);
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input);
input.procFlags = 0;
MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input);
}
_return:
@ -1005,6 +1071,10 @@ _return:
void taosMemPoolClose(void* poolHandle) {
SMemPool* pPool = (SMemPool*)poolHandle;
taosMemPoolPrintStat(poolHandle, NULL, "PoolClose");
mpCheckStatDetail(pPool, NULL, "PoolClose");
taosMemoryFree(pPool->name);
mpDestroyCacheGroup(&pPool->sessionCache);
}
@ -1052,6 +1122,28 @@ int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) {
return TSDB_CODE_SUCCESS;
}
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) {
SMemPool* pPool = (SMemPool*)poolHandle;
if ((atomic_load_64(&pPool->cfg.maxSize) - atomic_load_64(&pPool->allocMemSize)) <= MP_CFG_UPDATE_MIN_RESERVE_SIZE) {
*needEnd = true;
taosWLockLatch(&pPool->cfgLock);
} else {
*needEnd = false;
}
*usedSize = atomic_load_64(&pPool->allocMemSize);
}
void taosMemPoolGetUsedSizeEnd(void* poolHandle) {
SMemPool* pPool = (SMemPool*)poolHandle;
taosWUnLockLatch(&pPool->cfgLock);
}
bool taosMemPoolNeedRetireJob(void* poolHandle) {
SMemPool* pPool = (SMemPool*)poolHandle;
return atomic_load_64(&pPool->allocMemSize) >= atomic_load_64(&pPool->retireThreshold[0]);
}
void taosAutoMemoryFree(void *ptr) {
if (NULL != threadPoolHandle) {

View File

@ -42,11 +42,11 @@
namespace {
#define MPT_PRINTF (void)printf
#define MPT_MAX_MEM_ACT_TIMES 10000
#define MPT_MAX_SESSION_NUM 256
#define MPT_MAX_JOB_NUM 200
#define MPT_MAX_MEM_ACT_TIMES 1000
#define MPT_MAX_SESSION_NUM 100
#define MPT_MAX_JOB_NUM 100
#define MPT_MAX_THREAD_NUM 100
#define MPT_MAX_JOB_LOOP_TIMES 1000
#define MPT_MAX_JOB_LOOP_TIMES 100
#define MPT_DEFAULT_RESERVE_MEM_PERCENT 20
#define MPT_MIN_RESERVE_MEM_SIZE (512 * 1048576UL)
@ -73,9 +73,10 @@ threadlocal void* mptThreadPoolSession = NULL;
#define mptMemoryCalloc(_num, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolCalloc(mptThreadPoolHandle, mptThreadPoolSession, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
#define mptMemoryRealloc(_ptr, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolRealloc(mptThreadPoolHandle, mptThreadPoolSession, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
#define mptStrdup(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolStrdup(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosStrdupi(_ptr)))
#define mptStrndup(_ptr, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolStrndup(mptThreadPoolHandle, mptThreadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size)))
#define mptMemoryFree(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolFree(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemFree(_ptr)))
#define mptMemorySize(_ptr) ((NULL != mptThreadPoolHandle) ? (taosMemPoolGetMemorySize(mptThreadPoolHandle, mptThreadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemSize(_ptr)))
#define mptMemoryTrim(_size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolTrim(mptThreadPoolHandle, mptThreadPoolSession, _size, __FILE__, __LINE__)) : (taosMemTrim(_size)))
#define mptMemoryTrim(_size, _trimed) ((NULL != mptThreadPoolHandle) ? (taosMemPoolTrim(mptThreadPoolHandle, mptThreadPoolSession, _size, __FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed)))
#define mptMemoryMallocAlign(_alignment, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolMallocAlign(mptThreadPoolHandle, mptThreadPoolSession, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
enum {
@ -101,11 +102,14 @@ typedef struct SMPTJobInfo {
typedef struct {
int32_t taskMaxActTimes;
int32_t taskActTimes;
int32_t caseLoopTimes;
int32_t jobExecTimes;
int32_t jobNum;
int32_t jobTaskNum;
int64_t maxSingleAllocSize;
char* pSrcString;
bool printTestInfo;
bool printExecDetail;
bool printInputRow;
} SMPTestCtrl;
@ -115,6 +119,7 @@ typedef struct {
} SMPTestMemInfo;
typedef struct {
uint64_t taskId;
SRWLatch taskExecLock;
bool taskFinished;
@ -124,14 +129,15 @@ typedef struct {
SMPStatDetail stat;
int32_t memIdx;
SMPTestMemInfo pMemList[MPT_MAX_MEM_ACT_TIMES];
SMPTestMemInfo* pMemList;
int64_t npSize;
int32_t npMemIdx;
SMPTestMemInfo npMemList[MPT_MAX_MEM_ACT_TIMES];
SMPTestMemInfo* npMemList;
bool taskFreed;
int32_t lastAct;
} SMPTestTaskCtx;
typedef struct {
@ -151,12 +157,13 @@ typedef struct {
typedef struct {
int64_t jobQuota;
bool autoPoolSize;
int32_t poolSize;
int64_t poolSize;
int32_t threadNum;
int32_t randTask;
} SMPTestParam;
typedef struct {
int32_t idx;
TdThread threadFp;
bool allJobs;
bool autoJob;
@ -164,11 +171,12 @@ typedef struct {
typedef struct SMPTestCtx {
int64_t qId;
int64_t tId;
SHashObj* pJobs;
BoundedQueue* pJobQueue;
void* memPoolHandle;
SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM];
SMPTestJobCtx jobCtxs[MPT_MAX_JOB_NUM];
SMPTestJobCtx* jobCtxs;
SMPTestParam param;
} SMPTestCtx;
@ -206,6 +214,9 @@ void mptInitLogFile() {
tsAsyncLog = 0;
qDebugFlag = 159;
uDebugFlag = 159;
tsNumOfLogLines = INT32_MAX;
tsLogKeepDays = 10;
TAOS_STRCPY(tsLogDir, TD_LOG_DIR_PATH);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
@ -223,32 +234,51 @@ static bool mptJobMemSizeCompFn(void* l, void* r, void* param) {
return atomic_load_64(&right->memInfo->allocMemSize) < atomic_load_64(&left->memInfo->allocMemSize);
}
void mptDeleteJobQueueData(void* pData) {
SMPTJobInfo* pJob = (SMPTJobInfo*)pData;
taosHashRelease(mptCtx.pJobs, pJob);
}
void mptInit() {
mptInitLogFile();
mptCtrl.caseLoopTimes = 1;
mptCtrl.taskActTimes = 50;
mptCtrl.maxSingleAllocSize = 104857600;
mptCtrl.jobNum = 100;
mptCtrl.jobExecTimes = 1;
mptCtrl.jobTaskNum = 1;
mptCtx.pJobs = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
ASSERT_TRUE(NULL != mptCtx.pJobs);
mptCtx.pJobQueue = createBoundedQueue(10000, mptJobMemSizeCompFn, NULL, NULL);
mptCtx.pJobQueue = createBoundedQueue(10000, mptJobMemSizeCompFn, mptDeleteJobQueueData, NULL);
ASSERT_TRUE(NULL != mptCtx.pJobQueue);
mptCtrl.caseLoopTimes = 100;
mptCtrl.taskMaxActTimes = 1000;
mptCtrl.maxSingleAllocSize = 104857600;
mptCtx.jobCtxs = (SMPTestJobCtx*)taosMemCalloc(MPT_MAX_JOB_NUM, sizeof(*mptCtx.jobCtxs));
ASSERT_TRUE(NULL != mptCtx.jobCtxs);
mptCtrl.pSrcString = (char*)taosMemoryMalloc(mptCtrl.maxSingleAllocSize);
ASSERT_TRUE(NULL != mptCtrl.pSrcString);
memset(mptCtrl.pSrcString, 'P', mptCtrl.maxSingleAllocSize - 1);
mptCtrl.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0;
}
void mptDestroyTaskCtx(SMPTestTaskCtx* pTask) {
void mptDestroyTaskCtx(SMPTestTaskCtx* pTask, void* pSession) {
assert(mptCtx.memPoolHandle);
assert(pSession);
mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pSession);
for (int32_t i = 0; i < pTask->memIdx; ++i) {
taosMemFreeClear(pTask->pMemList[i].p);
mptMemoryFree(pTask->pMemList[i].p);
pTask->pMemList[i].p = NULL;
}
mptDisableMemoryPoolUsage();
for (int32_t i = 0; i < pTask->npMemIdx; ++i) {
taosMemFreeClear(pTask->npMemList[i].p);
}
taosMemFreeClear(pTask->pMemList);
taosMemFreeClear(pTask->npMemList);
}
@ -332,8 +362,18 @@ _return:
void mptInitTask(int32_t idx, uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJob) {
ASSERT_TRUE(0 == mptInitSession(qId, tId, eId, pJob, &pJob->pSessions[idx]));
void mptInitTask(int32_t idx, int32_t eId, SMPTestJobCtx* pJob) {
pJob->taskCtxs[idx].taskId = atomic_add_fetch_64(&mptCtx.tId, 1);
ASSERT_TRUE(0 == mptInitSession(pJob->jobId, pJob->taskCtxs[idx].taskId, eId, pJob, &pJob->pSessions[idx]));
pJob->taskCtxs[idx].pMemList = (SMPTestMemInfo*)taosMemCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].pMemList));
ASSERT_TRUE(NULL != pJob->taskCtxs[idx].pMemList);
pJob->taskCtxs[idx].npMemList = (SMPTestMemInfo*)taosMemCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].npMemList));
ASSERT_TRUE(NULL != pJob->taskCtxs[idx].npMemList);
uDebug("JOB:0x%x TASK:0x%x idx:%d initialized", pJob->jobId, pJob->taskCtxs[idx].taskId, idx);
}
void mptInitJob(int32_t idx) {
@ -341,30 +381,41 @@ void mptInitJob(int32_t idx) {
pJobCtx->jobIdx = idx;
pJobCtx->jobId = atomic_add_fetch_64(&mptCtx.qId, 1);
pJobCtx->taskNum = (taosRand() % MPT_MAX_SESSION_NUM) + 1;
pJobCtx->taskNum = (mptCtrl.jobTaskNum) ? mptCtrl.jobTaskNum : (taosRand() % MPT_MAX_SESSION_NUM) + 1;
for (int32_t i = 0; i < pJobCtx->taskNum; ++i) {
mptInitTask(i, pJobCtx->jobId, i, 0, pJobCtx);
mptInitTask(i, 0, pJobCtx);
assert(pJobCtx->pJob);
}
uDebug("JOB:0x%x idx:%d initialized, taskNum:%d", pJobCtx->jobId, idx, pJobCtx->taskNum);
}
int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
if (taosWTryLockLatch(&pJobCtx->jobExecLock)) {
return -1;
}
uint64_t jobId = pJobCtx->jobId;
for (int32_t i = 0; i < pJobCtx->taskNum; ++i) {
mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]);
taosMemPoolDestroySession(mptCtx.memPoolHandle, pJobCtx->pSessions[i]);
}
uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobCtx->pJob->errCode);
mptDestroyJobInfo(pJobCtx->pJob);
(void)taosHashRemove(mptCtx.pJobs, &pJobCtx->jobId, sizeof(pJobCtx->jobId));
for (int32_t i = 0; i < pJobCtx->taskNum; ++i) {
taosMemPoolDestroySession(mptCtx.memPoolHandle, pJobCtx->pSessions[i]);
mptDestroyTaskCtx(&pJobCtx->taskCtxs[i]);
}
if (reset) {
int32_t jobIdx = pJobCtx->jobIdx;
memset((char*)pJobCtx + sizeof(pJobCtx->jobExecLock), 0, sizeof(SMPTestJobCtx) - sizeof(pJobCtx->jobExecLock));
mptInitJob(pJobCtx->jobIdx);
mptInitJob(jobIdx);
}
taosWUnLockLatch(&pJobCtx->jobExecLock);
MPT_PRINTF(" JOB:0x%x retired\n", jobId);
return 0;
}
@ -374,9 +425,11 @@ void mptCheckCompareJobInfo(SMPTestJobCtx* pJobCtx) {
int32_t mptResetJob(SMPTestJobCtx* pJobCtx) {
if (atomic_load_8(&pJobCtx->pJob->retired)) {
if (0 == atomic_load_32(&pJobCtx->taskRunningNum)) {
int32_t taskRunning = atomic_load_32(&pJobCtx->taskRunningNum);
if (0 == taskRunning) {
return mptDestroyJob(pJobCtx, true);
} else {
uDebug("JOB:0x%x retired but will not destroy cause of task running, num:%d", pJobCtx->jobId, taskRunning);
return -1;
}
}
@ -388,9 +441,27 @@ void mptRetireJob(SMPTJobInfo* pJob) {
SMPTestJobCtx* pCtx = (SMPTestJobCtx*)pJob->pCtx;
mptCheckCompareJobInfo(pCtx);
mptResetJob(pCtx);
}
int32_t mptGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) {
int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) {
int64_t freeSize = 0;
int64_t usedSize = 0;
bool needEnd = false;
taosMemPoolGetUsedSizeBegin(pHandle, &usedSize, &needEnd);
int32_t code = taosGetSysAvailMemory(&freeSize);
if (needEnd) {
taosMemPoolGetUsedSizeEnd(pHandle);
}
if (TSDB_CODE_SUCCESS != code) {
uError("get system avaiable memory size failed, error: 0x%x", code);
return code;
}
int64_t totalSize = freeSize + usedSize;
int64_t reserveSize = TMAX(totalSize * MPT_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, MPT_MIN_RESERVE_MEM_SIZE);
int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL;
if (availSize < MPT_MIN_MEM_POOL_SIZE) {
@ -398,12 +469,14 @@ int32_t mptGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) {
return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM;
}
uDebug("new pool maxSize:%" PRId64 ", usedSize:%" PRId64 ", freeSize:%" PRId64, availSize, usedSize, freeSize);
*maxSize = availSize;
return TSDB_CODE_SUCCESS;
}
int32_t mptGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
int32_t mptGetQueryMemPoolMaxSize(void* pHandle, int64_t* pMaxSize, bool* autoMaxSize) {
if (!mptCtx.param.autoPoolSize && mptCtx.param.poolSize > 0) {
*pMaxSize = mptCtx.param.poolSize * 1048576UL;
*autoMaxSize = false;
@ -411,14 +484,7 @@ int32_t mptGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
return TSDB_CODE_SUCCESS;
}
int64_t memSize = 0;
int32_t code = taosGetSysAvailMemory(&memSize);
if (TSDB_CODE_SUCCESS != code) {
uError("get system avaiable memory size failed, error: 0x%x", code);
return code;
}
code = mptGetMemPoolMaxMemSize(memSize, pMaxSize);
int32_t code = mptGetMemPoolMaxMemSize(pHandle, pMaxSize);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
@ -438,7 +504,7 @@ void mptCheckUpateCfgCb(void* pHandle, void* cfg) {
int64_t maxSize = 0;
bool autoMaxSize = false;
int32_t code = mptGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize);
int32_t code = mptGetQueryMemPoolMaxSize(pHandle, &maxSize, &autoMaxSize);
if (TSDB_CODE_SUCCESS != code) {
pCfg->maxSize = 0;
uError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize);
@ -452,15 +518,21 @@ void mptCheckUpateCfgCb(void* pHandle, void* cfg) {
}
}
void mptLowLevelRetire(int64_t retireSize, int32_t errCode) {
void mptLowLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) {
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
while (pJob) {
if (!taosMemPoolNeedRetireJob(pHandle)) {
taosHashCancelIterate(mptCtx.pJobs, pJob);
return;
}
uint64_t jobId = pJob->memInfo->jobId;
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
mptRetireJob(pJob);
uDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
pJob->memInfo->jobId, aSize, retireSize);
jobId, aSize, retireSize);
taosHashCancelIterate(mptCtx.pJobs, pJob);
break;
@ -470,13 +542,16 @@ void mptLowLevelRetire(int64_t retireSize, int32_t errCode) {
}
}
void mptMidLevelRetire(int64_t retireSize, int32_t errCode) {
void mptMidLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) {
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
PriorityQueueNode qNode;
while (NULL != pJob) {
if (0 == atomic_load_8(&pJob->retired)) {
assert(pJob == taosHashAcquire(mptCtx.pJobs, &pJob->memInfo->jobId, sizeof(pJob->memInfo->jobId)));
qNode.data = pJob;
(void)taosBQPush(mptCtx.pJobQueue, &qNode);
if (NULL == taosBQPush(mptCtx.pJobQueue, &qNode)) {
taosHashRelease(mptCtx.pJobs, pJob);
}
}
pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, pJob);
@ -485,6 +560,10 @@ void mptMidLevelRetire(int64_t retireSize, int32_t errCode) {
PriorityQueueNode* pNode = NULL;
int64_t retiredSize = 0;
while (retiredSize < retireSize) {
if (!taosMemPoolNeedRetireJob(pHandle)) {
break;
}
pNode = taosBQTop(mptCtx.pJobQueue);
if (NULL == pNode) {
break;
@ -498,13 +577,14 @@ void mptMidLevelRetire(int64_t retireSize, int32_t errCode) {
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
uint64_t jobId = pJob->memInfo->jobId;
mptRetireJob(pJob);
uDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
pJob->memInfo->jobId, aSize, retireSize);
retiredSize += aSize;
uDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64 ", retiredSize:%" PRId64,
jobId, aSize, retireSize, retiredSize);
}
taosBQPop(mptCtx.pJobQueue);
@ -514,8 +594,8 @@ void mptMidLevelRetire(int64_t retireSize, int32_t errCode) {
}
void mptRetireJobsCb(int64_t retireSize, bool lowLevelRetire, int32_t errCode) {
(lowLevelRetire) ? mptLowLevelRetire(retireSize, errCode) : mptMidLevelRetire(retireSize, errCode);
void mptRetireJobsCb(void* pHandle, int64_t retireSize, bool lowLevelRetire, int32_t errCode) {
(lowLevelRetire) ? mptLowLevelRetire(pHandle, retireSize, errCode) : mptMidLevelRetire(pHandle, retireSize, errCode);
}
@ -527,11 +607,9 @@ void mptRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) {
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
mptRetireJob(pJob);
uInfo("QID:0x%" PRIx64 " retired directly, errCode: 0x%x", mpJob->jobId, errCode);
uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
} else {
uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x", mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode));
uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
}
}
@ -548,34 +626,44 @@ void mptInitPool(void) {
}
cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO
cfg.chunkSize = 1048576;
cfg.jobQuota = mptCtx.param.jobQuota;
cfg.cb.retireJobsFp = mptRetireJobsCb;
cfg.cb.retireJobFp = mptRetireJobCb;
cfg.cb.cfgUpdateFp = mptCheckUpateCfgCb;
ASSERT_TRUE(0 == taosMemPoolOpen("SingleThreadTest", &cfg, &mptCtx.memPoolHandle));
ASSERT_TRUE(0 == taosMemPoolOpen("testQMemPool", &cfg, &mptCtx.memPoolHandle));
}
void mptSimulateAction(SMPTestTaskCtx* pTask) {
void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
int32_t actId = 0;
bool actDone = false;
int32_t size = taosRand() % mptCtrl.maxSingleAllocSize;
while (!actDone) {
actId = taosRand() % 9;
actId = taosRand() % 10;
switch (actId) {
case 0: { // malloc
if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) {
break;
}
pTask->pMemList[pTask->memIdx].p = taosMemoryMalloc(size);
pTask->pMemList[pTask->memIdx].p = mptMemoryMalloc(size);
pTask->stat.times.memMalloc.exec++;
pTask->stat.bytes.memMalloc.exec+=size;
if (NULL == pTask->pMemList[pTask->memIdx].p) {
pTask->stat.bytes.memMalloc.fail+=size;
uError("JOB:0x%x TASK:0x%x mpMalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
pTask->stat.bytes.memMalloc.succ+=size;
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26;
pTask->pMemList[pTask->memIdx].size = size;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
break;
}
@ -584,13 +672,22 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) {
break;
}
pTask->pMemList[pTask->memIdx].p = taosMemoryCalloc(1, size);
pTask->pMemList[pTask->memIdx].p = mptMemoryCalloc(1, size);
pTask->stat.times.memCalloc.exec++;
pTask->stat.bytes.memCalloc.exec+=size;
if (NULL == pTask->pMemList[pTask->memIdx].p) {
pTask->stat.bytes.memCalloc.fail+=size;
uError("JOB:0x%x TASK:0x%x mpCalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
pTask->stat.bytes.memCalloc.succ+=size;
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26;
pTask->pMemList[pTask->memIdx].size = size;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
break;
}
@ -599,13 +696,22 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) {
break;
}
pTask->pMemList[pTask->memIdx].p = taosMemoryRealloc(NULL, size);
pTask->pMemList[pTask->memIdx].p = mptMemoryRealloc(NULL, size);
pTask->stat.times.memRealloc.exec++;
pTask->stat.bytes.memRealloc.exec+=size;
if (NULL == pTask->pMemList[pTask->memIdx].p) {
pTask->stat.bytes.memRealloc.fail+=size;
uError("JOB:0x%x TASK:0x%x new mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
pTask->stat.bytes.memRealloc.succ+=size;
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26;
pTask->pMemList[pTask->memIdx].size = size;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
break;
}
@ -615,12 +721,26 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) {
}
assert(pTask->pMemList[pTask->memIdx - 1].p);
pTask->pMemList[pTask->memIdx - 1].p = taosMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, size);
size++;
pTask->pMemList[pTask->memIdx - 1].p = mptMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, size);
pTask->stat.times.memRealloc.exec++;
pTask->stat.bytes.memRealloc.exec+=size;
if (NULL == pTask->pMemList[pTask->memIdx - 1].p) {
pTask->stat.bytes.memRealloc.fail+=size;
pTask->stat.bytes.memFree.succ+=pTask->pMemList[pTask->memIdx - 1].size;
uError("JOB:0x%x TASK:0x%x real mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
pTask->memIdx--;
pTask->pMemList[pTask->memIdx].size = 0;
return;
}
pTask->stat.bytes.memRealloc.succ+=size;
pTask->stat.bytes.memRealloc.origSucc+=pTask->pMemList[pTask->memIdx - 1].size;
*(char*)pTask->pMemList[pTask->memIdx - 1].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx - 1].p + size -1) = 'a' + taosRand() % 26;
pTask->pMemList[pTask->memIdx - 1].size = size;
pTask->lastAct = actId;
actDone = true;
break;
}
@ -630,12 +750,16 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) {
}
assert(pTask->pMemList[pTask->memIdx - 1].p);
pTask->pMemList[pTask->memIdx - 1].p = taosMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, 0);
if (NULL != pTask->pMemList[pTask->memIdx - 1].p) {
taosMemoryFreeClear(pTask->pMemList[pTask->memIdx - 1].p);
}
pTask->pMemList[pTask->memIdx - 1].p = mptMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, 0);
pTask->stat.times.memRealloc.exec++;
pTask->stat.bytes.memRealloc.exec+=size;
assert(NULL == pTask->pMemList[pTask->memIdx - 1].p);
pTask->stat.bytes.memRealloc.origSucc+=pTask->pMemList[pTask->memIdx - 1].size;
pTask->pMemList[pTask->memIdx - 1].size = 0;
pTask->memIdx--;
pTask->lastAct = actId;
actDone = true;
break;
}
@ -644,15 +768,21 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) {
break;
}
size /= 10;
mptCtrl.pSrcString[size] = 0;
pTask->pMemList[pTask->memIdx].p = taosStrdup(mptCtrl.pSrcString);
pTask->pMemList[pTask->memIdx].p = mptStrdup(mptCtrl.pSrcString);
mptCtrl.pSrcString[size] = 'W';
if (NULL == pTask->pMemList[pTask->memIdx].p) {
uError("JOB:0x%x TASK:0x%x mpStrdup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size - 1) = 'a' + taosRand() % 26;
pTask->pMemList[pTask->memIdx].size = size + 1;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
break;
}
@ -661,13 +791,19 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) {
break;
}
pTask->pMemList[pTask->memIdx].p = taosStrndup(mptCtrl.pSrcString, size);
size /= 10;
pTask->pMemList[pTask->memIdx].p = mptStrndup(mptCtrl.pSrcString, size);
if (NULL == pTask->pMemList[pTask->memIdx].p) {
uError("JOB:0x%x TASK:0x%x mpStrndup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26;
pTask->pMemList[pTask->memIdx].size = size + 1;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
break;
}
@ -677,14 +813,37 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) {
}
assert(pTask->pMemList[pTask->memIdx - 1].p);
taosMemoryFreeClear(pTask->pMemList[pTask->memIdx - 1].p);
mptMemoryFree(pTask->pMemList[pTask->memIdx - 1].p);
pTask->pMemList[pTask->memIdx - 1].p = NULL;
pTask->memIdx--;
pTask->lastAct = actId;
actDone = true;
break;
}
case 8:{ // trim
taosMemoryTrim(0, NULL);
mptMemoryTrim(0, NULL);
pTask->lastAct = actId;
actDone = true;
break;
}
case 9: { // malloc_align
if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) {
break;
}
pTask->pMemList[pTask->memIdx].p = mptMemoryMallocAlign(8, size);
if (NULL == pTask->pMemList[pTask->memIdx].p) {
uError("JOB:0x%x TASK:0x%x mpMallocAlign %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26;
pTask->pMemList[pTask->memIdx].size = size;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
break;
}
@ -696,13 +855,16 @@ void mptSimulateAction(SMPTestTaskCtx* pTask) {
}
void mptSimulateTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
int32_t actTimes = taosRand() % mptCtrl.taskMaxActTimes;
int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : taosRand() % MPT_MAX_MEM_ACT_TIMES;
uDebug("JOB:0x%x TASK:0x%x will start total %d actions", pJobCtx->jobId, pTask->taskId, actTimes);
for (int32_t i = 0; i < actTimes; ++i) {
if (atomic_load_8(&pJobCtx->pJob->retired)) {
uDebug("JOB:0x%x TASK:0x%x stop running cause of job already retired", pJobCtx->jobId, pTask->taskId);
return;
}
mptSimulateAction(pTask);
mptSimulateAction(pJobCtx, pTask);
}
}
@ -718,18 +880,31 @@ void mptSimulateOutTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
if (pTask->npMemIdx >= MPT_MAX_MEM_ACT_TIMES) {
return;
}
pTask->npMemList[pTask->npMemIdx].size = taosRand() % mptCtrl.maxSingleAllocSize;
pTask->npMemList[pTask->npMemIdx].p = taosMemMalloc(pTask->npMemList[pTask->npMemIdx].size);
if (NULL == pTask->npMemList[pTask->npMemIdx].p) {
uError("JOB:0x%x TASK:0x%x out malloc %" PRId64 " failed", pJobCtx->jobId, pTask->taskId, pTask->npMemList[pTask->npMemIdx].size);
pTask->npMemList[pTask->npMemIdx].size = 0;
return;
}
uDebug("JOB:0x%x TASK:0x%x out malloced, size:%" PRId64 ", mIdx:%d", pJobCtx->jobId, pTask->taskId, pTask->npMemList[pTask->npMemIdx].size, pTask->npMemIdx);
pTask->npMemList[pTask->npMemIdx].p = taosMemoryMalloc(taosRand() % mptCtrl.maxSingleAllocSize);
pTask->npMemIdx++;
pTask->npMemIdx++;
}
void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx) {
uDebug("JOB:0x%x TASK:0x%x start running", pJobCtx->jobId, pCtx->taskId);
if (atomic_load_8(&pJobCtx->pJob->retired)) {
uDebug("JOB:0x%x TASK:0x%x stop running cause of job already retired", pJobCtx->jobId, pCtx->taskId);
return;
}
if (taosWTryLockLatch(&pCtx->taskExecLock)) {
uDebug("JOB:0x%x TASK:0x%x stop running cause of task already running", pJobCtx->jobId, pCtx->taskId);
return;
}
@ -739,38 +914,48 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx) {
mptSimulateTask(pJobCtx, pCtx);
mptDisableMemoryPoolUsage();
mptSimulateOutTask(pJobCtx, pCtx);
if (!atomic_load_8(&pJobCtx->pJob->retired)) {
mptSimulateOutTask(pJobCtx, pCtx);
}
taosWUnLockLatch(&pCtx->taskExecLock);
atomic_sub_fetch_32(&pJobCtx->taskRunningNum, 1);
uDebug("JOB:0x%x TASK:0x%x end running", pJobCtx->jobId, pCtx->taskId);
}
void mptInitJobs() {
for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) {
int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM;
memset(mptCtx.jobCtxs, 0, sizeof(*mptCtx.jobCtxs) * jobNum);
for (int32_t i = 0; i < jobNum; ++i) {
mptInitJob(i);
}
}
void* mptThreadFunc(void* param) {
SMPTestThread* pThread = (SMPTestThread*)param;
int32_t jobExecTimes = (mptCtrl.jobExecTimes) ? mptCtrl.jobExecTimes : taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1;
int32_t jobNum = (mptCtrl.jobNum) ? mptCtrl.jobNum : MPT_MAX_JOB_NUM;
for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) {
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
if (mptResetJob(pJobCtx)) {
continue;
}
for (int32_t n = 0; n < jobExecTimes; ++n) {
MPT_PRINTF("Thread %d start the %d:%d job loops\n", pThread->idx, n, jobExecTimes);
if (taosRTryLockLatch(&pJobCtx->jobExecLock)) {
continue;
}
for (int32_t i = 0; i < jobNum; ++i) {
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
MPT_PRINTF(" Thread %d start %dth JOB 0x%x exec\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId);
int32_t jobExecTimes = taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1;
for (int32_t n = 0; n < jobExecTimes; ++n) {
if (atomic_load_8(&pJobCtx->pJob->retired)) {
break;
if (mptResetJob(pJobCtx)) {
continue;
}
if (taosRTryLockLatch(&pJobCtx->jobExecLock)) {
continue;
}
if (mptCtx.param.randTask) {
int32_t taskIdx = taosRand() % pJobCtx->taskNum;
mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx);
@ -783,9 +968,15 @@ void* mptThreadFunc(void* param) {
}
mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m);
}
taosRUnLockLatch(&pJobCtx->jobExecLock);
MPT_PRINTF(" Thread %d end %dth JOB 0x%x exec, retired:%d\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId, pJobCtx->pJob->retired);
mptResetJob(pJobCtx);
}
taosRUnLockLatch(&pJobCtx->jobExecLock);
MPT_PRINTF("Thread %d finish the %dth job loops\n", pThread->idx, n);
}
return NULL;
@ -795,18 +986,23 @@ void mptStartThreadTest(int32_t threadIdx) {
TdThreadAttr thattr;
ASSERT_EQ(0, taosThreadAttrInit(&thattr));
ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE));
mptCtx.threadCtxs[threadIdx].idx = threadIdx;
ASSERT_EQ(0, taosThreadCreate(&mptCtx.threadCtxs[threadIdx].threadFp, &thattr, mptThreadFunc, &mptCtx.threadCtxs[threadIdx]));
ASSERT_EQ(0, taosThreadAttrDestroy(&thattr));
}
void mptDestroyJobs() {
for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) {
int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM;
for (int32_t i = 0; i < jobNum; ++i) {
mptDestroyJob(&mptCtx.jobCtxs[i], false);
}
}
void mptRunCase(SMPTestParam* param) {
void mptRunCase(SMPTestParam* param, int32_t times) {
MPT_PRINTF("\t case start the %dth running\n", times);
memcpy(&mptCtx.param, param, sizeof(SMPTestParam));
mptInitPool();
@ -822,27 +1018,43 @@ void mptRunCase(SMPTestParam* param) {
}
mptDestroyJobs();
MPT_PRINTF("\t case end the %dth running\n", times);
}
void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) {
MPT_PRINTF("Case [%s] begins:\n", caseName);
MPT_PRINTF("\t case loop times: %d\n", mptCtrl.caseLoopTimes);
MPT_PRINTF("\t task max act times: %d\n", mptCtrl.taskActTimes ? mptCtrl.taskActTimes : MPT_MAX_MEM_ACT_TIMES);
MPT_PRINTF("\t max single alloc size: %" PRId64 "\n", mptCtrl.maxSingleAllocSize);
MPT_PRINTF("\t job quota size: %" PRId64 "\n", param->jobQuota);
MPT_PRINTF("\t auto pool size: %d\n", param->autoPoolSize);
MPT_PRINTF("\t pool max size: %" PRId64 "\n", param->poolSize);
MPT_PRINTF("\t test thread num: %d\n", param->threadNum);
MPT_PRINTF("\t random exec task: %d\n", param->randTask);
}
} // namespace
#if 1
#if 1
TEST(FuncTest, SingleJobTest) {
TEST(FuncTest, SingleThreadTest) {
char* caseName = "FuncTest:SingleThreadTest";
SMPTestParam param = {0};
param.autoPoolSize = true;
param.threadNum = 10;
param.threadNum = 1;
mptPrintTestBeginInfo(caseName, &param);
for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) {
mptRunCase(&param);
mptRunCase(&param, i);
}
}
#endif
#if 0
TEST(FuncTest, MultiJobsTest) {
char* caseName = "FuncTest:SingleThreadTest";
TEST(FuncTest, MultiThreadsTest) {
char* caseName = "FuncTest:MultiThreadsTest";
SMPTestParam param = {0};
for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) {