fix: add more UT

This commit is contained in:
dapan1121 2024-08-30 18:13:35 +08:00
parent b95ef255d0
commit 60063b4d65
9 changed files with 334 additions and 134 deletions

View File

@ -94,7 +94,9 @@ typedef struct SMemPoolCallBack {
typedef struct SMemPoolCfg {
bool autoMaxSize;
int64_t maxSize;
int64_t reserveSize;
int64_t retireUnitSize;
int64_t freeSize;
int64_t jobQuota;
int32_t chunkSize;
int32_t threadNum;
@ -102,6 +104,11 @@ typedef struct SMemPoolCfg {
SMemPoolCallBack cb;
} SMemPoolCfg;
#define MEMPOOL_GET_ALLOC_SIZE(_dstat) ((_dstat)->bytes.memMalloc.succ + (_dstat)->bytes.memCalloc.succ + (_dstat)->bytes.memRealloc.succ + (_dstat)->bytes.strdup.succ + (_dstat)->bytes.strndup.succ)
#define MEMPOOL_GET_FREE_SIZE(_dstat) ((_dstat)->bytes.memRealloc.origSucc + (_dstat)->bytes.memFree.succ)
#define MEMPOOL_GET_USED_SIZE(_dstat) (MEMPOOL_GET_ALLOC_SIZE(_dstat) - MEMPOOL_GET_FREE_SIZE(_dstat))
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle);
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo);
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo);
@ -123,6 +130,8 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName);
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd);
void taosMemPoolGetUsedSizeEnd(void* poolHandle);
bool taosMemPoolNeedRetireJob(void* poolHandle);
int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize);
#define taosMemPoolFreeClear(ptr) \
do { \

View File

@ -253,6 +253,7 @@ typedef struct SQueryMgmt {
int32_t concTaskLevel;
SHashObj* pJobInfo;
void* memPoolHandle;
int8_t memPoolInited;
SQWRetireCtx retireCtx;
} SQueryMgmt;

View File

@ -18,7 +18,7 @@ int32_t qwGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) {
}
int64_t totalSize = freeSize + usedSize;
int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, QW_MIN_RESERVE_MEM_SIZE);
int64_t reserveSize = TMAX(tsTotalMemoryKB * 1024 * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, QW_MIN_RESERVE_MEM_SIZE);
int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL;
if (availSize < QW_MIN_MEM_POOL_SIZE) {
qError("too little available query memory, totalAvailable: %" PRId64 ", reserveSize: %" PRId64, totalSize, reserveSize);
@ -238,15 +238,15 @@ void qwRetireJobsCb(void* pHandle, int64_t retireSize, bool lowLevelRetire, int3
(lowLevelRetire) ? qwLowLevelRetire(pHandle, retireSize, errCode) : qwMidLevelRetire(pHandle, retireSize, errCode);
}
int32_t qwGetQueryMemPoolMaxSize(void* pHandle, int64_t* pMaxSize, bool* autoMaxSize) {
int32_t qwUpdateQueryMemPoolCfg(void* pHandle, int64_t* pFreeSize, bool* autoMaxSize, int64_t* pReserveSize, int64_t* pRetireUnitSize) {
if (tsQueryBufferPoolSize > 0) {
*pMaxSize = tsQueryBufferPoolSize * 1048576UL;
*pFreeSize = tsQueryBufferPoolSize * 1048576UL;
*autoMaxSize = false;
return TSDB_CODE_SUCCESS;
}
int32_t code = qwGetMemPoolMaxMemSize(pHandle, pMaxSize);
int32_t code = qwGetMemPoolMaxMemSize(pHandle, pFreeSize);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
@ -263,18 +263,17 @@ void qwCheckUpateCfgCb(void* pHandle, void* cfg) {
atomic_store_64(&pCfg->jobQuota, newJobQuota);
}
int64_t maxSize = 0;
int64_t freeSize = 0, reserveSize = 0, retireUnitSize = 0;
bool autoMaxSize = false;
int32_t code = qwGetQueryMemPoolMaxSize(pHandle, &maxSize, &autoMaxSize);
int32_t code = qwUpdateQueryMemPoolCfg(pHandle, &freeSize, &autoMaxSize, &reserveSize, &retireUnitSize);
if (TSDB_CODE_SUCCESS != code) {
pCfg->maxSize = 0;
qError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize);
return;
pCfg->freeSize = 0;
qError("get query memPool freeSize failed, reset freeSize to %" PRId64, pCfg->freeSize);
}
if (pCfg->autoMaxSize != autoMaxSize || pCfg->maxSize != maxSize) {
if (pCfg->autoMaxSize != autoMaxSize || pCfg->freeSize != freeSize) {
pCfg->autoMaxSize = autoMaxSize;
atomic_store_64(&pCfg->maxSize, maxSize);
atomic_store_64(&pCfg->freeSize, freeSize);
taosMemPoolCfgUpdate(pHandle, pCfg);
}
}
@ -304,10 +303,10 @@ int32_t qwInitQueryPool(void) {
}
#endif
taosGetTotalMemory(&tsTotalMemoryKB);
SMemPoolCfg cfg = {0};
int64_t maxSize = 0;
bool autoMaxSize = false;
code = qwGetQueryMemPoolMaxSize(NULL, &maxSize, &autoMaxSize);
code = qwUpdateQueryMemPoolCfg(NULL, &cfg.freeSize, &cfg.autoMaxSize, &cfg.reserveSize, &cfg.retireUnitSize);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
@ -322,11 +321,6 @@ int32_t qwInitQueryPool(void) {
cfg.cb.retireJobFp = qwRetireJobCb;
cfg.cb.cfgUpdateFp = qwCheckUpateCfgCb;
code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
gQueryMgmt.pJobInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == gQueryMgmt.pJobInfo) {
qError("init job hash failed, error:0x%x", terrno);

View File

@ -1345,7 +1345,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (NULL == gQueryMgmt.memPoolHandle) {
if (0 == atomic_val_compare_exchange_8(&gQueryMgmt.memPoolInited, 0, 1)) {
QW_ERR_RET(qwInitQueryPool());
}

View File

@ -283,10 +283,10 @@ typedef struct SMemPoolMgmt {
int32_t code;
} SMemPoolMgmt;
typedef int32_t (*mpAllocFunc)(SMemPool*, SMPSession*, int64_t, uint32_t, void**);
typedef int32_t (*mpAllocFunc)(SMemPool*, SMPSession*, int64_t*, uint32_t, void**);
typedef void (*mpFreeFunc)(SMemPool*, SMPSession*, void *, int64_t*);
typedef int64_t (*mpGetSizeFunc)(SMemPool*, SMPSession*, void*);
typedef int32_t (*mpReallocFunc)(SMemPool*, SMPSession*, void **, int64_t, int64_t*);
typedef int32_t (*mpReallocFunc)(SMemPool*, SMPSession*, void **, int64_t*, int64_t*);
typedef int32_t (*mpInitSessionFunc)(SMemPool*, SMPSession*);
typedef int32_t (*mpInitFunc)(SMemPool*, char*, SMemPoolCfg*);
typedef int32_t (*mpUpdateCfgFunc)(SMemPool*);
@ -402,26 +402,27 @@ enum {
} while (0)
// direct
int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes);
int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes);
int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr);
void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize);
int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize);
int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize);
int32_t mpDirectTrim(SMemPool* pPool, SMPSession* pSession, int32_t size, bool* trimed);
// chunk
int32_t mpChunkInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg);
int64_t mpChunkGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr);
int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes);
int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes);
void mpChunkFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize);
int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize);
int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize);
int32_t mpChunkInitSession(SMemPool* pPool, SMPSession* pSession);
int32_t mpChunkUpdateCfg(SMemPool* pPool);
int32_t mpPopIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes);
int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size);
void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size);
void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize);
int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead);
int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes);
int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes);

View File

@ -153,7 +153,7 @@ int32_t mpChunkAllocMem(SMemPool* pPool, SMPSession* pSession, int64_t size, uin
pSession->chunk.allocChunkNum++;
pSession->chunk.allocChunkMemSize += pPool->cfg.chunkSize;
mpUpdateAllocSize(pPool, pSession, totalSize);
mpUpdateAllocSize(pPool, pSession, totalSize, 0);
MP_ADD_TO_CHUNK_LIST(pSession->chunk.srcChunkHead, pSession->chunk.srcChunkTail, pSession->chunk.srcChunkNum, pChunk);
MP_ADD_TO_CHUNK_LIST(pSession->chunk.inUseChunkHead, pSession->chunk.inUseChunkTail, pSession->chunk.inUseChunkNum, pChunk);
@ -198,7 +198,7 @@ int32_t mpChunkNSAllocMem(SMemPool* pPool, SMPSession* pSession, int64_t size, u
pSession->chunk.allocChunkNum++;
pSession->chunk.allocChunkMemSize += totalSize;
mpUpdateAllocSize(pPool, pSession, totalSize);
mpUpdateAllocSize(pPool, pSession, totalSize, 0);
if (NULL == pSession->chunk.inUseNSChunkHead) {
pSession->chunk.inUseNSChunkHead = pChunk;
@ -240,8 +240,8 @@ int64_t mpChunkGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr) {
return pHeader->size;
}
int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
MP_RET((size > pPool->cfg.chunkSize) ? mpChunkNSAllocMem(pPool, pSession, size, alignment, ppRes) : mpChunkAllocMem(pPool, pSession, size, alignment, ppRes));
int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) {
MP_RET((*size > pPool->cfg.chunkSize) ? mpChunkNSAllocMem(pPool, pSession, *size, alignment, ppRes) : mpChunkAllocMem(pPool, pSession, *size, alignment, ppRes));
}
@ -258,12 +258,12 @@ void mpChunkFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* orig
}
int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) {
int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) {
int32_t code = TSDB_CODE_SUCCESS;
if (*origSize >= size) {
if (*origSize >= *size) {
SMPMemHeader* pHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader));
pHeader->size = size;
pHeader->size = *size;
return TSDB_CODE_SUCCESS;
}
@ -301,9 +301,9 @@ int32_t mpChunkInitSession(SMemPool* pPool, SMPSession* pSession) {
}
int32_t mpChunkUpdateCfg(SMemPool* pPool) {
pPool->chunk.maxChunkNum = pPool->cfg.maxSize / pPool->cfg.chunkSize;
pPool->chunk.maxChunkNum = pPool->cfg.freeSize / pPool->cfg.chunkSize;
if (pPool->chunk.maxChunkNum <= 0) {
uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", pPool->cfg.maxSize, pPool->cfg.chunkSize);
uError("invalid memory pool max chunk num, freeSize:%" PRId64 ", chunkSize:%d", pPool->cfg.freeSize, pPool->cfg.chunkSize);
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
}

View File

@ -19,22 +19,24 @@
#include "tlog.h"
#include "tutil.h"
int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
void* res = NULL;
int64_t nSize = *size;
taosRLockLatch(&pPool->cfgLock);
MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, size));
MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size));
res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size);
res = alignment ? taosMemMallocAlign(alignment, *size) : taosMemMalloc(*size);
if (NULL != res) {
mpUpdateAllocSize(pPool, pSession, size);
nSize = taosMemSize(res);
mpUpdateAllocSize(pPool, pSession, nSize, nSize - *size);
} else {
(void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, size);
(void)atomic_sub_fetch_64(&pPool->allocMemSize, size);
(void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, *size);
(void)atomic_sub_fetch_64(&pPool->allocMemSize, *size);
uError("malloc %" PRId64 " alignment %d failed, code: 0x%x", size, alignment, terrno);
uError("malloc %" PRId64 " alignment %d failed, code: 0x%x", *size, alignment, terrno);
code = terrno;
}
@ -44,6 +46,7 @@ _return:
taosRUnLockLatch(&pPool->cfgLock);
*ppRes = res;
*size = nSize;
MP_RET(code);
}
@ -69,16 +72,18 @@ void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* ori
taosRUnLockLatch(&pPool->cfgLock);
}
int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) {
int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t nSize = *size;
taosRLockLatch(&pPool->cfgLock);
MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, size - *origSize));
MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size - *origSize));
*pPtr = taosMemRealloc(*pPtr, size);
*pPtr = taosMemRealloc(*pPtr, *size);
if (NULL != *pPtr) {
mpUpdateAllocSize(pPool, pSession, size - *origSize);
nSize = taosMemSize(*pPtr);
mpUpdateAllocSize(pPool, pSession, nSize - *origSize, nSize - *size + *origSize);
} else {
MP_ERR_RET(terrno);
}
@ -88,11 +93,17 @@ _return:
taosRUnLockLatch(&pPool->cfgLock);
if (code) {
mpDirectFree(pPool, pSession, *pPtr, NULL);
mpDirectFree(pPool, pSession, *pPtr, origSize);
*pPtr = NULL;
}
*size = nSize;
return TSDB_CODE_SUCCESS;
}
int32_t mpDirectTrim(SMemPool* pPool, SMPSession* pSession, int32_t size, bool* trimed) {
return taosMemTrim(size, trimed);
}

View File

@ -25,7 +25,7 @@ threadlocal void* threadPoolSession = NULL;
SMemPoolMgmt gMPMgmt = {0};
SMPStrategyFp gMPFps[] = {
{NULL},
{NULL, mpDirectAlloc, mpDirectFree, mpDirectGetMemSize, mpDirectRealloc, NULL, NULL, NULL},
{NULL, mpDirectAlloc, mpDirectFree, mpDirectGetMemSize, mpDirectRealloc, NULL, NULL, mpDirectTrim},
{mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg, NULL}
};
@ -161,21 +161,19 @@ void mpPushIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPListNode* pNod
int32_t mpUpdateCfg(SMemPool* pPool) {
atomic_store_64(&pPool->retireThreshold[0], pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT);
atomic_store_64(&pPool->retireThreshold[1], pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT);
atomic_store_64(&pPool->retireThreshold[2], pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT);
atomic_store_64(&pPool->retireUnit, TMAX(pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT, MP_RETIRE_UNIT_MIN_SIZE));
atomic_store_64(&pPool->retireThreshold[0], pPool->cfg.freeSize * MP_RETIRE_LOW_THRESHOLD_PERCENT);
atomic_store_64(&pPool->retireThreshold[1], pPool->cfg.freeSize * MP_RETIRE_MID_THRESHOLD_PERCENT);
atomic_store_64(&pPool->retireThreshold[2], pPool->cfg.freeSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT);
if (gMPFps[gMPMgmt.strategy].updateCfgFp) {
MP_ERR_RET((*gMPFps[gMPMgmt.strategy].updateCfgFp)(pPool));
}
uDebug("memPool %s cfg updated, autoMaxSize:%d, maxSize:%" PRId64
uDebug("memPool %s cfg updated, autoMaxSize:%d, freeSize:%" PRId64
", jobQuota:%" PRId64 ", threadNum:%d, retireThreshold:%" PRId64 "-%" PRId64 "-%" PRId64
", retireUnit:%" PRId64, pPool->name, pPool->cfg.autoMaxSize, pPool->cfg.maxSize,
", retireUnit:%" PRId64, pPool->name, pPool->cfg.autoMaxSize, pPool->cfg.freeSize,
pPool->cfg.jobQuota, pPool->cfg.threadNum, pPool->retireThreshold[0], pPool->retireThreshold[1],
pPool->retireThreshold[2], pPool->retireUnit);
pPool->retireThreshold[2], pPool->cfg.retireUnitSize);
return TSDB_CODE_SUCCESS;
}
@ -223,7 +221,12 @@ FORCE_INLINE void mpUpdateMaxAllocSize(int64_t* pMaxAllocMemSize, int64_t newSiz
}
}
void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size) {
void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize) {
if (addSize) {
atomic_add_fetch_64(&pSession->pJob->job.allocMemSize, addSize);
atomic_add_fetch_64(&pPool->allocMemSize, addSize);
}
int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size);
mpUpdateMaxAllocSize(&pSession->maxAllocMemSize, allocMemSize);
@ -281,7 +284,7 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size)
quota = atomic_load_64(&pPool->retireThreshold[1]);
if (pAllocSize >= quota) {
uInfo("%s pool allocSize %" PRId64 " reaches the middle quota %" PRId64, pPool->name, pAllocSize, quota);
if (cAllocSize >= atomic_load_64(&pPool->retireUnit) / 2) {
if (cAllocSize >= atomic_load_64(&pPool->cfg.retireUnitSize) / 2) {
code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
pPool->cfg.cb.retireJobFp(&pJob->job, code);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
@ -298,7 +301,7 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size)
quota = atomic_load_64(&pPool->retireThreshold[0]);
if (pAllocSize >= quota) {
uInfo("%s pool allocSize %" PRId64 " reaches the low quota %" PRId64, pPool->name, pAllocSize, quota);
if (cAllocSize >= atomic_load_64(&pPool->retireUnit)) {
if (cAllocSize >= atomic_load_64(&pPool->cfg.retireUnitSize)) {
code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
pPool->cfg.cb.retireJobFp(&pJob->job, code);
@ -318,19 +321,18 @@ int64_t mpGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) {
return (*gMPFps[gMPMgmt.strategy].getSizeFp)(pPool, pSession, ptr);
}
int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes) {
MP_RET((*gMPFps[gMPMgmt.strategy].allocFp)(pPool, pSession, size, alignment, ppRes));
}
int32_t mpCalloc(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size, void** ppRes) {
int32_t mpCalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t totalSize = num * size;
void *res = NULL;
MP_ERR_RET(mpMalloc(pPool, pSession, totalSize, 0, &res));
MP_ERR_RET(mpMalloc(pPool, pSession, size, 0, &res));
if (NULL != res) {
TAOS_MEMSET(res, 0, totalSize);
TAOS_MEMSET(res, 0, *size);
}
_return:
@ -353,7 +355,7 @@ void mpFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize)
(*gMPFps[gMPMgmt.strategy].freeFp)(pPool, pSession, ptr, origSize);
}
int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) {
int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == *pPtr) {
@ -361,7 +363,7 @@ int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t si
MP_RET(mpMalloc(pPool, pSession, size, 0, pPtr));
}
if (0 == size) {
if (0 == *size) {
mpFree(pPool, pSession, *pPtr, origSize);
*pPtr = NULL;
return TSDB_CODE_SUCCESS;
@ -554,7 +556,6 @@ void mpLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput*
case E_MP_STAT_LOG_MEM_TRIM: {
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memTrim.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memTrim.exec, pInput->origSize);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memTrim.succ, 1);
@ -616,8 +617,8 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) {
pCtrl = &pSession->ctrlInfo;
pDetail = &pSession->stat.statDetail;
if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT)) {
int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.strdup.succ + pDetail->bytes.strndup.succ;
int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ;
int64_t allocSize = MEMPOOL_GET_ALLOC_SIZE(pDetail);
int64_t freeSize = MEMPOOL_GET_FREE_SIZE(pDetail);
if (allocSize != freeSize) {
uError("%s Session in JOB:0x%" PRIx64 " stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64,
@ -671,9 +672,9 @@ void* mpMgmtThreadFunc(void* param) {
}
if (atomic_load_8(&gMPMgmt.msgQueue.midLevelRetire)) {
(*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), false, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
(*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->cfg.retireUnitSize), false, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
} else if (atomic_load_8(&gMPMgmt.msgQueue.lowLevelRetire)) {
(*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->retireUnit), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
(*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->cfg.retireUnitSize), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
}
mpCheckUpateCfg();
@ -874,7 +875,7 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fil
SMPSession* pSession = (SMPSession*)session;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
terrno = mpMalloc(pPool, pSession, size, 0, &res);
terrno = mpMalloc(pPool, pSession, &input.size, 0, &res);
MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
@ -899,7 +900,7 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t
int64_t totalSize = num * size;
SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
terrno = mpCalloc(pPool, pSession, num, size, &res);
terrno = mpCalloc(pPool, pSession, &input.size, &res);
MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input);
@ -920,9 +921,9 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .origSize = 0};
terrno = mpRealloc(pPool, pSession, &ptr, size, &input.origSize);
terrno = mpRealloc(pPool, pSession, &ptr, &input.size, &input.origSize);
if (ptr || 0 == size) {
MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
@ -956,7 +957,7 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char*
int64_t size = strlen(ptr) + 1;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
terrno = mpMalloc(pPool, pSession, size, 0, &res);
terrno = mpMalloc(pPool, pSession, &input.size, 0, &res);
if (NULL != res) {
TAOS_STRCPY(res, ptr);
*((char*)res + size - 1) = 0;
@ -986,7 +987,7 @@ char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64
size = TMIN(size, origSize) + 1;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
terrno = mpMalloc(pPool, pSession, size, 0, &res);
terrno = mpMalloc(pPool, pSession, &input.size, 0, &res);
if (NULL != res) {
TAOS_MEMCPY(res, ptr, size - 1);
*((char*)res + size - 1) = 0;
@ -1058,7 +1059,7 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment
SMPSession* pSession = (SMPSession*)session;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
terrno = mpMalloc(pPool, pSession, size, alignment, &res);
terrno = mpMalloc(pPool, pSession, &input.size, alignment, &res);
MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
@ -1095,11 +1096,11 @@ int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
SMPStatInput input = {.origSize = 1, .size = 0, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
SMPStatInput input = {.size = 0, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
code = mpTrim(pPool, pSession, size, trimed);
input.size = (trimed) ? 1 : 0;
input.size = (trimed) ? (*trimed) : 0;
MP_SET_FLAG(input.procFlags, ((0 == code) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_TRIM, &input);
@ -1123,7 +1124,14 @@ int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) {
}
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) {
if (NULL == poolHandle) {
*usedSize = 0;
*needEnd = false;
return;
}
SMemPool* pPool = (SMemPool*)poolHandle;
#if 0
if ((atomic_load_64(&pPool->cfg.maxSize) - atomic_load_64(&pPool->allocMemSize)) <= MP_CFG_UPDATE_MIN_RESERVE_SIZE) {
*needEnd = true;
taosWLockLatch(&pPool->cfgLock);
@ -1132,6 +1140,11 @@ void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* need
}
*usedSize = atomic_load_64(&pPool->allocMemSize);
#else
taosWLockLatch(&pPool->cfgLock);
*needEnd = true;
*usedSize = atomic_load_64(&pPool->allocMemSize);
#endif
}
void taosMemPoolGetUsedSizeEnd(void* poolHandle) {
@ -1144,6 +1157,26 @@ bool taosMemPoolNeedRetireJob(void* poolHandle) {
return atomic_load_64(&pPool->allocMemSize) >= atomic_load_64(&pPool->retireThreshold[0]);
}
int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize) {
if (NULL == session || (NULL == ppStat && NULL == allocSize && NULL == maxAllocSize)) {
uError("%s invalid input param, session:%p, ppStat:%p, allocSize:%p, maxAllocSize:%p", __FUNCTION__, session, ppStat, allocSize, maxAllocSize);
MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMPSession* pSession = (SMPSession*)session;
if (ppStat) {
*ppStat = &pSession->stat.statDetail;
}
if (allocSize) {
*allocSize = atomic_load_64(&pSession->allocMemSize);
}
if (maxAllocSize) {
*maxAllocSize = atomic_load_64(&pSession->maxAllocMemSize);
}
return TSDB_CODE_SUCCESS;
}
void taosAutoMemoryFree(void *ptr) {
if (NULL != threadPoolHandle) {

View File

@ -42,7 +42,7 @@
namespace {
#define MPT_PRINTF (void)printf
#define MPT_MAX_MEM_ACT_TIMES 1000
#define MPT_MAX_MEM_ACT_TIMES 300
#define MPT_MAX_SESSION_NUM 100
#define MPT_MAX_JOB_NUM 100
#define MPT_MAX_THREAD_NUM 100
@ -243,11 +243,11 @@ void mptInit() {
mptInitLogFile();
mptCtrl.caseLoopTimes = 1;
mptCtrl.taskActTimes = 50;
mptCtrl.taskActTimes = 0;
mptCtrl.maxSingleAllocSize = 104857600;
mptCtrl.jobNum = 100;
mptCtrl.jobExecTimes = 1;
mptCtrl.jobTaskNum = 1;
mptCtrl.jobTaskNum = 0;
mptCtx.pJobs = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
ASSERT_TRUE(NULL != mptCtx.pJobs);
@ -269,6 +269,9 @@ void mptDestroyTaskCtx(SMPTestTaskCtx* pTask, void* pSession) {
mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pSession);
for (int32_t i = 0; i < pTask->memIdx; ++i) {
pTask->stat.times.memFree.exec++;
pTask->stat.bytes.memFree.exec+=mptMemorySize(pTask->pMemList[i].p);
pTask->stat.bytes.memFree.succ+=mptMemorySize(pTask->pMemList[i].p);
mptMemoryFree(pTask->pMemList[i].p);
pTask->pMemList[i].p = NULL;
}
@ -397,6 +400,14 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
uint64_t jobId = pJobCtx->jobId;
for (int32_t i = 0; i < pJobCtx->taskNum; ++i) {
SMPStatDetail* pStat = NULL;
int64_t allocSize = 0;
taosMemPoolGetSessionStat(pJobCtx->pSessions[i], &pStat, &allocSize, NULL);
int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat);
assert(allocSize == usedSize);
assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat)));
mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]);
taosMemPoolDestroySession(mptCtx.memPoolHandle, pJobCtx->pSessions[i]);
}
@ -502,18 +513,18 @@ void mptCheckUpateCfgCb(void* pHandle, void* cfg) {
atomic_store_64(&pCfg->jobQuota, newJobQuota);
}
int64_t maxSize = 0;
int64_t freeSize = 0;
bool autoMaxSize = false;
int32_t code = mptGetQueryMemPoolMaxSize(pHandle, &maxSize, &autoMaxSize);
int32_t code = mptGetQueryMemPoolMaxSize(pHandle, &freeSize, &autoMaxSize);
if (TSDB_CODE_SUCCESS != code) {
pCfg->maxSize = 0;
uError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize);
pCfg->freeSize = 0;
uError("get query memPool freeSize failed, reset freeSize to %" PRId64, pCfg->freeSize);
return;
}
if (pCfg->autoMaxSize != autoMaxSize || pCfg->maxSize != maxSize) {
if (pCfg->autoMaxSize != autoMaxSize || pCfg->freeSize != freeSize) {
pCfg->autoMaxSize = autoMaxSize;
atomic_store_64(&pCfg->maxSize, maxSize);
atomic_store_64(&pCfg->freeSize, freeSize);
taosMemPoolCfgUpdate(pHandle, pCfg);
}
}
@ -618,11 +629,11 @@ void mptInitPool(void) {
cfg.autoMaxSize = mptCtx.param.autoPoolSize;
if (!mptCtx.param.autoPoolSize) {
cfg.maxSize = mptCtx.param.poolSize;
cfg.freeSize = mptCtx.param.poolSize;
} else {
int64_t memSize = 0;
ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize));
cfg.maxSize = memSize * 0.8;
cfg.freeSize = memSize * 0.8;
}
cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO
@ -635,10 +646,20 @@ void mptInitPool(void) {
ASSERT_TRUE(0 == taosMemPoolOpen("testQMemPool", &cfg, &mptCtx.memPoolHandle));
}
void mptWriteMem(void* pStart, int32_t size) {
char* pEnd = (char*)pStart + size - 1;
char* p = (char*)pStart;
while (p <= pEnd) {
*p = 'a' + taosRand() % 26;
p += 4096;
}
}
void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
int32_t actId = 0;
bool actDone = false;
int32_t size = taosRand() % mptCtrl.maxSingleAllocSize;
int32_t osize = 0, nsize = 0;
while (!actDone) {
actId = taosRand() % 10;
@ -649,19 +670,23 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
}
pTask->pMemList[pTask->memIdx].p = mptMemoryMalloc(size);
pTask->stat.times.memMalloc.exec++;
pTask->stat.bytes.memMalloc.exec+=size;
if (NULL == pTask->pMemList[pTask->memIdx].p) {
pTask->stat.times.memMalloc.exec++;
pTask->stat.bytes.memMalloc.exec+=size;
pTask->stat.times.memMalloc.fail++;
pTask->stat.bytes.memMalloc.fail+=size;
uError("JOB:0x%x TASK:0x%x mpMalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
pTask->stat.bytes.memMalloc.succ+=size;
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26;
nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p);
pTask->stat.times.memMalloc.exec++;
pTask->stat.bytes.memMalloc.exec+=nsize;
pTask->stat.bytes.memMalloc.succ+=nsize;
pTask->stat.times.memMalloc.succ++;
mptWriteMem(pTask->pMemList[pTask->memIdx].p, size);
pTask->pMemList[pTask->memIdx].size = size;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
@ -673,73 +698,91 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
}
pTask->pMemList[pTask->memIdx].p = mptMemoryCalloc(1, size);
pTask->stat.times.memCalloc.exec++;
pTask->stat.bytes.memCalloc.exec+=size;
if (NULL == pTask->pMemList[pTask->memIdx].p) {
pTask->stat.times.memCalloc.exec++;
pTask->stat.bytes.memCalloc.exec+=size;
pTask->stat.times.memCalloc.fail++;
pTask->stat.bytes.memCalloc.fail+=size;
uError("JOB:0x%x TASK:0x%x mpCalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
pTask->stat.bytes.memCalloc.succ+=size;
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26;
nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p);
pTask->stat.times.memCalloc.exec++;
pTask->stat.bytes.memCalloc.exec+=nsize;
pTask->stat.times.memCalloc.succ++;
pTask->stat.bytes.memCalloc.succ+=nsize;
mptWriteMem(pTask->pMemList[pTask->memIdx].p, size);
pTask->pMemList[pTask->memIdx].size = size;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
break;
}
case 2:{ // new realloc
break;
if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) {
break;
}
pTask->pMemList[pTask->memIdx].p = mptMemoryRealloc(NULL, size);
pTask->stat.times.memRealloc.exec++;
pTask->stat.bytes.memRealloc.exec+=size;
if (NULL == pTask->pMemList[pTask->memIdx].p) {
pTask->stat.times.memRealloc.exec++;
pTask->stat.bytes.memRealloc.exec+=size;
pTask->stat.times.memRealloc.fail++;
pTask->stat.bytes.memRealloc.fail+=size;
uError("JOB:0x%x TASK:0x%x new mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
pTask->stat.bytes.memRealloc.succ+=size;
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26;
nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p);
pTask->stat.times.memRealloc.exec++;
pTask->stat.bytes.memRealloc.exec+=nsize;
pTask->stat.bytes.memRealloc.succ+=nsize;
pTask->stat.times.memRealloc.succ++;
mptWriteMem(pTask->pMemList[pTask->memIdx].p, size);
pTask->pMemList[pTask->memIdx].size = size;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
break;
}
case 3:{ // real realloc
break;
if (pTask->memIdx <= 0) {
break;
}
assert(pTask->pMemList[pTask->memIdx - 1].p);
osize = mptMemorySize(pTask->pMemList[pTask->memIdx - 1].p);
size++;
pTask->pMemList[pTask->memIdx - 1].p = mptMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, size);
pTask->stat.times.memRealloc.exec++;
pTask->stat.bytes.memRealloc.exec+=size;
if (NULL == pTask->pMemList[pTask->memIdx - 1].p) {
pTask->stat.times.memRealloc.exec++;
pTask->stat.bytes.memRealloc.exec+=size;
pTask->stat.bytes.memRealloc.origExec+=osize;
pTask->stat.times.memRealloc.fail++;
pTask->stat.bytes.memRealloc.fail+=size;
pTask->stat.bytes.memFree.succ+=pTask->pMemList[pTask->memIdx - 1].size;
pTask->stat.bytes.memFree.succ+=osize;
uError("JOB:0x%x TASK:0x%x real mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
pTask->memIdx--;
pTask->pMemList[pTask->memIdx].size = 0;
return;
}
pTask->stat.bytes.memRealloc.succ+=size;
pTask->stat.bytes.memRealloc.origSucc+=pTask->pMemList[pTask->memIdx - 1].size;
*(char*)pTask->pMemList[pTask->memIdx - 1].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx - 1].p + size -1) = 'a' + taosRand() % 26;
nsize = mptMemorySize(pTask->pMemList[pTask->memIdx - 1].p);
pTask->stat.times.memRealloc.exec++;
pTask->stat.bytes.memRealloc.exec+=nsize;
pTask->stat.bytes.memRealloc.origExec+=osize;
pTask->stat.bytes.memRealloc.origSucc+=osize;
pTask->stat.times.memRealloc.succ++;
pTask->stat.bytes.memRealloc.succ+=nsize;
mptWriteMem(pTask->pMemList[pTask->memIdx - 1].p, size);
pTask->pMemList[pTask->memIdx - 1].size = size;
pTask->lastAct = actId;
actDone = true;
break;
@ -750,14 +793,16 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
}
assert(pTask->pMemList[pTask->memIdx - 1].p);
osize = mptMemorySize(pTask->pMemList[pTask->memIdx - 1].p);
pTask->pMemList[pTask->memIdx - 1].p = mptMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, 0);
pTask->stat.times.memRealloc.exec++;
pTask->stat.bytes.memRealloc.exec+=size;
pTask->stat.bytes.memRealloc.origExec+=osize;
assert(NULL == pTask->pMemList[pTask->memIdx - 1].p);
pTask->stat.bytes.memRealloc.origSucc+=pTask->pMemList[pTask->memIdx - 1].size;
pTask->stat.times.memRealloc.succ++;
pTask->stat.bytes.memRealloc.origSucc+=osize;
pTask->pMemList[pTask->memIdx - 1].size = 0;
pTask->memIdx--;
pTask->lastAct = actId;
actDone = true;
@ -772,15 +817,25 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
mptCtrl.pSrcString[size] = 0;
pTask->pMemList[pTask->memIdx].p = mptStrdup(mptCtrl.pSrcString);
mptCtrl.pSrcString[size] = 'W';
if (NULL == pTask->pMemList[pTask->memIdx].p) {
pTask->stat.times.strdup.exec++;
pTask->stat.bytes.strdup.exec+=size + 1;
pTask->stat.times.strdup.fail++;
pTask->stat.bytes.strdup.fail+=size + 1;
uError("JOB:0x%x TASK:0x%x mpStrdup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size - 1) = 'a' + taosRand() % 26;
nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p);
pTask->stat.times.strdup.exec++;
pTask->stat.bytes.strdup.exec+= nsize;
pTask->stat.times.strdup.succ++;
pTask->stat.bytes.strdup.succ+=nsize;
mptWriteMem(pTask->pMemList[pTask->memIdx].p, size);
pTask->pMemList[pTask->memIdx].size = size + 1;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
@ -792,16 +847,28 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
}
size /= 10;
assert(strlen(mptCtrl.pSrcString) > size);
pTask->pMemList[pTask->memIdx].p = mptStrndup(mptCtrl.pSrcString, size);
if (NULL == pTask->pMemList[pTask->memIdx].p) {
pTask->stat.times.strndup.exec++;
pTask->stat.bytes.strndup.exec+=size + 1;
pTask->stat.times.strndup.fail++;
pTask->stat.bytes.strndup.fail+=size + 1;
uError("JOB:0x%x TASK:0x%x mpStrndup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26;
assert(strlen((char*)pTask->pMemList[pTask->memIdx].p) == size);
nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p);
pTask->stat.times.strndup.exec++;
pTask->stat.bytes.strndup.exec+=nsize;
pTask->stat.times.strndup.succ++;
pTask->stat.bytes.strndup.succ+=nsize;
mptWriteMem(pTask->pMemList[pTask->memIdx].p, size);
pTask->pMemList[pTask->memIdx].size = size + 1;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
@ -813,7 +880,12 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
}
assert(pTask->pMemList[pTask->memIdx - 1].p);
osize = mptMemorySize(pTask->pMemList[pTask->memIdx - 1].p);
mptMemoryFree(pTask->pMemList[pTask->memIdx - 1].p);
pTask->stat.times.memFree.exec++;
pTask->stat.times.memFree.succ++;
pTask->stat.bytes.memFree.exec+=osize;
pTask->stat.bytes.memFree.succ+=osize;
pTask->pMemList[pTask->memIdx - 1].p = NULL;
pTask->memIdx--;
@ -822,7 +894,17 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
break;
}
case 8:{ // trim
mptMemoryTrim(0, NULL);
bool trimed = false;
int32_t code = mptMemoryTrim(0, &trimed);
pTask->stat.times.memTrim.exec++;
if (code) {
pTask->stat.times.memTrim.fail++;
} else {
pTask->stat.times.memTrim.succ++;
if (trimed) {
pTask->stat.bytes.memTrim.succ++;
}
}
pTask->lastAct = actId;
actDone = true;
break;
@ -834,14 +916,23 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
pTask->pMemList[pTask->memIdx].p = mptMemoryMallocAlign(8, size);
if (NULL == pTask->pMemList[pTask->memIdx].p) {
pTask->stat.times.memMalloc.exec++;
pTask->stat.bytes.memMalloc.exec+=size;
pTask->stat.times.memMalloc.fail++;
pTask->stat.bytes.memMalloc.fail+=size;
uError("JOB:0x%x TASK:0x%x mpMallocAlign %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
*(char*)pTask->pMemList[pTask->memIdx].p = 'a' + taosRand() % 26;
*((char*)pTask->pMemList[pTask->memIdx].p + size -1) = 'a' + taosRand() % 26;
nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p);
pTask->pMemList[pTask->memIdx].size = size;
mptWriteMem(pTask->pMemList[pTask->memIdx].p, size);
pTask->stat.times.memMalloc.exec++;
pTask->stat.bytes.memMalloc.exec+=nsize;
pTask->stat.times.memMalloc.succ++;
pTask->stat.bytes.memMalloc.succ+=nsize;
pTask->memIdx++;
pTask->lastAct = actId;
actDone = true;
@ -936,6 +1027,37 @@ void mptInitJobs() {
}
}
void mptCheckPoolUsedSize(int32_t jobNum) {
int64_t usedSize = 0;
bool needEnd = false;
int64_t poolUsedSize = 0;
taosMemPoolGetUsedSizeBegin(mptCtx.memPoolHandle, &usedSize, &needEnd);
for (int32_t i = 0; i < jobNum; ++i) {
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
int64_t jobUsedSize = 0;
for (int32_t m = 0; m < pJobCtx->taskNum; ++m) {
SMPStatDetail* pStat = NULL;
int64_t allocSize = 0;
taosMemPoolGetSessionStat(pJobCtx->pSessions[m], &pStat, &allocSize, NULL);
int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat);
assert(allocSize == usedSize);
assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[m].stat, sizeof(*pStat)));
jobUsedSize += allocSize;
}
assert(pJobCtx->pJob->memInfo->allocMemSize == jobUsedSize);
poolUsedSize += jobUsedSize;
}
assert(poolUsedSize == usedSize);
taosMemPoolGetUsedSizeEnd(mptCtx.memPoolHandle);
}
void* mptThreadFunc(void* param) {
SMPTestThread* pThread = (SMPTestThread*)param;
int32_t jobExecTimes = (mptCtrl.jobExecTimes) ? mptCtrl.jobExecTimes : taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1;
@ -977,6 +1099,8 @@ void* mptThreadFunc(void* param) {
}
MPT_PRINTF("Thread %d finish the %dth job loops\n", pThread->idx, n);
mptCheckPoolUsedSize(jobNum);
}
return NULL;
@ -1037,7 +1161,34 @@ void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) {
} // namespace
#if 1
#if 1
TEST(FuncTest, SysMemoryPerfTest) {
char* caseName = "FuncTest:SingleThreadTest";
int32_t code = 0;
int64_t msize = 1048576UL*10240;
char* p = (char*)taosMemMalloc(msize);
int64_t st = taosGetTimestampUs();
memset(p, 0, msize);
int64_t totalUs = taosGetTimestampUs() - st;
printf("memset %" PRId64 " used time:%" PRId64 "us\n", msize, totalUs, totalUs);
int64_t freeSize = 0;
int32_t loopTimes = 1000000;
st = taosGetTimestampUs();
int64_t lt = st;
for (int32_t i = 0; i < loopTimes; ++i) {
code = taosGetSysAvailMemory(&freeSize);
assert(0 == code);
}
totalUs = taosGetTimestampUs() - st;
printf("%d times getSysMemory total time:%" PRId64 "us, avg:%dus\n", loopTimes, totalUs, totalUs/loopTimes);
}
#endif
#if 0
TEST(FuncTest, SingleThreadTest) {
char* caseName = "FuncTest:SingleThreadTest";
SMPTestParam param = {0};