diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index b2c1beed2f..cd4612c2b2 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -103,7 +103,7 @@ typedef struct SMPMemHeader { } SMPMemHeader; typedef struct SMPMemTailer { - + uint8_t tail; } SMPMemTailer; typedef struct SMPListNode { diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 6b85583510..7498c4693a 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -179,8 +179,18 @@ uint32_t mpFileIdHashFp(const char* fileId, uint32_t len) { return *(uint32_t*)fileId; } +void mpDestroyPosStat(SMPStatPos* pStat) { + taosHashCleanup(pStat->fileHash); + pStat->fileHash = NULL; + taosHashCleanup(pStat->remainHash); + pStat->remainHash = NULL; + taosHashCleanup(pStat->allocHash); + pStat->allocHash = NULL; + taosHashCleanup(pStat->freeHash); + pStat->freeHash = NULL; +} -int32_t mpInitStat(SMPStatPos* pStat, bool sessionStat) { +int32_t mpInitPosStat(SMPStatPos* pStat, bool sessionStat) { pStat->remainHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (NULL == pStat->remainHash) { uError("memPool init posStat remainHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat); @@ -235,7 +245,7 @@ int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { MP_ERR_RET((*gMPFps[gMPMgmt.strategy].initFp)(pPool, poolName, cfg)); } - MP_ERR_RET(mpInitStat(&pPool->stat.posStat, false)); + MP_ERR_RET(mpInitPosStat(&pPool->stat.posStat, false)); return TSDB_CODE_SUCCESS; } @@ -1180,6 +1190,8 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) { mpCheckStatDetail(pPool, pSession, "DestroySession"); + mpDestroyPosStat(&pSession->stat.posStat); + TAOS_MEMSET(pSession, 0, sizeof(*pSession)); mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession); @@ -1198,7 +1210,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) { MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession)); } - MP_ERR_JRET(mpInitStat(&pSession->stat.posStat, true)); + MP_ERR_JRET(mpInitPosStat(&pSession->stat.posStat, true)); pSession->pJob = (SMPJob*)pJob; (void)atomic_add_fetch_32(&pSession->pJob->remainSession, 1); @@ -1293,6 +1305,8 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input); } else if (0 == size){ input.pMem = input.pOrigMem; + input.pOrigMem = NULL; + input.size = input.origSize; MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input); input.pMem = NULL; @@ -1301,6 +1315,8 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input); input.pMem = input.pOrigMem; + input.pOrigMem = NULL; + input.size = input.origSize; input.procFlags = MP_STAT_PROC_FLAG_EXEC; MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input); @@ -1453,6 +1469,8 @@ void taosMemPoolClose(void* poolHandle) { mpCheckStatDetail(pPool, NULL, "PoolClose"); + mpDestroyPosStat(&pPool->stat.posStat); + taosMemoryFree(pPool->name); mpDestroyCacheGroup(&pPool->sessionCache); } @@ -1556,7 +1574,12 @@ int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) { } #endif - taosGetTotalMemory(&tsTotalMemoryKB); + code = taosGetTotalMemory(&tsTotalMemoryKB); + if (TSDB_CODE_SUCCESS != code) { + uInfo("fail to system total memory, error: %s", tstrerror(code)); + return code; + } + if (tsTotalMemoryKB <= 0) { uInfo("memory pool disabled since no enough system total memory, size: %" PRId64 "KB", tsTotalMemoryKB); return code; @@ -1602,7 +1625,7 @@ int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) { return code; } - uInfo("memory pool initialized"); + uInfo("memory pool initialized, reservedSize:%dMB, freeAfterReserved:%" PRId64 "MB", tsMinReservedMemorySize, freeSizeAfterRes/1048576UL); return code; } diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index f9bc0d6783..f3718702ee 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -66,7 +66,7 @@ threadlocal void* mptThreadPoolSession = NULL; #define mptEnableMemoryPoolUsage(_pool, _session) do { mptThreadPoolHandle = _pool; mptThreadPoolSession = _session; } while (0) -#define mptDisableMemoryPoolUsage() (mptThreadPoolHandle = NULL) +#define mptDisableMemoryPoolUsage() (mptThreadPoolHandle = NULL, mptThreadPoolSession = NULL) #define mptSaveDisableMemoryPoolUsage(_handle) do { (_handle) = mptThreadPoolHandle; mptThreadPoolHandle = NULL; } while (0) #define mptRestoreEnableMemoryPoolUsage(_handle) (mptThreadPoolHandle = (_handle)) @@ -176,7 +176,6 @@ typedef struct SMPTestCtx { int64_t tId; SHashObj* pJobs; BoundedQueue* pJobQueue; - void* memPoolHandle; SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM]; SMPTestJobCtx* jobCtxs; SMPTestParam param; @@ -242,6 +241,7 @@ void mptDeleteJobQueueData(void* pData) { } void mptInit() { + osDefaultInit(); mptInitLogFile(); mptCtrl.caseLoopTimes = 1; @@ -266,10 +266,10 @@ void mptInit() { } void mptDestroyTaskCtx(SMPTestTaskCtx* pTask, void* pSession) { - assert(mptCtx.memPoolHandle); + assert(gMemPoolHandle); assert(pSession); - mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pSession); + mptEnableMemoryPoolUsage(gMemPoolHandle, pSession); for (int32_t i = 0; i < pTask->memIdx; ++i) { pTask->stat.times.memFree.exec++; pTask->stat.bytes.memFree.exec+=mptMemorySize(pTask->pMemList[i].p); @@ -349,7 +349,7 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* p pJobCtx->pJob = pJob; pJob->pCtx = pJobCtx; - assert(0 == taosMemPoolInitSession(mptCtx.memPoolHandle, ppSession, pJob->memInfo)); + assert(0 == taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo)); char id[sizeof(tId) + sizeof(eId)] = {0}; MPT_SET_TEID(id, tId, eId); @@ -411,7 +411,7 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat))); mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]); - taosMemPoolDestroySession(mptCtx.memPoolHandle, pJobCtx->pSessions[i]); + taosMemPoolDestroySession(gMemPoolHandle, pJobCtx->pSessions[i]); } uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobCtx->pJob->errCode); @@ -531,23 +531,7 @@ void mptRetireJobCb(uint64_t jobId, int32_t errCode) { } void mptInitPool(void) { - SMemPoolCfg cfg = {0}; - - if (!mptCtx.param.reserveMode) { - //cfg.upperLimitSize = mptCtx.param.upperLimitSize; - } else { - int64_t memSize = 0; - ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize)); - cfg.reserveSize = &mptCtx.param.reserveSize; - } - cfg.threadNum = 10; //TODO - cfg.evicPolicy = E_EVICT_AUTO; //TODO - cfg.chunkSize = 1048576; - cfg.jobQuota = &mptCtx.param.jobQuota; - cfg.cb.failFp = mptRetireJobsCb; - cfg.cb.reachFp = mptRetireJobCb; - - ASSERT_TRUE(0 == taosMemPoolOpen("testQMemPool", &cfg, &mptCtx.memPoolHandle)); + assert(0 == taosMemoryPoolInit(mptRetireJobsCb, mptRetireJobCb)); } void mptWriteMem(void* pStart, int32_t size) { @@ -671,7 +655,11 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { pTask->stat.bytes.memRealloc.origExec+=osize; pTask->stat.times.memRealloc.fail++; pTask->stat.bytes.memRealloc.fail+=size; - pTask->stat.bytes.memFree.succ+=osize; + + pTask->stat.times.memFree.exec++; + pTask->stat.bytes.memFree.exec+=osize; + pTask->stat.times.memFree.succ++; + pTask->stat.bytes.memFree.succ+=osize; uError("JOB:0x%x TASK:0x%x real mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); pTask->memIdx--; return; @@ -700,12 +688,12 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { osize = mptMemorySize(pTask->pMemList[pTask->memIdx - 1].p); pTask->pMemList[pTask->memIdx - 1].p = mptMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, 0); - pTask->stat.times.memRealloc.exec++; - pTask->stat.bytes.memRealloc.origExec+=osize; + pTask->stat.times.memFree.exec++; + pTask->stat.bytes.memFree.exec+=osize; assert(NULL == pTask->pMemList[pTask->memIdx - 1].p); - pTask->stat.times.memRealloc.succ++; - pTask->stat.bytes.memRealloc.origSucc+=osize; + pTask->stat.times.memFree.succ++; + pTask->stat.bytes.memFree.succ+=osize; pTask->memIdx--; pTask->lastAct = actId; @@ -905,7 +893,7 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx) { atomic_add_fetch_32(&pJobCtx->taskRunningNum, 1); - mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pJobCtx->pSessions[idx]); + mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[idx]); mptSimulateTask(pJobCtx, pCtx); mptDisableMemoryPoolUsage(); @@ -936,7 +924,7 @@ void mptCheckPoolUsedSize(int32_t jobNum) { bool needEnd = false; int64_t poolUsedSize = 0; - taosMemPoolGetUsedSizeBegin(mptCtx.memPoolHandle, &usedSize, &needEnd); + taosMemPoolGetUsedSizeBegin(gMemPoolHandle, &usedSize, &needEnd); for (int32_t i = 0; i < jobNum; ++i) { SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; int64_t jobUsedSize = 0; @@ -959,7 +947,7 @@ void mptCheckPoolUsedSize(int32_t jobNum) { assert(poolUsedSize == usedSize); - taosMemPoolGetUsedSizeEnd(mptCtx.memPoolHandle); + taosMemPoolGetUsedSizeEnd(gMemPoolHandle); } void* mptThreadFunc(void* param) { @@ -1057,7 +1045,7 @@ void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) { MPT_PRINTF("\t max single alloc size: %" PRId64 "\n", mptCtrl.maxSingleAllocSize); MPT_PRINTF("\t job quota size: %" PRId64 "\n", param->jobQuota); MPT_PRINTF("\t reserve mode: %d\n", param->reserveMode); - MPT_PRINTF("\t upper limit size: %" PRId64 "\n", param->upperLimitSize); + //MPT_PRINTF("\t upper limit size: %" PRId64 "\n", param->upperLimitSize); MPT_PRINTF("\t test thread num: %d\n", param->threadNum); MPT_PRINTF("\t random exec task: %d\n", param->randTask); } @@ -1066,7 +1054,7 @@ void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) { #if 1 -#if 1 +#if 0 TEST(FuncTest, SysMemoryPerfTest) { char* caseName = "FuncTest:SingleThreadTest"; int32_t code = 0; @@ -1076,7 +1064,7 @@ TEST(FuncTest, SysMemoryPerfTest) { int64_t st = taosGetTimestampUs(); memset(p, 0, msize); int64_t totalUs = taosGetTimestampUs() - st; - printf("memset %" PRId64 " used time:%" PRId64 "us\n", msize, totalUs, totalUs); + printf("memset %" PRId64 " used time:%" PRId64 "us, speed:%dMB/ms\n", msize, totalUs, msize/1048576UL/(totalUs/1000UL)); int64_t freeSize = 0; int32_t loopTimes = 1000000; @@ -1085,7 +1073,7 @@ TEST(FuncTest, SysMemoryPerfTest) { for (int32_t i = 0; i < loopTimes; ++i) { code = taosGetSysAvailMemory(&freeSize); assert(0 == code); - taosMsleep(1); + taosMsleep(5); } totalUs = taosGetTimestampUs() - st;