diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 4f6d25b5e9..35b5c1fd2b 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -37,6 +37,41 @@ typedef struct SMemPoolJob { int64_t maxAllocMemSize; } SMemPoolJob; +typedef struct SMPStatItem { + int64_t inErr; + int64_t exec; + int64_t succ; + int64_t fail; +} SMPStatItem; + +typedef struct SMPStatItemExt { + int64_t inErr; + int64_t exec; + int64_t succ; + int64_t fail; + int64_t origExec; + int64_t origSucc; + int64_t origFail; +} SMPStatItemExt; + +typedef struct SMPMemoryStat { + SMPStatItem memMalloc; + SMPStatItem memCalloc; + SMPStatItemExt memRealloc; + SMPStatItem strdup; + SMPStatItem memFree; + + SMPStatItem chunkMalloc; + SMPStatItem chunkRecycle; + SMPStatItem chunkReUse; + SMPStatItem chunkFree; +} SMPMemoryStat; + +typedef struct SMPStatDetail { + SMPMemoryStat times; + SMPMemoryStat bytes; +} SMPStatDetail; + typedef void (*mpDecConcSessionNum)(void); typedef void (*mpIncConcSessionNum)(void); @@ -81,6 +116,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob); void taosMemPoolDestroySession(void* poolHandle, void* session); int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob); void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg); +void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName); #define taosMemPoolFreeClear(ptr) \ do { \ @@ -101,14 +137,14 @@ extern threadlocal void* threadPoolSession; #define taosSaveDisableMemoryPoolUsage(_handle) do { (_handle) = threadPoolHandle; threadPoolHandle = NULL; } while (0) #define taosRestoreEnableMemoryPoolUsage(_handle) (threadPoolHandle = (_handle)) -#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, threadPoolSession, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size))) -#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) -#define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, threadPoolSession, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) -#define taosStrdup(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolStrdup(threadPoolHandle, threadPoolSession, _ptr, __FILE__, __LINE__)) : (taosStrdupi(_ptr))) -#define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, threadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemFree(_ptr))) -#define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, threadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemSize(_ptr))) -#define taosMemoryTrim(_size) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, __FILE__, __LINE__)) : (taosMemTrim(_size))) -#define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, threadPoolSession, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) +#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size))) +#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) +#define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) +#define taosStrdup(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolStrdup(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr))) +#define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr))) +#define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr))) +#define taosMemoryTrim(_size) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemTrim(_size))) +#define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) #else #define taosEnableMemoryPoolUsage(_pool, _session) #define taosDisableMemoryPoolUsage() diff --git a/include/util/tlockfree.h b/include/util/tlockfree.h index 1a575f8ea1..80c93d31ec 100644 --- a/include/util/tlockfree.h +++ b/include/util/tlockfree.h @@ -75,6 +75,7 @@ void taosInitRWLatch(SRWLatch *pLatch); void taosWLockLatch(SRWLatch *pLatch); void taosWUnLockLatch(SRWLatch *pLatch); void taosRLockLatch(SRWLatch *pLatch); +int32_t taosRTryLockLatch(SRWLatch *pLatch); void taosRUnLockLatch(SRWLatch *pLatch); int32_t taosWTryLockLatch(SRWLatch *pLatch); diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 66ff358b2c..166181d9be 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -137,40 +137,6 @@ typedef struct SMPStatInput { int32_t line; } SMPStatInput; -typedef struct SMPStatItem { - int64_t inErr; - int64_t exec; - int64_t succ; - int64_t fail; -} SMPStatItem; - -typedef struct SMPStatItemExt { - int64_t inErr; - int64_t exec; - int64_t succ; - int64_t fail; - int64_t origExec; - int64_t origSucc; - int64_t origFail; -} SMPStatItemExt; - -typedef struct SMPMemoryStat { - SMPStatItem memMalloc; - SMPStatItem memCalloc; - SMPStatItemExt memRealloc; - SMPStatItem strdup; - SMPStatItem memFree; - - SMPStatItem chunkMalloc; - SMPStatItem chunkRecycle; - SMPStatItem chunkReUse; - SMPStatItem chunkFree; -} SMPMemoryStat; - -typedef struct SMPStatDetail { - SMPMemoryStat times; - SMPMemoryStat bytes; -} SMPStatDetail; typedef struct SMPCtrlInfo { int64_t statFlags; diff --git a/source/util/src/tlockfree.c b/source/util/src/tlockfree.c index 1961b404b5..9af2e739e0 100644 --- a/source/util/src/tlockfree.c +++ b/source/util/src/tlockfree.c @@ -91,4 +91,21 @@ void taosRLockLatch(SRWLatch *pLatch) { } } +// no reentrant +int32_t taosRTryLockLatch(SRWLatch *pLatch) { + SRWLatch oLatch, nLatch; + oLatch = atomic_load_32(pLatch); + if (oLatch) { + return -1; + } + + nLatch = oLatch + 1; + if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) { + return 0; + } + + return -1; +} + + void taosRUnLockLatch(SRWLatch *pLatch) { (void)atomic_fetch_sub_32(pLatch, 1); } diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 0384f686aa..760eae3494 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -429,40 +429,7 @@ void mpPrintSessionStat(SMPCtrlInfo* pCtrl, SMPStatSession* pSessStat, char* det uInfo("session destroyed num: %" PRId64, pSessStat->destroyNum); } -void mpPrintStat(SMemPool* pPool, SMPSession* pSession, char* procName) { - char detailName[128]; - if (NULL != pSession) { - snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session"); - detailName[sizeof(detailName) - 1] = 0; - mpPrintStatDetail(&pSession->ctrlInfo, &pSession->stat.statDetail, detailName, pSession->maxAllocMemSize); - - snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFile"); - detailName[sizeof(detailName) - 1] = 0; - mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName); - - snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFileLine"); - detailName[sizeof(detailName) - 1] = 0; - mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName); - } - - snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name); - detailName[sizeof(detailName) - 1] = 0; - mpPrintSessionStat(&pPool->ctrlInfo, &pPool->stat.statSession, detailName); - mpPrintStatDetail(&pPool->ctrlInfo, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize); - - snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode"); - detailName[sizeof(detailName) - 1] = 0; - mpPrintNodeStat(&pSession->ctrlInfo, pSession->stat.nodeStat, detailName); - - snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFile"); - detailName[sizeof(detailName) - 1] = 0; - mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName); - - snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFileLine"); - detailName[sizeof(detailName) - 1] = 0; - mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName); -} void mpLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* pInput) { switch (item) { @@ -653,6 +620,44 @@ _return: gMPMgmt.code = code; } +void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) { + SMemPool* pPool = (SMemPool*)poolHandle; + SMPSession* pSession = (SMPSession*)session; + char detailName[128]; + + if (NULL != pSession) { + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session"); + detailName[sizeof(detailName) - 1] = 0; + mpPrintStatDetail(&pSession->ctrlInfo, &pSession->stat.statDetail, detailName, pSession->maxAllocMemSize); + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFile"); + detailName[sizeof(detailName) - 1] = 0; + mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName); + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFileLine"); + detailName[sizeof(detailName) - 1] = 0; + mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName); + } + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name); + detailName[sizeof(detailName) - 1] = 0; + mpPrintSessionStat(&pPool->ctrlInfo, &pPool->stat.statSession, detailName); + mpPrintStatDetail(&pPool->ctrlInfo, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize); + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode"); + detailName[sizeof(detailName) - 1] = 0; + mpPrintNodeStat(&pSession->ctrlInfo, pSession->stat.nodeStat, detailName); + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFile"); + detailName[sizeof(detailName) - 1] = 0; + mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName); + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFileLine"); + detailName[sizeof(detailName) - 1] = 0; + mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName); +} + + int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = NULL; @@ -714,7 +719,7 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) { (void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1); - mpPrintStat(pPool, pSession, "DestroySession"); + taosMemPoolPrintStat(pPool, pSession, "DestroySession"); TAOS_MEMSET(pSession, 0, sizeof(*pSession)); diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index e8aabfe338..d0fdca9f30 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -140,4 +140,13 @@ if (${TD_LINUX}) add_custom_command(TARGET terrorTest POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy_if_different ${ERR_TBL_FILE} $ ) -endif () \ No newline at end of file + + # memPoolTest + add_executable(memPoolTest "memPoolTest.cpp") + target_link_libraries(memPoolTest os util common gtest_main) + add_test( + NAME memPoolTest + COMMAND memPoolTest + ) + +endif () diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index e185a9eb5a..12d499802f 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -31,27 +31,37 @@ #include "os.h" -#include "executor.h" -#include "executorInt.h" -#include "function.h" -#include "operator.h" +#include "thash.h" +#include "theap.h" #include "taos.h" -#include "tdatablock.h" #include "tdef.h" #include "tvariant.h" #include "stub.h" -#include "querytask.h" namespace { #define MPT_PRINTF (void)printf +#define MPT_MAX_MEM_ACT_TIMES 10000 #define MPT_MAX_SESSION_NUM 256 -#define MPT_MAX_JOB_NUM 20000 +#define MPT_MAX_JOB_NUM 200 +#define MPT_MAX_THREAD_NUM 100 + +#define MPT_DEFAULT_RESERVE_MEM_PERCENT 20 +#define MPT_MIN_RESERVE_MEM_SIZE (512 * 1048576UL) +#define MPT_MIN_MEM_POOL_SIZE (1048576UL) +#define MPT_MAX_RETIRE_JOB_NUM 10000 + threadlocal void* mptThreadPoolHandle = NULL; threadlocal void* mptThreadPoolSession = NULL; +#define MPT_SET_TEID(id, tId, eId) \ + do { \ + *(uint64_t *)(id) = (tId); \ + *(uint32_t *)((char *)(id) + sizeof(tId)) = (eId); \ + } while (0) + #define mptEnableMemoryPoolUsage(_pool, _session) do { mptThreadPoolHandle = _pool; mptThreadPoolSession = _session; } while (0) #define mptDisableMemoryPoolUsage() (mptThreadPoolHandle = NULL) @@ -67,7 +77,7 @@ threadlocal void* mptThreadPoolSession = NULL; #define mptMemoryTrim(_size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolTrim(mptThreadPoolHandle, mptThreadPoolSession, _size, __FILE__, __LINE__)) : (taosMemTrim(_size))) #define mptMemoryMallocAlign(_alignment, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolMallocAlign(mptThreadPoolHandle, mptThreadPoolSession, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) -typedef enum { +enum { MPT_SMALL_MSIZE = 0, MPT_BIG_MSIZE, }; @@ -85,6 +95,7 @@ typedef struct SMPTJobInfo { int32_t errCode; SMemPoolJob* memInfo; SHashObj* pSessions; + void* pCtx; } SMPTJobInfo; @@ -94,33 +105,71 @@ typedef struct { } SMPTestCtrl; typedef struct { + void* p; + int64_t size; +} SMPTestMemInfo; + +typedef struct { + int8_t taskExecLock; + bool taskFinished; + + int64_t poolMaxUsedSize; + int64_t poolTotalUsedSize; + + SMPStatDetail stat; + + int32_t memIdx; + SMPTestMemInfo* pMemList[MPT_MAX_MEM_ACT_TIMES]; + + + int64_t npSize; + int32_t npMemIdx; + SMPTestMemInfo* npMemList[MPT_MAX_MEM_ACT_TIMES]; + + bool taskFreed; +} SMPTestTaskCtx; + +typedef struct { + SRWLatch jobExecLock; + + int32_t jobIdx; uint64_t jobId; - int32_t taskNum; - int64_t poolMaxSize; - int64_t npoolSize; void* pSessions[MPT_MAX_SESSION_NUM]; + int32_t taskNum; SMPTestTaskCtx taskCtxs[MPT_MAX_SESSION_NUM]; + int32_t taskExecIdx; + + SMPTJobInfo* pJob; + int32_t jobStatus; } SMPTestJobCtx; typedef struct { - int64_t poolMaxSize; - int64_t npoolSize; - int32_t memActTimes; -} SMPTestTaskCtx; + int64_t jobQuota; + bool autoPoolSize; + int32_t poolSize; + int32_t maxExecJobNum; + int32_t threadNum; + int32_t randTask; +} SMPTestParam; + +typedef struct { + bool allJobs; + bool autoJob; +} SMPTestThread; typedef struct SMPTestCtx { uint64_t qId; - uint64_t tId; - int32_t eId; SHashObj* pJobs; BoundedQueue* pJobQueue; void* memPoolHandle; + SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM]; SMPTestJobCtx jobCtxs[MPT_MAX_JOB_NUM]; + SMPTestParam param; } SMPTestCtx; SMPTestCtx mptCtx = {0}; - +#if 0 void joinTestReplaceRetrieveFp() { static Stub stub; stub.set(getNextBlockFromDownstreamRemain, getDummyInputBlock); @@ -143,6 +192,7 @@ void joinTestReplaceRetrieveFp() { #endif } } +#endif void mptInitLogFile() { const char *defaultLogFileNamePrefix = "mplog"; @@ -153,13 +203,13 @@ void mptInitLogFile() { TAOS_STRCPY(tsLogDir, TD_LOG_DIR_PATH); if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { - JT_PRINTF("failed to open log file in directory:%s\n", tsLogDir); + MPT_PRINTF("failed to open log file in directory:%s\n", tsLogDir); } } static bool mptJobMemSizeCompFn(void* l, void* r, void* param) { - SQWJobInfo* left = (SQWJobInfo*)l; - SQWJobInfo* right = (SQWJobInfo*)r; + SMPTJobInfo* left = (SMPTJobInfo*)l; + SMPTJobInfo* right = (SMPTJobInfo*)r; if (atomic_load_8(&right->retired)) { return true; } @@ -174,29 +224,111 @@ void mptInit() { mptCtx.pJobs = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); ASSERT_TRUE(NULL != mptCtx.pJobs); - mptCtx.pJobQueue = createBoundedQueue(10000, qwJobMemSizeCompFn, NULL, NULL); + mptCtx.pJobQueue = createBoundedQueue(10000, mptJobMemSizeCompFn, NULL, NULL); ASSERT_TRUE(NULL != mptCtx.pJobQueue); } -void mptRetireJob(SQWJobInfo* pJob) { - //TODO +void mptDestroyTaskCtx(SMPTestTaskCtx* pTask) { + for (int32_t i = 0; i < pTask->memIdx; ++i) { + taosMemFreeClear(pTask->pMemList[i].p); + } + for (int32_t i = 0; i < pTask->npMemIdx; ++i) { + taosMemFreeClear(pTask->npMemList[i].p); + } +} + +void mptDestroyJobInfo(SMPTJobInfo* pJob) { + taosMemFree(pJob->memInfo); + taosHashCleanup(pJob->pSessions); +} + + +void mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { + taosWLockLatch(&pJobCtx->jobExecLock); + + mptDestroyJobInfo(pJobCtx->pJob); + (void)taosHashRemove(mptCtx.pJobs, &pJobCtx->jobId, sizeof(pJobCtx->jobId)); + for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { + taosMemPoolDestroySession(mptCtx.memPoolHandle, pJobCtx->pSessions[i]); + mptDestroyTaskCtx(&pJobCtx->taskCtxs[i]); + } + if (reset) { + memset((char*)pJobCtx + sizeof(pJobCtx->jobExecLock), 0, sizeof(SMPTestJobCtx) - sizeof(pJobCtx->jobExecLock)); + mptInitJob(pJobCtx->jobIdx); + } + + taosWUnLockLatch(&pJobCtx->jobExecLock); +} + +void mptCheckCompareJobInfo(SMPTestJobCtx* pJobCtx) { + +} + +void mptResetJob(SMPTestJobCtx* pJobCtx) { + mptDestroyJob(pJobCtx, true); +} + +void mptRetireJob(SMPTJobInfo* pJob) { + SMPTestJobCtx* pCtx = (SMPTestJobCtx*)pJob->pCtx; + + mptCheckCompareJobInfo(pCtx); + + mptResetJob(pCtx); +} + +int32_t mptGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { + int64_t reserveSize = TMAX(totalSize * MPT_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, MPT_MIN_RESERVE_MEM_SIZE); + int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL; + if (availSize < MPT_MIN_MEM_POOL_SIZE) { + uError("too little available query memory, totalAvailable: %" PRId64 ", reserveSize: %" PRId64, totalSize, reserveSize); + return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM; + } + + *maxSize = availSize; + + return TSDB_CODE_SUCCESS; +} + +int32_t mptGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { + if (!mptCtx.param.autoPoolSize && mptCtx.param.poolSize > 0) { + *pMaxSize = mptCtx.param.poolSize * 1048576UL; + *autoMaxSize = false; + + return TSDB_CODE_SUCCESS; + } + + int64_t memSize = 0; + int32_t code = taosGetSysAvailMemory(&memSize); + if (TSDB_CODE_SUCCESS != code) { + uError("get system avaiable memory size failed, error: 0x%x", code); + return code; + } + + code = mptGetMemPoolMaxMemSize(memSize, pMaxSize); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + *autoMaxSize = true; + + return code; } void mptCheckUpateCfgCb(void* pHandle, void* cfg) { SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg; - int64_t newJobQuota = tsSingleQueryMaxMemorySize * 1048576UL; + int64_t newJobQuota = mptCtx.singleQueryMaxSize * 1048576UL; if (pCfg->jobQuota != newJobQuota) { atomic_store_64(&pCfg->jobQuota, newJobQuota); } int64_t maxSize = 0; bool autoMaxSize = false; - int32_t code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize); + int32_t code = mptGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize); if (TSDB_CODE_SUCCESS != code) { pCfg->maxSize = 0; - qError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize); + uError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize); return; } @@ -214,7 +346,7 @@ void mptLowLevelRetire(int64_t retireSize, int32_t errCode) { if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { mptRetireJob(pJob); - qDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, + uDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, pJob->memInfo->jobId, aSize, retireSize); taosHashCancelIterate(mptCtx.pJobs, pJob); @@ -226,7 +358,7 @@ void mptLowLevelRetire(int64_t retireSize, int32_t errCode) { } void mptMidLevelRetire(int64_t retireSize, int32_t errCode) { - SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobInfo, NULL); + SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL); PriorityQueueNode qNode; while (NULL != pJob) { if (0 == atomic_load_8(&pJob->retired)) { @@ -256,7 +388,7 @@ void mptMidLevelRetire(int64_t retireSize, int32_t errCode) { mptRetireJob(pJob); - qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, + uDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, pJob->memInfo->jobId, aSize, retireSize); retiredSize += aSize; @@ -277,23 +409,23 @@ void mptRetireJobsCb(int64_t retireSize, bool lowLevelRetire, int32_t errCode) { void mptRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) { SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &mpJob->jobId, sizeof(mpJob->jobId)); if (NULL == pJob) { - qError("QID:0x%" PRIx64 " fail to get job from job hash", mpJob->jobId); + uError("QID:0x%" PRIx64 " fail to get job from job hash", mpJob->jobId); return; } if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - qwRetireJob(pJob); + mptRetireJob(pJob); - qInfo("QID:0x%" PRIx64 " retired directly, errCode: 0x%x", mpJob->jobId, errCode); + uInfo("QID:0x%" PRIx64 " retired directly, errCode: 0x%x", mpJob->jobId, errCode); } else { - qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x", mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode)); + uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x", mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode)); } } int32_t mptInitJobInfo(uint64_t qId, SMPTJobInfo* pJob) { pJob->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (NULL == pJob->pSessions) { - qError("fail to init session hash, code: 0x%x", terrno); + uError("fail to init session hash, code: 0x%x", terrno); return terrno; } @@ -308,7 +440,7 @@ int32_t mptInitJobInfo(uint64_t qId, SMPTJobInfo* pJob) { } -int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, void** ppSession) { +int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJobCtx, void** ppSession) { int32_t code = TSDB_CODE_SUCCESS; SMPTJobInfo* pJob = NULL; @@ -323,7 +455,7 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, void** ppSession code = taosHashPut(mptCtx.pJobs, &qId, sizeof(qId), &jobInfo, sizeof(jobInfo)); if (TSDB_CODE_SUCCESS != code) { - qwDestroyJobInfo(&jobInfo); + mptDestroyJobInfo(&jobInfo); if (TSDB_CODE_DUP_KEY == code) { code = TSDB_CODE_SUCCESS; continue; @@ -334,7 +466,7 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, void** ppSession pJob = (SMPTJobInfo*)taosHashAcquire(mptCtx.pJobs, &qId, sizeof(qId)); if (NULL == pJob) { - qError("QID:0x%" PRIx64 " not in joj hash, may be dropped", qId); + uError("QID:0x%" PRIx64 " not in joj hash, may be dropped", qId); return TSDB_CODE_QRY_JOB_NOT_EXIST; } } @@ -342,16 +474,15 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, void** ppSession break; } - QW_ERR_JRET(taosMemPoolInitSession(mptCtx.memPoolHandle, ppSession, pJob->memInfo)); + pJobCtx->pJob = pJob; + pJob->pCtx = pJobCtx; + + assert(0 == taosMemPoolInitSession(mptCtx.memPoolHandle, ppSession, pJob->memInfo)); char id[sizeof(tId) + sizeof(eId)] = {0}; - QW_SET_TEID(id, tId, eId); + MPT_SET_TEID(id, tId, eId); - code = taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES); - if (TSDB_CODE_SUCCESS != code) { - qError("fail to put session into query session hash, code: 0x%x", code); - QW_ERR_JRET(code); - } + assert(0 == taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES)); _return: @@ -363,12 +494,12 @@ _return: } -void mptInitPool(int64_t jobQuota, bool autoMaxSize, int64_t maxSize) { +void mptInitPool(void) { SMemPoolCfg cfg = {0}; - cfg.autoMaxSize = autoMaxSize; - if (!autoMaxSize) { - cfg.maxSize = maxSize; + cfg.autoMaxSize = mptCtx.param.jobQuota; + if (!mptCtx.param.autoPoolSize) { + cfg.maxSize = mptCtx.param.poolSize; } else { int64_t memSize = 0; ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize)); @@ -376,7 +507,7 @@ void mptInitPool(int64_t jobQuota, bool autoMaxSize, int64_t maxSize) { } cfg.threadNum = 10; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO - cfg.jobQuota = jobQuota; + cfg.jobQuota = mptCtx.param.jobQuota; cfg.cb.retireJobsFp = mptRetireJobsCb; cfg.cb.retireJobFp = mptRetireJobCb; cfg.cb.cfgUpdateFp = mptCheckUpateCfgCb; @@ -384,22 +515,77 @@ void mptInitPool(int64_t jobQuota, bool autoMaxSize, int64_t maxSize) { ASSERT_TRUE(0 == taosMemPoolOpen("SingleThreadTest", &cfg, &mptCtx.memPoolHandle)); } -void mptMemorySimulate(SMPTestTaskCtx* pCtx) { +void mptSimulateTask(SMPTestJobCtx* pJob, SMPTestTaskCtx* pCtx, bool finishTask) { + int32_t actTimes = 0; + if (!finishTask) { + actTimes = taosRand() % 100 * () + } + + +} + +void mptSimulateOutTask(SMPTestTaskCtx* pCtx, bool finishTask) { } -void mptTaskRun(int32_t idx, uint64_t qId, uint64_t tId, int32_t eId) { - ASSERT_TRUE(0 == mptInitSession(qId, tId, eId, &mptCtx.pSessions[idx])); - mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, mptCtx.pSessions[idx]); - mptMemorySimulate(&mptCtx.taskCtxs[idx]); +void mptTaskRun(SMPTestJobCtx* pJob, SMPTestTaskCtx* pCtx, int32_t idx, bool finishTask) { + taosRLockLatch(&pCtx->taskExecLock); + + mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pJob->pSessions[idx]); + mptSimulateTask(pJob, pCtx, finishTask); mptDisableMemoryPoolUsage(); + mptSimulateOutTask(pCtx, finishTask); + + taosRUnLockLatch(&pCtx->taskExecLock); +} + +void mptInitTask(int32_t idx, uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJob) { + ASSERT_TRUE(0 == mptInitSession(qId, tId, eId, pJob, &pJob->pSessions[idx])); +} + +void mptInitJob(int32_t idx) { + SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[idx]; + + pJobCtx->jobIdx = idx; + pJobCtx->jobId = atomic_add_fetch_64(&mptCtx.qId, 1); + pJobCtx->taskNum = (taosRand() % MPT_MAX_SESSION_NUM) + 1; + for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { + mptInitTask(i, pJobCtx->jobId, i, 0, pJobCtx); + } +} + +void mptInitJobs() { + for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) { + mptInitJob(i); + } } void* mptThreadFunc(void* param) { - int32_t* threadIdx = (int32_t*)param; - + SMPTestThread* pThread = (SMPTestThread*)param; + + for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) { + SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; + if (taosRTryLockLatch(&pJobCtx->jobExecLock)) { + continue; + } + + if (mptCtx.param.randTask) { + int32_t taskIdx = taosRand() % pJobCtx->taskNum; + mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, false); + taosRUnLockLatch(&pJobCtx->jobExecLock); + continue; + } + + for (int32_t m = 0; m < pJobCtx->taskNum; ++m) { + mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m, false); + } + + taosRUnLockLatch(&pJobCtx->jobExecLock); + } + + return NULL; } void mptStartThreadTest(int32_t threadIdx) { @@ -407,21 +593,29 @@ void mptStartThreadTest(int32_t threadIdx) { TdThreadAttr thattr; ASSERT_EQ(0, taosThreadAttrInit(&thattr)); ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE)); - ASSERT_EQ(0, taosThreadCreate(&(t1), &thattr, mptThreadFunc, &threadIdx)); + ASSERT_EQ(0, taosThreadCreate(&(t1), &thattr, mptThreadFunc, &mptCtx.threadCtxs[threadIdx])); ASSERT_EQ(0, taosThreadAttrDestroy(&thattr)); } -void mptRunCase() { +void mptRunCase(SMPTestParam* param) { + memcpy(&mptCtx.param, param, sizeof(SMPTestParam)); + mptInitPool(); + + mptInitJobs(); + + for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) { + mptCtx.threadCtxs[i]. + mptStartThreadTest(i); + } } } // namespace #if 1 #if 1 -TEST(FuncTest, SingleThreadTest) { - SJoinTestParam param; +TEST(FuncTest, SingleJobTest) { char* caseName = "FuncTest:SingleThreadTest"; void* pSession = NULL; @@ -431,6 +625,18 @@ TEST(FuncTest, SingleThreadTest) { } #endif +#if 1 +TEST(FuncTest, MultiJobsTest) { + char* caseName = "FuncTest:SingleThreadTest"; + void* pSession = NULL; + + mptInitPool(0, false, 5*1048576UL); + + + +} +#endif + #endif